{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Join Dask.DataFrame against small Pandas DataFrame\n", "\n", "### Advanced techniques: active memory management" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create fake dataset of products and categories" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "456976" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from itertools import product\n", "\n", "alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'\n", "products = [''.join(x) for x in product(alphabet, alphabet, alphabet, alphabet)]\n", "len(products)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "['AAAA', 'AAAB', 'AAAC', 'AAAD', 'AAAE', 'AAAF', 'AAAG', 'AAAH']" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "products[:8]" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "['ZZZS', 'ZZZT', 'ZZZU', 'ZZZV', 'ZZZW', 'ZZZX', 'ZZZY', 'ZZZZ']" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "products[-8:]" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
categoryproduct
0443AAAA
1594AAAB
2201AAAC
3809AAAD
4974AAAE
\n", "
" ], "text/plain": [ " category product\n", "0 443 AAAA\n", "1 594 AAAB\n", "2 201 AAAC\n", "3 809 AAAD\n", "4 974 AAAE" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import pandas as pd\n", "import numpy as np\n", "\n", "dimension = pd.DataFrame({'product': products, \n", " 'category': np.random.randint(0, 1000, size=len(products))})\n", "dimension.head()" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def fake_data(size):\n", " import random\n", " import numpy as np\n", " return pd.DataFrame({'product': [''.join(random.sample(alphabet, 4)) for i in range(size)],\n", " 'value': np.random.exponential(100, size=size).astype(int),\n", " 'rating': np.random.normal(0, 1, size=size)})" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
productratingvalue
0BHYI-2.18973813
1XFAS-0.47882672
2ILBA1.17605935
3VXYS0.87213751
4GVEH0.89726422
\n", "
" ], "text/plain": [ " product rating value\n", "0 BHYI -2.189738 13\n", "1 XFAS -0.478826 72\n", "2 ILBA 1.176059 35\n", "3 VXYS 0.872137 51\n", "4 GVEH 0.897264 22" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "example = fake_data(5)\n", "example" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
productratingvaluecategory
0BHYI-2.18973813684
1XFAS-0.47882672832
2ILBA1.17605935539
3VXYS0.87213751375
4GVEH0.89726422749
\n", "
" ], "text/plain": [ " product rating value category\n", "0 BHYI -2.189738 13 684\n", "1 XFAS -0.478826 72 832\n", "2 ILBA 1.176059 35 539\n", "3 VXYS 0.872137 51 375\n", "4 GVEH 0.897264 22 749" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "example.merge(dimension, on='product', how='inner')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parallelize with Dask.Dataframe\n", "\n", "We do the same thing but now in parallel with dask.dataframe.\n", "\n", "We start on a single machine. We'll repeat on a cluster later." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Make a fake fact table\n", "\n", "Normally we would load our data in from some external source as in the following:\n", "\n", "```python\n", "df = dask.dataframe.read_csv('hdfs://path/to/my/data/*.csv')\n", "```\n", "\n", "Instead we create a `fake_data` function to produce each \"file\". You can ignore this if you have data elsewhere." ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import dask.dataframe as dd\n", "from dask import delayed\n", "\n", "partitions = []\n", "for i in range(10):\n", " partitions.append(delayed(fake_data)(10000))\n", " \n", "example = fake_data(1)\n", "\n", "df = dd.from_delayed(partitions, example)" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "dd.DataFrame" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
productratingvalue
0JHWM-0.342292198
1DHCV-1.39980559
2QHYL0.176286112
3KAHU0.47670435
4ULGB0.63555313
\n", "
" ], "text/plain": [ " product rating value\n", "0 JHWM -0.342292 198\n", "1 DHCV -1.399805 59\n", "2 QHYL 0.176286 112\n", "3 KAHU 0.476704 35\n", "4 ULGB 0.635553 13" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.head()" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
productratingvaluecategory
0UFBO-1.11404928168
1OANM-1.24042075521
2KAZF1.289296103744
3BWYU-0.755966152421
4VHFT0.27735648400
\n", "
" ], "text/plain": [ " product rating value category\n", "0 UFBO -1.114049 28 168\n", "1 OANM -1.240420 75 521\n", "2 KAZF 1.289296 103 744\n", "3 BWYU -0.755966 152 421\n", "4 VHFT 0.277356 48 400" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "joined = df.merge(dimension, how='inner', on='product')\n", "joined.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Analyze the joined table\n", "\n", "We'll find the top rated categories" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "category\n", "98 0.295137\n", "706 0.273313\n", "111 0.265551\n", "798 0.263368\n", "110 0.255219\n", "550 0.250642\n", "893 0.249949\n", "647 0.244770\n", "66 0.243825\n", "816 0.237187\n", "Name: rating, dtype: float64" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "joined.groupby('category').rating.mean().nlargest(10).compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Distributed computing\n", "\n", "We connect to a cluster of workers, and repeat the experiment on a larger cluster." 
] }, { "cell_type": "code", "execution_count": 13, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask.distributed import Executor, progress\n", "e = Executor('localhost:8786')\n", "e" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "collapsed": true }, "outputs": [], "source": [ "dfs = [delayed(fake_data)(10000) for i in range(1000)]\n", "example = fake_data(1)\n", "df = dd.from_delayed(dfs, example)" ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "dd.DataFrame" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Scatter the dimension table" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
categoryproduct
0443AAAA
1594AAAB
2201AAAC
3809AAAD
4974AAAE
\n", "
" ], "text/plain": [ " category product\n", "0 443 AAAA\n", "1 594 AAAB\n", "2 201 AAAC\n", "3 809 AAAD\n", "4 974 AAAE" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dimension.head()" ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "collapsed": false }, "outputs": [], "source": [ "dimension2 = dd.from_pandas(dimension, npartitions=1)\n", "dimension2 = e.persist(dimension2)\n", "e.replicate(dimension2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Persist Data on Cluster" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "collapsed": true }, "outputs": [], "source": [ "df = e.persist(df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Compute" ] }, { "cell_type": "code", "execution_count": 19, "metadata": { "collapsed": true }, "outputs": [], "source": [ "joined = df.merge(dimension2, how='inner', on='product')\n", "result = joined.groupby('category').rating.mean().nlargest(10)\n", "future = e.compute(result)\n", "progress(future)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "dimension.head()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.5.1" } }, "nbformat": 4, "nbformat_minor": 0 }