{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Join Dask.DataFrame against small Pandas DataFrame\n",
"\n",
"### Advanced techniques: active memory management"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create fake dataset of products and categories"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"456976"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from itertools import product\n",
"\n",
"alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'\n",
"products = [''.join(x) for x in product(alphabet, alphabet, alphabet, alphabet)]\n",
"len(products)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"['AAAA', 'AAAB', 'AAAC', 'AAAD', 'AAAE', 'AAAF', 'AAAG', 'AAAH']"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"products[:8]"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"['ZZZS', 'ZZZT', 'ZZZU', 'ZZZV', 'ZZZW', 'ZZZX', 'ZZZY', 'ZZZZ']"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"products[-8:]"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
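{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\">\n",
"      <th></th>\n",
"      <th>category</th>\n",
"      <th>product</th>\n",
"    </tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr>\n",
"      <th>0</th>\n",
"      <td>443</td>\n",
"      <td>AAAA</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>1</th>\n",
"      <td>594</td>\n",
"      <td>AAAB</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>2</th>\n",
"      <td>201</td>\n",
"      <td>AAAC</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>3</th>\n",
"      <td>809</td>\n",
"      <td>AAAD</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>4</th>\n",
"      <td>974</td>\n",
"      <td>AAAE</td>\n",
"    </tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>"
],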
"text/plain": [
" category product\n",
"0 443 AAAA\n",
"1 594 AAAB\n",
"2 201 AAAC\n",
"3 809 AAAD\n",
"4 974 AAAE"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"\n",
"dimension = pd.DataFrame({'product': products, \n",
" 'category': np.random.randint(0, 1000, size=len(products))})\n",
"dimension.head()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def fake_data(size):\n",
" import random\n",
" import numpy as np\n",
" return pd.DataFrame({'product': [''.join(random.sample(alphabet, 4)) for i in range(size)],\n",
" 'value': np.random.exponential(100, size=size).astype(int),\n",
" 'rating': np.random.normal(0, 1, size=size)})"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
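{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\">\n",
"      <th></th>\n",
"      <th>product</th>\n",
"      <th>rating</th>\n",
"      <th>value</th>\n",
"    </tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr>\n",
"      <th>0</th>\n",
"      <td>BHYI</td>\n",
"      <td>-2.189738</td>\n",
"      <td>13</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>1</th>\n",
"      <td>XFAS</td>\n",
"      <td>-0.478826</td>\n",
"      <td>72</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>2</th>\n",
"      <td>ILBA</td>\n",
"      <td>1.176059</td>\n",
"      <td>35</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>3</th>\n",
"      <td>VXYS</td>\n",
"      <td>0.872137</td>\n",
"      <td>51</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>4</th>\n",
"      <td>GVEH</td>\n",
"      <td>0.897264</td>\n",
"      <td>22</td>\n",
"    </tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>"
],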
"text/plain": [
" product rating value\n",
"0 BHYI -2.189738 13\n",
"1 XFAS -0.478826 72\n",
"2 ILBA 1.176059 35\n",
"3 VXYS 0.872137 51\n",
"4 GVEH 0.897264 22"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"example = fake_data(5)\n",
"example"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
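{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\">\n",
"      <th></th>\n",
"      <th>product</th>\n",
"      <th>rating</th>\n",
"      <th>value</th>\n",
"      <th>category</th>\n",
"    </tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr>\n",
"      <th>0</th>\n",
"      <td>BHYI</td>\n",
"      <td>-2.189738</td>\n",
"      <td>13</td>\n",
"      <td>684</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>1</th>\n",
"      <td>XFAS</td>\n",
"      <td>-0.478826</td>\n",
"      <td>72</td>\n",
"      <td>832</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>2</th>\n",
"      <td>ILBA</td>\n",
"      <td>1.176059</td>\n",
"      <td>35</td>\n",
"      <td>539</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>3</th>\n",
"      <td>VXYS</td>\n",
"      <td>0.872137</td>\n",
"      <td>51</td>\n",
"      <td>375</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>4</th>\n",
"      <td>GVEH</td>\n",
"      <td>0.897264</td>\n",
"      <td>22</td>\n",
"      <td>749</td>\n",
"    </tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>"
],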
"text/plain": [
" product rating value category\n",
"0 BHYI -2.189738 13 684\n",
"1 XFAS -0.478826 72 832\n",
"2 ILBA 1.176059 35 539\n",
"3 VXYS 0.872137 51 375\n",
"4 GVEH 0.897264 22 749"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"example.merge(dimension, on='product', how='inner')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parallelize with Dask.DataFrame\n",
"\n",
"We now repeat the same join in parallel with `dask.dataframe`.\n",
"\n",
"We start on a single machine and repeat the experiment on a cluster later."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Make a fake fact table\n",
"\n",
"Normally we would load our data from some external source, as in the following:\n",
"\n",
"```python\n",
"df = dask.dataframe.read_csv('hdfs://path/to/my/data/*.csv')\n",
"```\n",
"\n",
"Instead, we call the `fake_data` function defined above once per partition to produce each \"file\". You can skip this step if you already have data elsewhere."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import dask.dataframe as dd\n",
"from dask import delayed\n",
"\n",
"partitions = []\n",
"for i in range(10):\n",
" partitions.append(delayed(fake_data)(10000))\n",
" \n",
"example = fake_data(1)\n",
"\n",
"df = dd.from_delayed(partitions, example)"
]
},
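{
"cell_type": "markdown",
"metadata": {},
"source": [
"The second argument to `dd.from_delayed` is the `meta` object: a small pandas DataFrame that tells dask the column names and dtypes without running any task. The sketch below illustrates this with a hypothetical `make_partition` helper standing in for `fake_data`; it is a minimal illustration under those assumptions, not the only way to supply metadata.\n",
"\n",
"```python\n",
"import numpy as np\n",
"import pandas as pd\n",
"import dask.dataframe as dd\n",
"from dask import delayed\n",
"\n",
"def make_partition(n):\n",
"    # Hypothetical stand-in for fake_data: one pandas DataFrame per 'file'\n",
"    return pd.DataFrame({'product': ['AAAA'] * n,\n",
"                         'value': np.arange(n)})\n",
"\n",
"# Nothing runs yet; each element is a lazy task\n",
"parts = [delayed(make_partition)(3) for _ in range(4)]\n",
"\n",
"# meta only needs the right column names and dtypes, so an empty frame is enough\n",
"meta = make_partition(0)\n",
"ddf = dd.from_delayed(parts, meta=meta)\n",
"\n",
"print(ddf.npartitions)  # one partition per delayed object\n",
"print(ddf.dtypes)       # read from meta, no partition is computed\n",
"```\n",
"\n",
"Passing `meta` explicitly avoids computing a partition just to discover the schema."
]
},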
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"dd.DataFrame"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
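{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\">\n",
"      <th></th>\n",
"      <th>product</th>\n",
"      <th>rating</th>\n",
"      <th>value</th>\n",
"    </tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr>\n",
"      <th>0</th>\n",
"      <td>JHWM</td>\n",
"      <td>-0.342292</td>\n",
"      <td>198</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>1</th>\n",
"      <td>DHCV</td>\n",
"      <td>-1.399805</td>\n",
"      <td>59</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>2</th>\n",
"      <td>QHYL</td>\n",
"      <td>0.176286</td>\n",
"      <td>112</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>3</th>\n",
"      <td>KAHU</td>\n",
"      <td>0.476704</td>\n",
"      <td>35</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>4</th>\n",
"      <td>ULGB</td>\n",
"      <td>0.635553</td>\n",
"      <td>13</td>\n",
"    </tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>"
],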
"text/plain": [
" product rating value\n",
"0 JHWM -0.342292 198\n",
"1 DHCV -1.399805 59\n",
"2 QHYL 0.176286 112\n",
"3 KAHU 0.476704 35\n",
"4 ULGB 0.635553 13"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.head()"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [
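{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\">\n",
"      <th></th>\n",
"      <th>product</th>\n",
"      <th>rating</th>\n",
"      <th>value</th>\n",
"      <th>category</th>\n",
"    </tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr>\n",
"      <th>0</th>\n",
"      <td>UFBO</td>\n",
"      <td>-1.114049</td>\n",
"      <td>28</td>\n",
"      <td>168</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>1</th>\n",
"      <td>OANM</td>\n",
"      <td>-1.240420</td>\n",
"      <td>75</td>\n",
"      <td>521</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>2</th>\n",
"      <td>KAZF</td>\n",
"      <td>1.289296</td>\n",
"      <td>103</td>\n",
"      <td>744</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>3</th>\n",
"      <td>BWYU</td>\n",
"      <td>-0.755966</td>\n",
"      <td>152</td>\n",
"      <td>421</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>4</th>\n",
"      <td>VHFT</td>\n",
"      <td>0.277356</td>\n",
"      <td>48</td>\n",
"      <td>400</td>\n",
"    </tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>"
],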
"text/plain": [
" product rating value category\n",
"0 UFBO -1.114049 28 168\n",
"1 OANM -1.240420 75 521\n",
"2 KAZF 1.289296 103 744\n",
"3 BWYU -0.755966 152 421\n",
"4 VHFT 0.277356 48 400"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"joined = df.merge(dimension, how='inner', on='product')\n",
"joined.head()"
]
},
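{
"cell_type": "markdown",
"metadata": {},
"source": [
"Merging a dask dataframe with an in-memory pandas DataFrame can be thought of as merging every partition against the small table independently, with no shuffle of the large table. The sketch below spells that idea out with `map_partitions` on toy data (the `small` and `big` tables are hypothetical) and compares it with the built-in `merge`; it is meant as an illustration of the idea, not a description of dask's internals.\n",
"\n",
"```python\n",
"import pandas as pd\n",
"import dask.dataframe as dd\n",
"\n",
"# Hypothetical toy tables standing in for the dimension and fact tables\n",
"small = pd.DataFrame({'product': ['AAAA', 'AAAB'], 'category': [1, 2]})\n",
"big = dd.from_pandas(\n",
"    pd.DataFrame({'product': ['AAAA', 'AAAB', 'AAAA', 'AAAB'],\n",
"                  'rating': [0.1, 0.2, 0.3, 0.4]}),\n",
"    npartitions=2)\n",
"\n",
"# Each partition is an ordinary pandas DataFrame, so it can be merged\n",
"# against the small table without looking at any other partition\n",
"by_hand = big.map_partitions(\n",
"    lambda part: part.merge(small, on='product', how='inner'))\n",
"built_in = big.merge(small, on='product', how='inner')\n",
"\n",
"# Sort before printing because row order may differ between the two\n",
"print(by_hand.compute().sort_values('rating').reset_index(drop=True))\n",
"print(built_in.compute().sort_values('rating').reset_index(drop=True))\n",
"```\n",
"\n",
"Both results should contain the same rows, which is why joining against a small in-memory table scales well with the number of partitions."
]
},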
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Analyze the joined table\n",
"\n",
"We find the ten categories with the highest mean rating."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"category\n",
"98 0.295137\n",
"706 0.273313\n",
"111 0.265551\n",
"798 0.263368\n",
"110 0.255219\n",
"550 0.250642\n",
"893 0.249949\n",
"647 0.244770\n",
"66 0.243825\n",
"816 0.237187\n",
"Name: rating, dtype: float64"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"joined.groupby('category').rating.mean().nlargest(10).compute()"
]
},
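{
"cell_type": "markdown",
"metadata": {},
"source": [
"A grouped mean over many partitions cannot be obtained by averaging per-partition means, because partitions may hold different numbers of rows per category. One common approach, sketched here in plain pandas on two hypothetical partitions, is to reduce each partition to per-category sums and counts and then combine those small tables. This is a simplified illustration and may differ from what dask does internally.\n",
"\n",
"```python\n",
"import pandas as pd\n",
"\n",
"# Two hypothetical partitions of the joined table\n",
"part1 = pd.DataFrame({'category': [1, 1, 2], 'rating': [0.5, 1.5, 2.0]})\n",
"part2 = pd.DataFrame({'category': [1, 2, 2], 'rating': [1.0, 0.0, 4.0]})\n",
"\n",
"# Step 1: per-partition sums and counts, one row per category\n",
"partials = [p.groupby('category').rating.agg(['sum', 'count'])\n",
"            for p in (part1, part2)]\n",
"\n",
"# Step 2: add the partial tables together, then divide\n",
"total = pd.concat(partials).groupby(level=0).sum()\n",
"mean = total['sum'] / total['count']\n",
"\n",
"# Same answer as a single groupby over all rows\n",
"expected = pd.concat([part1, part2]).groupby('category').rating.mean()\n",
"print(mean)\n",
"print(expected)\n",
"```\n",
"\n",
"Only the small per-category tables need to be combined, so very little data moves between partitions."
]
},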
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Distributed computing\n",
"\n",
"We connect to a cluster of workers and repeat the experiment on a larger dataset: 1,000 partitions of 10,000 rows each."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
""
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Executor was renamed to Client in later releases of dask.distributed\n",
"from dask.distributed import Client, progress\n",
"e = Client('localhost:8786')\n",
"e"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"dfs = [delayed(fake_data)(10000) for i in range(1000)]\n",
"example = fake_data(1)\n",
"df = dd.from_delayed(dfs, example)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"dd.DataFrame"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Scatter the dimension table"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": false
},
"outputs": [
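{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\">\n",
"      <th></th>\n",
"      <th>category</th>\n",
"      <th>product</th>\n",
"    </tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr>\n",
"      <th>0</th>\n",
"      <td>443</td>\n",
"      <td>AAAA</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>1</th>\n",
"      <td>594</td>\n",
"      <td>AAAB</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>2</th>\n",
"      <td>201</td>\n",
"      <td>AAAC</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>3</th>\n",
"      <td>809</td>\n",
"      <td>AAAD</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>4</th>\n",
"      <td>974</td>\n",
"      <td>AAAE</td>\n",
"    </tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>"
],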
"text/plain": [
" category product\n",
"0 443 AAAA\n",
"1 594 AAAB\n",
"2 201 AAAC\n",
"3 809 AAAD\n",
"4 974 AAAE"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"dimension.head()"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"collapsed": false
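},
"outputs": [],
"source": [
"# Wrap the pandas table as a single-partition dask dataframe\n",
"dimension2 = dd.from_pandas(dimension, npartitions=1)\n",
"# Load it into cluster memory, then copy it to every worker so that each\n",
"# partition of the fact table can be joined without moving data\n",
"dimension2 = e.persist(dimension2)\n",
"e.replicate(dimension2)"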
]
},
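{
"cell_type": "markdown",
"metadata": {},
"source": [
"Another way to place a small table on every worker is to scatter it with `broadcast=True`. The sketch below assumes an in-process local cluster created with `Client(processes=False)` as a stand-in for the real scheduler, and a hypothetical two-row `small` table; it is an alternative to the approach used in this notebook, not the approach itself.\n",
"\n",
"```python\n",
"import pandas as pd\n",
"from dask.distributed import Client\n",
"\n",
"# Assumption: a local in-process cluster stands in for the real scheduler\n",
"client = Client(processes=False)\n",
"\n",
"small = pd.DataFrame({'product': ['AAAA', 'AAAB'], 'category': [1, 2]})\n",
"\n",
"# broadcast=True asks for a copy of the data on every worker\n",
"small_future = client.scatter(small, broadcast=True)\n",
"\n",
"# who_has maps each key to the workers that currently hold it\n",
"print(client.who_has([small_future]))\n",
"\n",
"client.close()\n",
"```\n",
"\n",
"The returned future can be passed to tasks submitted with `client.submit`, which then read the local copy instead of fetching the table over the network."
]
},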
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Persist Data on Cluster"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df = e.persist(df)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Compute"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"joined = df.merge(dimension2, how='inner', on='product')\n",
"result = joined.groupby('category').rating.mean().nlargest(10)\n",
"# compute returns a future immediately; progress displays a progress bar\n",
"future = e.compute(result)\n",
"progress(future)"
]
},
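{
"cell_type": "markdown",
"metadata": {},
"source": [
"Submitting a collection to the cluster returns a future rather than the result itself, so one more call is needed to bring the answer back to the local process. The sketch below shows this with `future.result()` on a toy dataframe, again assuming a local in-process cluster as a stand-in for the real one.\n",
"\n",
"```python\n",
"import pandas as pd\n",
"import dask.dataframe as dd\n",
"from dask.distributed import Client\n",
"\n",
"# Assumption: a local in-process cluster stands in for the real scheduler\n",
"client = Client(processes=False)\n",
"\n",
"# Hypothetical toy data in place of the joined table\n",
"ddf = dd.from_pandas(\n",
"    pd.DataFrame({'category': [1, 1, 2], 'rating': [0.5, 1.5, 2.0]}),\n",
"    npartitions=2)\n",
"\n",
"# compute on the client returns a future without blocking\n",
"future = client.compute(ddf.groupby('category').rating.mean())\n",
"\n",
"# result blocks until the work finishes and returns a pandas Series\n",
"top = future.result()\n",
"print(top)\n",
"\n",
"client.close()\n",
"```\n",
"\n",
"Because the final result is only a handful of rows, gathering it locally is cheap even when the input data is large."
]
},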
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"dimension.head()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.1"
}
},
"nbformat": 4,
"nbformat_minor": 0
}