{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Dask DataFrame and cuDF on NYC Taxi CSV data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Start Dask Cluster on an Eight-GPU DGX Machine" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "
\n", "

Client

\n", "\n", "
\n", "

Cluster

\n", "
    \n", "
  • Workers: 8
  • \n", "
  • Cores: 8
  • \n", "
  • Memory: 540.96 GB
  • \n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask_cuda import LocalCUDACluster\n", "cluster = LocalCUDACluster()\n", "\n", "from dask.distributed import Client\n", "client = Client(cluster)\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Previously we ran this to shard the files more finely for cudf.read_csv\n", "\n", "```python\n", "import dask.dataframe as dd\n", "pdf = dd.read_csv('data/nyc/yellow_tripdata_2017-*.csv',\n", " parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])\n", "\n", "pdf.repartition(npartitions=100).to_csv('data/nyc/many/*.csv', index=False)\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Read CSV files into Dask-GPU-DataFrame\n", "\n", "Note: unlike the sharding step above, no `parse_dates` is passed here. In the preview below, the two datetime columns display as `1969-12-31 23:59:59` rather than real timestamps, so treat them as unparsed. The timings later in this notebook only aggregate `passenger_count`. The CPU read further down also skips date parsing, so the GPU/CPU comparison stays like-for-like." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
VendorIDtpep_pickup_datetimetpep_dropoff_datetimepassenger_counttrip_distanceRatecodeIDstore_and_fwd_flagPULocationIDDOLocationIDpayment_typefare_amountextramta_taxtip_amounttolls_amountimprovement_surchargetotal_amount
011969-12-31 23:59:591969-12-31 23:59:5913.301-2049400382263161112.50.00.52.000.00.315.30
111969-12-31 23:59:591969-12-31 23:59:5910.901-204940038218623415.00.00.51.450.00.37.25
211969-12-31 23:59:591969-12-31 23:59:5911.101-204940038216416115.50.00.51.000.00.37.30
311969-12-31 23:59:591969-12-31 23:59:5911.101-20494003822367516.00.00.51.700.00.38.50
421969-12-31 23:59:591969-12-31 23:59:5910.022-2049400382249234252.00.00.50.000.00.352.80
\n", "
" ], "text/plain": [ " VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count \\\n", "0 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n", "1 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n", "2 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n", "3 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n", "4 2 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n", "\n", " trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID \\\n", "0 3.30 1 -2049400382 263 161 \n", "1 0.90 1 -2049400382 186 234 \n", "2 1.10 1 -2049400382 164 161 \n", "3 1.10 1 -2049400382 236 75 \n", "4 0.02 2 -2049400382 249 234 \n", "\n", " payment_type fare_amount extra mta_tax tip_amount tolls_amount \\\n", "0 1 12.5 0.0 0.5 2.00 0.0 \n", "1 1 5.0 0.0 0.5 1.45 0.0 \n", "2 1 5.5 0.0 0.5 1.00 0.0 \n", "3 1 6.0 0.0 0.5 1.70 0.0 \n", "4 2 52.0 0.0 0.5 0.00 0.0 \n", "\n", " improvement_surcharge total_amount \n", "0 0.3 15.30 \n", "1 0.3 7.25 \n", "2 0.3 7.30 \n", "3 0.3 8.50 \n", "4 0.3 52.80 " ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import dask_cudf\n", "\n", "gdf = dask_cudf.read_csv('data/nyc/many/*.csv')\n", "gdf.head().to_pandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Time a full-pass computation\n", "\n", "Most of the time here is spent reading data from disk and parsing it." 
] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 1.16 s, sys: 100 ms, total: 1.26 s\n", "Wall time: 4.68 s\n" ] }, { "data": { "text/plain": [ "184464740" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%time gdf.passenger_count.sum().compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Single GPU" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 7.5 s, sys: 4.66 s, total: 12.2 s\n", "Wall time: 10.9 s\n" ] }, { "data": { "text/plain": [ "184464740" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%time gdf.passenger_count.sum().compute(scheduler='single-threaded')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Single CPU" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "pandas.core.frame.DataFrame" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import dask.dataframe as dd\n", "\n", "df = dd.read_csv('data/nyc/many/*.csv')\n", "type(df.head())" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 34min 9s, sys: 21.5 s, total: 34min 30s\n", "Wall time: 3min 14s\n" ] }, { "data": { "text/plain": [ "184464740" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%time df.passenger_count.sum().compute(scheduler='single-threaded')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Eight CPUs, one per process\n", "\n", "This reuses the `LocalCUDACluster` client started at the top of the notebook. Its summary shows 8 workers and 8 cores in total, i.e. one thread per worker process. Because `df` is a pandas-backed Dask DataFrame, those workers process its partitions on their CPUs rather than on the GPUs." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 10.8 s, sys: 988 ms, total: 
11.8 s\n", "Wall time: 57.5 s\n" ] }, { "data": { "text/plain": [ "184464740" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%time df.passenger_count.sum().compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Eighty CPUs with a balance of threads and processes" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "distributed.comm.tcp - WARNING - Closing dangling stream in \n" ] } ], "source": [ "client.close()\n", "cluster.close()" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "
\n", "

Client

\n", "\n", "
\n", "

Cluster

\n", "
    \n", "
  • Workers: 10
  • \n", "
  • Cores: 80
  • \n", "
  • Memory: 540.96 GB
  • \n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "client = Client(n_workers=10, threads_per_worker=8)\n", "client" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 7.55 s, sys: 692 ms, total: 8.24 s\n", "Wall time: 34.9 s\n" ] }, { "data": { "text/plain": [ "184464740" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%time df.passenger_count.sum().compute()" ] } ], "metadata": { "kernelspec": { "display_name": "Python [conda env:cudf_dev]", "language": "python", "name": "conda-env-cudf_dev-py" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.5.5" } }, "nbformat": 4, "nbformat_minor": 2 }