## Join Dask.DataFrame against small Pandas DataFrame

### Advanced techniques: active memory management

### Create fake dataset of products and categories

In [1]:
from itertools import product

alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
products = [''.join(x) for x in product(alphabet, alphabet, alphabet, alphabet)]
len(products)

456976

In [2]:
products[:8]

['AAAA', 'AAAB', 'AAAC', 'AAAD', 'AAAE', 'AAAF', 'AAAG', 'AAAH']

In [3]:
products[-8:]

['ZZZS', 'ZZZT', 'ZZZU', 'ZZZV', 'ZZZW', 'ZZZX', 'ZZZY', 'ZZZZ']

In [4]:
import pandas as pd
import numpy as np

dimension = pd.DataFrame({'product': products, 
                          'category': np.random.randint(0, 1000, size=len(products))})
dimension.head()

|   | category | product |
|---|----------|---------|
| 0 | 443 | AAAA |
| 1 | 594 | AAAB |
| 2 | 201 | AAAC |
| 3 | 809 | AAAD |
| 4 | 974 | AAAE |


In [5]:
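def fake_data(size):
    """Build one fake partition of the fact table with `size` rows."""
    # Relies on the module-level `alphabet` and `pd` defined in earlier cells.
    import random
    import numpy as np
    return pd.DataFrame({'product': [''.join(random.sample(alphabet, 4)) for i in range(size)],
                         'value': np.random.exponential(100, size=size).astype(int),
                         'rating': np.random.normal(0, 1, size=size)})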
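
One detail of `fake_data` is worth spelling out: `random.sample` draws without replacement, so a generated product code never repeats a letter. Every generated code still exists in the dimension table, but only a subset of the dimension table can ever be hit. A minimal standard-library sketch of the difference follows; `random.choices` is shown only as an assumed alternative for drawing with replacement and is not used by the notebook.

```python
import random
from math import perm

alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'

# random.sample draws without replacement: the four letters are always distinct.
code = ''.join(random.sample(alphabet, 4))
assert len(set(code)) == 4

# Codes fake_data can produce vs. codes present in the dimension table.
reachable = perm(26, 4)   # 26 * 25 * 24 * 23 = 358800
all_codes = 26 ** 4       # 456976
print(reachable, all_codes)

# Assumed alternative: random.choices draws with replacement,
# so codes such as 'AAAA' become possible.
code_with_repeats = ''.join(random.choices(alphabet, k=4))
```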

In [6]:
example = fake_data(5)
example

|   | product | rating | value |
|---|---------|--------|-------|
| 0 | BHYI | -2.189738 | 13 |
| 1 | XFAS | -0.478826 | 72 |
| 2 | ILBA | 1.176059 | 35 |
| 3 | VXYS | 0.872137 | 51 |
| 4 | GVEH | 0.897264 | 22 |


In [7]:
example.merge(dimension, on='product', how='inner')

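|   | product | rating | value | category |
|---|---------|--------|-------|----------|
| 0 | BHYI | -2.189738 | 13 | 684 |
| 1 | XFAS | -0.478826 | 72 | 832 |
| 2 | ILBA | 1.176059 | 35 | 539 |
| 3 | VXYS | 0.872137 | 51 | 375 |
| 4 | GVEH | 0.897264 | 22 | 749 |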
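
An inner join silently drops fact rows whose product has no match, and it duplicates rows if the dimension table repeats a key. The sketch below shows one way to check both with the pandas `merge` parameters `validate` and `indicator`. The miniature tables `facts_small` and `dimension_small` are hypothetical stand-ins, not the notebook's data.

```python
import pandas as pd

# Hypothetical miniature tables standing in for the notebook's data.
dimension_small = pd.DataFrame({'product': ['AAAA', 'AAAB', 'AAAC'],
                                'category': [443, 594, 201]})
facts_small = pd.DataFrame({'product': ['AAAB', 'AAAA', 'AAAB', 'ZZZZ'],
                            'value': [13, 72, 35, 51]})

# validate='many_to_one' raises MergeError if 'product' is duplicated in the
# dimension table; indicator=True adds a '_merge' column describing each row.
checked = facts_small.merge(dimension_small, on='product', how='left',
                            validate='many_to_one', indicator=True)

# Rows marked 'left_only' are exactly the ones an inner join would drop.
unmatched = checked[checked['_merge'] == 'left_only']
inner = facts_small.merge(dimension_small, on='product', how='inner')
print(len(facts_small), len(inner), len(unmatched))
```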


## Parallelize with Dask.DataFrame

We now perform the same join in parallel with dask.dataframe.

We start on a single machine and repeat the experiment on a cluster later.

### Make a fake fact table

Normally we would load our data from some external source, as in the following:

```python
import dask.dataframe as dd
df = dd.read_csv('hdfs://path/to/my/data/*.csv')
```

Instead we use the `fake_data` function defined above to produce each "file". You can skip this step if you already have data elsewhere.

In [8]:
import dask.dataframe as dd
from dask import delayed

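# Each delayed call lazily builds one 10,000-row partition.
partitions = []
for i in range(10):
    partitions.append(delayed(fake_data)(10000))

# A one-row sample tells dask the column names and dtypes (the `meta` argument).
example = fake_data(1)

df = dd.from_delayed(partitions, meta=example)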

In [9]:
df

dd.DataFrame<from-de..., npartitions=10>

In [10]:
df.head()

|   | product | rating | value |
|---|---------|--------|-------|
| 0 | JHWM | -0.342292 | 198 |
| 1 | DHCV | -1.399805 | 59 |
| 2 | QHYL | 0.176286 | 112 |
| 3 | KAHU | 0.476704 | 35 |
| 4 | ULGB | 0.635553 | 13 |


In [11]:
joined = df.merge(dimension, how='inner', on='product')
joined.head()

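|   | product | rating | value | category |
|---|---------|--------|-------|----------|
| 0 | UFBO | -1.114049 | 28 | 168 |
| 1 | OANM | -1.24042 | 75 | 521 |
| 2 | KAZF | 1.289296 | 103 | 744 |
| 3 | BWYU | -0.755966 | 152 | 421 |
| 4 | VHFT | 0.277356 | 48 | 400 |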
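
Conceptually, joining a partitioned table against one small in-memory table can be done by merging each partition independently and concatenating the results, with no data moving between partitions. The sketch below illustrates that idea with plain pandas on hypothetical miniature tables (`facts_small`, `dimension_small`). It shows the principle only and is not dask's actual implementation.

```python
import pandas as pd

# Hypothetical miniature tables standing in for the notebook's data.
dimension_small = pd.DataFrame({'product': ['AAAA', 'AAAB', 'AAAC'],
                                'category': [443, 594, 201]})
facts_small = pd.DataFrame({
    'product': ['AAAB', 'AAAA', 'AAAC', 'AAAB', 'AAAA', 'AAAC'],
    'rating': [0.1, -0.4, 1.2, 0.9, 0.3, -0.7]})

# Split the fact table into three "partitions" by row position.
chunks = [facts_small.iloc[0:2], facts_small.iloc[2:4], facts_small.iloc[4:6]]

# Merge every partition against the full dimension table, then concatenate.
merged_chunks = [c.merge(dimension_small, on='product', how='inner')
                 for c in chunks]
partitionwise = pd.concat(merged_chunks, ignore_index=True)

# Compare with merging the whole fact table in one step.
whole = facts_small.merge(dimension_small, on='product', how='inner')
key = ['product', 'rating']
same = (partitionwise.sort_values(key).reset_index(drop=True)
        .equals(whole.sort_values(key).reset_index(drop=True)))
print(same)
```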


### Analyze the joined table

We find the ten categories with the highest mean rating.

In [12]:
joined.groupby('category').rating.mean().nlargest(10).compute()

category
98     0.295137
706    0.273313
111    0.265551
798    0.263368
110    0.255219
550    0.250642
893    0.249949
647    0.244770
66     0.243825
816    0.237187
Name: rating, dtype: float64
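
A grouped mean like the one above can be assembled from per-partition pieces: each partition reports a sum and a count per category, and these are combined before dividing. The following pandas sketch shows that idea on a hypothetical `joined_small` table. It is meant to show why the computation parallelizes, not how dask implements it internally.

```python
import pandas as pd

# Hypothetical miniature version of the joined table.
joined_small = pd.DataFrame({'category': [1, 2, 1, 3, 2, 1],
                             'rating': [0.5, -1.0, 1.5, 0.25, 2.5, -0.5]})
chunks = [joined_small.iloc[:3], joined_small.iloc[3:]]

# Step 1: each partition computes a sum and a count per category.
partials = [c.groupby('category')['rating'].agg(['sum', 'count'])
            for c in chunks]

# Step 2: add up the partial results, then divide to get the mean.
combined = pd.concat(partials).groupby(level=0).sum()
mean_by_category = combined['sum'] / combined['count']
top = mean_by_category.nlargest(2)

# The same answer computed directly on the full table.
direct = joined_small.groupby('category')['rating'].mean().nlargest(2)
print(top)
```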

## Distributed computing

We connect to a cluster of workers and repeat the experiment on a larger dataset.

In [13]:
# Note: newer dask.distributed releases name this class Client instead of Executor.
from dask.distributed import Executor, progress
e = Executor('localhost:8786')
e

<Executor: scheduler=localhost:8786 workers=320 threads=320>

In [14]:
dfs = [delayed(fake_data)(10000) for i in range(1000)]
example = fake_data(1)
df = dd.from_delayed(dfs, example)

In [15]:
df

dd.DataFrame<from-de..., npartitions=1000>

### Scatter the dimension table

In [16]:
dimension.head()

|   | category | product |
|---|----------|---------|
| 0 | 443 | AAAA |
| 1 | 594 | AAAB |
| 2 | 201 | AAAC |
| 3 | 809 | AAAD |
| 4 | 974 | AAAE |


In [17]:
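# Wrap the pandas table as a single-partition dask dataframe and load it onto the cluster.
dimension2 = dd.from_pandas(dimension, npartitions=1)
dimension2 = e.persist(dimension2)
# Copy that partition to every worker so each can join locally.
e.replicate(dimension2)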
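
Replication places a full copy of the table on each worker, so it helps to confirm that the table really is small first. The sketch below is a rough check that rebuilds the dimension table as in cell 4 and measures it with pandas' `memory_usage`. The exact figure depends on the pandas version and how strings are stored, so no number is quoted here.

```python
from itertools import product

import numpy as np
import pandas as pd

alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
products = [''.join(x) for x in product(alphabet, repeat=4)]
dimension = pd.DataFrame({'product': products,
                          'category': np.random.randint(0, 1000, size=len(products))})

# deep=True inspects the stored strings themselves rather than only
# counting the references to them.
size_mb = dimension.memory_usage(deep=True).sum() / 1e6
print(f'{len(dimension)} rows, about {size_mb:.1f} MB per copy')
```

Multiplying the per-copy size by the number of workers gives a rough estimate of the total cluster memory the replicated table will occupy.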

### Persist Data on Cluster

In [18]:
df = e.persist(df)

### Compute

In [19]:
joined = df.merge(dimension2, how='inner', on='product')
result = joined.groupby('category').rating.mean().nlargest(10)
# e.compute returns a future immediately; progress displays a progress bar for it.
future = e.compute(result)
progress(future)
