{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dask DataFrame and cuDF on NYC Taxi CSV data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Start Dask Cluster on an Eight-GPU DGX Machine"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"
\n",
"\n",
"\n",
"Client\n",
"\n",
" | \n",
"\n",
"Cluster\n",
"\n",
" - Workers: 8
\n",
" - Cores: 8
\n",
" - Memory: 540.96 GB
\n",
" \n",
" | \n",
"
\n",
"
"
],
"text/plain": [
""
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from dask_cuda import LocalCUDACluster\n",
"cluster = LocalCUDACluster()\n",
"\n",
"from dask.distributed import Client\n",
"client = Client(cluster)\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Previously, we ran the following to split the files into smaller shards for `cudf.read_csv`\n",
"\n",
"```python\n",
"import dask.dataframe as dd\n",
"pdf = dd.read_csv('data/nyc/yellow_tripdata_2017-*.csv',\n",
" parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])\n",
"\n",
"pdf.repartition(npartitions=100).to_csv('data/nyc/many/*.csv', index=False)\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Read the CSV files into a `dask_cudf` DataFrame"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" VendorID | \n",
" tpep_pickup_datetime | \n",
" tpep_dropoff_datetime | \n",
" passenger_count | \n",
" trip_distance | \n",
" RatecodeID | \n",
" store_and_fwd_flag | \n",
" PULocationID | \n",
" DOLocationID | \n",
" payment_type | \n",
" fare_amount | \n",
" extra | \n",
" mta_tax | \n",
" tip_amount | \n",
" tolls_amount | \n",
" improvement_surcharge | \n",
" total_amount | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" 1 | \n",
" 1969-12-31 23:59:59 | \n",
" 1969-12-31 23:59:59 | \n",
" 1 | \n",
" 3.30 | \n",
" 1 | \n",
" -2049400382 | \n",
" 263 | \n",
" 161 | \n",
" 1 | \n",
" 12.5 | \n",
" 0.0 | \n",
" 0.5 | \n",
" 2.00 | \n",
" 0.0 | \n",
" 0.3 | \n",
" 15.30 | \n",
"
\n",
" \n",
" | 1 | \n",
" 1 | \n",
" 1969-12-31 23:59:59 | \n",
" 1969-12-31 23:59:59 | \n",
" 1 | \n",
" 0.90 | \n",
" 1 | \n",
" -2049400382 | \n",
" 186 | \n",
" 234 | \n",
" 1 | \n",
" 5.0 | \n",
" 0.0 | \n",
" 0.5 | \n",
" 1.45 | \n",
" 0.0 | \n",
" 0.3 | \n",
" 7.25 | \n",
"
\n",
" \n",
" | 2 | \n",
" 1 | \n",
" 1969-12-31 23:59:59 | \n",
" 1969-12-31 23:59:59 | \n",
" 1 | \n",
" 1.10 | \n",
" 1 | \n",
" -2049400382 | \n",
" 164 | \n",
" 161 | \n",
" 1 | \n",
" 5.5 | \n",
" 0.0 | \n",
" 0.5 | \n",
" 1.00 | \n",
" 0.0 | \n",
" 0.3 | \n",
" 7.30 | \n",
"
\n",
" \n",
" | 3 | \n",
" 1 | \n",
" 1969-12-31 23:59:59 | \n",
" 1969-12-31 23:59:59 | \n",
" 1 | \n",
" 1.10 | \n",
" 1 | \n",
" -2049400382 | \n",
" 236 | \n",
" 75 | \n",
" 1 | \n",
" 6.0 | \n",
" 0.0 | \n",
" 0.5 | \n",
" 1.70 | \n",
" 0.0 | \n",
" 0.3 | \n",
" 8.50 | \n",
"
\n",
" \n",
" | 4 | \n",
" 2 | \n",
" 1969-12-31 23:59:59 | \n",
" 1969-12-31 23:59:59 | \n",
" 1 | \n",
" 0.02 | \n",
" 2 | \n",
" -2049400382 | \n",
" 249 | \n",
" 234 | \n",
" 2 | \n",
" 52.0 | \n",
" 0.0 | \n",
" 0.5 | \n",
" 0.00 | \n",
" 0.0 | \n",
" 0.3 | \n",
" 52.80 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count \\\n",
"0 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
"1 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
"2 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
"3 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
"4 2 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
"\n",
" trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID \\\n",
"0 3.30 1 -2049400382 263 161 \n",
"1 0.90 1 -2049400382 186 234 \n",
"2 1.10 1 -2049400382 164 161 \n",
"3 1.10 1 -2049400382 236 75 \n",
"4 0.02 2 -2049400382 249 234 \n",
"\n",
" payment_type fare_amount extra mta_tax tip_amount tolls_amount \\\n",
"0 1 12.5 0.0 0.5 2.00 0.0 \n",
"1 1 5.0 0.0 0.5 1.45 0.0 \n",
"2 1 5.5 0.0 0.5 1.00 0.0 \n",
"3 1 6.0 0.0 0.5 1.70 0.0 \n",
"4 2 52.0 0.0 0.5 0.00 0.0 \n",
"\n",
" improvement_surcharge total_amount \n",
"0 0.3 15.30 \n",
"1 0.3 7.25 \n",
"2 0.3 7.30 \n",
"3 0.3 8.50 \n",
"4 0.3 52.80 "
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import dask_cudf\n",
"\n",
"gdf = dask_cudf.read_csv('data/nyc/many/*.csv')\n",
"gdf.head().to_pandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Time a full-pass computation on all eight GPUs\n",
"\n",
"Most of the time here is spent reading data from disk and parsing it."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.16 s, sys: 100 ms, total: 1.26 s\n",
"Wall time: 4.68 s\n"
]
},
{
"data": {
"text/plain": [
"184464740"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time gdf.passenger_count.sum().compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Single GPU"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 7.5 s, sys: 4.66 s, total: 12.2 s\n",
"Wall time: 10.9 s\n"
]
},
{
"data": {
"text/plain": [
"184464740"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time gdf.passenger_count.sum().compute(scheduler='single-threaded')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Single CPU"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"pandas.core.frame.DataFrame"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import dask.dataframe as dd\n",
"\n",
"df = dd.read_csv('data/nyc/many/*.csv')\n",
"type(df.head())"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 34min 9s, sys: 21.5 s, total: 34min 30s\n",
"Wall time: 3min 14s\n"
]
},
{
"data": {
"text/plain": [
"184464740"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time df.passenger_count.sum().compute(scheduler='single-threaded')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Eight CPUs, one per process"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 10.8 s, sys: 988 ms, total: 11.8 s\n",
"Wall time: 57.5 s\n"
]
},
{
"data": {
"text/plain": [
"184464740"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time df.passenger_count.sum().compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Eighty CPU cores: ten worker processes with eight threads each"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"distributed.comm.tcp - WARNING - Closing dangling stream in \n"
]
}
],
"source": [
"client.close()\n",
"cluster.close()"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"\n",
"Client\n",
"\n",
" | \n",
"\n",
"Cluster\n",
"\n",
" - Workers: 10
\n",
" - Cores: 80
\n",
" - Memory: 540.96 GB
\n",
" \n",
" | \n",
"
\n",
"
"
],
"text/plain": [
""
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"client = Client(n_workers=10, threads_per_worker=8)\n",
"client"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 7.55 s, sys: 692 ms, total: 8.24 s\n",
"Wall time: 34.9 s\n"
]
},
{
"data": {
"text/plain": [
"184464740"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%time df.passenger_count.sum().compute()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python [conda env:cudf_dev]",
"language": "python",
"name": "conda-env-cudf_dev-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}