Skip to content

Instantly share code, notes, and snippets.

@Wanderer2014
Forked from mrocklin/nyc-taxi.ipynb
Created February 7, 2021 11:52
Show Gist options
  • Save Wanderer2014/4c967575c82b7f13c73c0a4afb964ee1 to your computer and use it in GitHub Desktop.
Save Wanderer2014/4c967575c82b7f13c73c0a4afb964ee1 to your computer and use it in GitHub Desktop.

Revisions

  1. @mrocklin mrocklin revised this gist Jan 13, 2019. 1 changed file with 224 additions and 1 deletion.
    225 changes: 224 additions & 1 deletion nyc-taxi.ipynb
    Original file line number Diff line number Diff line change
    @@ -310,6 +310,229 @@
    "source": [
    "%time gdf.passenger_count.sum().compute()"
    ]
    },
    {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
    "### Single GPU"
    ]
    },
    {
    "cell_type": "code",
    "execution_count": 6,
    "metadata": {},
    "outputs": [
    {
    "name": "stdout",
    "output_type": "stream",
    "text": [
    "CPU times: user 7.5 s, sys: 4.66 s, total: 12.2 s\n",
    "Wall time: 10.9 s\n"
    ]
    },
    {
    "data": {
    "text/plain": [
    "184464740"
    ]
    },
    "execution_count": 6,
    "metadata": {},
    "output_type": "execute_result"
    }
    ],
    "source": [
    "%time gdf.passenger_count.sum().compute(scheduler='single-threaded')"
    ]
    },
    {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
    "### Single CPU"
    ]
    },
    {
    "cell_type": "code",
    "execution_count": 7,
    "metadata": {},
    "outputs": [
    {
    "data": {
    "text/plain": [
    "pandas.core.frame.DataFrame"
    ]
    },
    "execution_count": 7,
    "metadata": {},
    "output_type": "execute_result"
    }
    ],
    "source": [
    "import dask.dataframe as dd\n",
    "\n",
    "df = dd.read_csv('data/nyc/many/*.csv')\n",
    "type(df.head())"
    ]
    },
    {
    "cell_type": "code",
    "execution_count": 8,
    "metadata": {},
    "outputs": [
    {
    "name": "stdout",
    "output_type": "stream",
    "text": [
    "CPU times: user 34min 9s, sys: 21.5 s, total: 34min 30s\n",
    "Wall time: 3min 14s\n"
    ]
    },
    {
    "data": {
    "text/plain": [
    "184464740"
    ]
    },
    "execution_count": 8,
    "metadata": {},
    "output_type": "execute_result"
    }
    ],
    "source": [
    "%time df.passenger_count.sum().compute(scheduler='single-threaded')"
    ]
    },
    {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
    "### Eight CPUs, one per process"
    ]
    },
    {
    "cell_type": "code",
    "execution_count": 9,
    "metadata": {},
    "outputs": [
    {
    "name": "stdout",
    "output_type": "stream",
    "text": [
    "CPU times: user 10.8 s, sys: 988 ms, total: 11.8 s\n",
    "Wall time: 57.5 s\n"
    ]
    },
    {
    "data": {
    "text/plain": [
    "184464740"
    ]
    },
    "execution_count": 9,
    "metadata": {},
    "output_type": "execute_result"
    }
    ],
    "source": [
    "%time df.passenger_count.sum().compute()"
    ]
    },
    {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
    "### Eighty CPUs with a balance of threads and processes"
    ]
    },
    {
    "cell_type": "code",
    "execution_count": 10,
    "metadata": {},
    "outputs": [
    {
    "name": "stderr",
    "output_type": "stream",
    "text": [
    "distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:49958 remote=tcp://127.0.0.1:33058>\n"
    ]
    }
    ],
    "source": [
    "client.close()\n",
    "cluster.close()"
    ]
    },
    {
    "cell_type": "code",
    "execution_count": 11,
    "metadata": {},
    "outputs": [
    {
    "data": {
    "text/html": [
    "<table style=\"border: 2px solid white;\">\n",
    "<tr>\n",
    "<td style=\"vertical-align: top; border: 0px solid white\">\n",
    "<h3>Client</h3>\n",
    "<ul>\n",
    " <li><b>Scheduler: </b>tcp://127.0.0.1:45873\n",
    " <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a>\n",
    "</ul>\n",
    "</td>\n",
    "<td style=\"vertical-align: top; border: 0px solid white\">\n",
    "<h3>Cluster</h3>\n",
    "<ul>\n",
    " <li><b>Workers: </b>10</li>\n",
    " <li><b>Cores: </b>80</li>\n",
    " <li><b>Memory: </b>540.96 GB</li>\n",
    "</ul>\n",
    "</td>\n",
    "</tr>\n",
    "</table>"
    ],
    "text/plain": [
    "<Client: scheduler='tcp://127.0.0.1:45873' processes=10 cores=80>"
    ]
    },
    "execution_count": 11,
    "metadata": {},
    "output_type": "execute_result"
    }
    ],
    "source": [
    "client = Client(n_workers=10, threads_per_worker=8)\n",
    "client"
    ]
    },
    {
    "cell_type": "code",
    "execution_count": 12,
    "metadata": {},
    "outputs": [
    {
    "name": "stdout",
    "output_type": "stream",
    "text": [
    "CPU times: user 7.55 s, sys: 692 ms, total: 8.24 s\n",
    "Wall time: 34.9 s\n"
    ]
    },
    {
    "data": {
    "text/plain": [
    "184464740"
    ]
    },
    "execution_count": 12,
    "metadata": {},
    "output_type": "execute_result"
    }
    ],
    "source": [
    "%time df.passenger_count.sum().compute()"
    ]
    }
    ],
    "metadata": {
    @@ -333,4 +556,4 @@
    },
    "nbformat": 4,
    "nbformat_minor": 2
    }
    }
  2. @mrocklin mrocklin created this gist Jan 13, 2019.
    336 changes: 336 additions & 0 deletions nyc-taxi.ipynb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,336 @@
    {
    "cells": [
    {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
    "# Dask DataFrame and cuDF on NYC Taxi CSV data"
    ]
    },
    {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
    "### Start Dask Cluster on an Eight-GPU DGX Machine"
    ]
    },
    {
    "cell_type": "code",
    "execution_count": 1,
    "metadata": {},
    "outputs": [
    {
    "data": {
    "text/html": [
    "<table style=\"border: 2px solid white;\">\n",
    "<tr>\n",
    "<td style=\"vertical-align: top; border: 0px solid white\">\n",
    "<h3>Client</h3>\n",
    "<ul>\n",
    " <li><b>Scheduler: </b>tcp://127.0.0.1:33058\n",
    " <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a>\n",
    "</ul>\n",
    "</td>\n",
    "<td style=\"vertical-align: top; border: 0px solid white\">\n",
    "<h3>Cluster</h3>\n",
    "<ul>\n",
    " <li><b>Workers: </b>8</li>\n",
    " <li><b>Cores: </b>8</li>\n",
    " <li><b>Memory: </b>540.96 GB</li>\n",
    "</ul>\n",
    "</td>\n",
    "</tr>\n",
    "</table>"
    ],
    "text/plain": [
    "<Client: scheduler='tcp://127.0.0.1:33058' processes=8 cores=8>"
    ]
    },
    "execution_count": 1,
    "metadata": {},
    "output_type": "execute_result"
    }
    ],
    "source": [
    "from dask_cuda import LocalCUDACluster\n",
    "cluster = LocalCUDACluster()\n",
    "\n",
    "from dask.distributed import Client\n",
    "client = Client(cluster)\n",
    "client"
    ]
    },
    {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
    "### Previously we ran this to shard the files more finely for cudf.read_csv\n",
    "\n",
    "```python\n",
    "import dask.dataframe as dd\n",
    "pdf = dd.read_csv('data/nyc/yellow_tripdata_2017-*.csv',\n",
    " parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])\n",
    "\n",
    "pdf.repartition(npartitions=100).to_csv('data/nyc/many/*.csv', index=False)\n",
    "```"
    ]
    },
    {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
    "### Read CSV files into Dask-GPU-DataFrame"
    ]
    },
    {
    "cell_type": "code",
    "execution_count": 2,
    "metadata": {},
    "outputs": [
    {
    "data": {
    "text/html": [
    "<div>\n",
    "<style>\n",
    " .dataframe thead tr:only-child th {\n",
    " text-align: right;\n",
    " }\n",
    "\n",
    " .dataframe thead th {\n",
    " text-align: left;\n",
    " }\n",
    "\n",
    " .dataframe tbody tr th {\n",
    " vertical-align: top;\n",
    " }\n",
    "</style>\n",
    "<table border=\"1\" class=\"dataframe\">\n",
    " <thead>\n",
    " <tr style=\"text-align: right;\">\n",
    " <th></th>\n",
    " <th>VendorID</th>\n",
    " <th>tpep_pickup_datetime</th>\n",
    " <th>tpep_dropoff_datetime</th>\n",
    " <th>passenger_count</th>\n",
    " <th>trip_distance</th>\n",
    " <th>RatecodeID</th>\n",
    " <th>store_and_fwd_flag</th>\n",
    " <th>PULocationID</th>\n",
    " <th>DOLocationID</th>\n",
    " <th>payment_type</th>\n",
    " <th>fare_amount</th>\n",
    " <th>extra</th>\n",
    " <th>mta_tax</th>\n",
    " <th>tip_amount</th>\n",
    " <th>tolls_amount</th>\n",
    " <th>improvement_surcharge</th>\n",
    " <th>total_amount</th>\n",
    " </tr>\n",
    " </thead>\n",
    " <tbody>\n",
    " <tr>\n",
    " <th>0</th>\n",
    " <td>1</td>\n",
    " <td>1969-12-31 23:59:59</td>\n",
    " <td>1969-12-31 23:59:59</td>\n",
    " <td>1</td>\n",
    " <td>3.30</td>\n",
    " <td>1</td>\n",
    " <td>-2049400382</td>\n",
    " <td>263</td>\n",
    " <td>161</td>\n",
    " <td>1</td>\n",
    " <td>12.5</td>\n",
    " <td>0.0</td>\n",
    " <td>0.5</td>\n",
    " <td>2.00</td>\n",
    " <td>0.0</td>\n",
    " <td>0.3</td>\n",
    " <td>15.30</td>\n",
    " </tr>\n",
    " <tr>\n",
    " <th>1</th>\n",
    " <td>1</td>\n",
    " <td>1969-12-31 23:59:59</td>\n",
    " <td>1969-12-31 23:59:59</td>\n",
    " <td>1</td>\n",
    " <td>0.90</td>\n",
    " <td>1</td>\n",
    " <td>-2049400382</td>\n",
    " <td>186</td>\n",
    " <td>234</td>\n",
    " <td>1</td>\n",
    " <td>5.0</td>\n",
    " <td>0.0</td>\n",
    " <td>0.5</td>\n",
    " <td>1.45</td>\n",
    " <td>0.0</td>\n",
    " <td>0.3</td>\n",
    " <td>7.25</td>\n",
    " </tr>\n",
    " <tr>\n",
    " <th>2</th>\n",
    " <td>1</td>\n",
    " <td>1969-12-31 23:59:59</td>\n",
    " <td>1969-12-31 23:59:59</td>\n",
    " <td>1</td>\n",
    " <td>1.10</td>\n",
    " <td>1</td>\n",
    " <td>-2049400382</td>\n",
    " <td>164</td>\n",
    " <td>161</td>\n",
    " <td>1</td>\n",
    " <td>5.5</td>\n",
    " <td>0.0</td>\n",
    " <td>0.5</td>\n",
    " <td>1.00</td>\n",
    " <td>0.0</td>\n",
    " <td>0.3</td>\n",
    " <td>7.30</td>\n",
    " </tr>\n",
    " <tr>\n",
    " <th>3</th>\n",
    " <td>1</td>\n",
    " <td>1969-12-31 23:59:59</td>\n",
    " <td>1969-12-31 23:59:59</td>\n",
    " <td>1</td>\n",
    " <td>1.10</td>\n",
    " <td>1</td>\n",
    " <td>-2049400382</td>\n",
    " <td>236</td>\n",
    " <td>75</td>\n",
    " <td>1</td>\n",
    " <td>6.0</td>\n",
    " <td>0.0</td>\n",
    " <td>0.5</td>\n",
    " <td>1.70</td>\n",
    " <td>0.0</td>\n",
    " <td>0.3</td>\n",
    " <td>8.50</td>\n",
    " </tr>\n",
    " <tr>\n",
    " <th>4</th>\n",
    " <td>2</td>\n",
    " <td>1969-12-31 23:59:59</td>\n",
    " <td>1969-12-31 23:59:59</td>\n",
    " <td>1</td>\n",
    " <td>0.02</td>\n",
    " <td>2</td>\n",
    " <td>-2049400382</td>\n",
    " <td>249</td>\n",
    " <td>234</td>\n",
    " <td>2</td>\n",
    " <td>52.0</td>\n",
    " <td>0.0</td>\n",
    " <td>0.5</td>\n",
    " <td>0.00</td>\n",
    " <td>0.0</td>\n",
    " <td>0.3</td>\n",
    " <td>52.80</td>\n",
    " </tr>\n",
    " </tbody>\n",
    "</table>\n",
    "</div>"
    ],
    "text/plain": [
    " VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count \\\n",
    "0 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
    "1 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
    "2 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
    "3 1 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
    "4 2 1969-12-31 23:59:59 1969-12-31 23:59:59 1 \n",
    "\n",
    " trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID \\\n",
    "0 3.30 1 -2049400382 263 161 \n",
    "1 0.90 1 -2049400382 186 234 \n",
    "2 1.10 1 -2049400382 164 161 \n",
    "3 1.10 1 -2049400382 236 75 \n",
    "4 0.02 2 -2049400382 249 234 \n",
    "\n",
    " payment_type fare_amount extra mta_tax tip_amount tolls_amount \\\n",
    "0 1 12.5 0.0 0.5 2.00 0.0 \n",
    "1 1 5.0 0.0 0.5 1.45 0.0 \n",
    "2 1 5.5 0.0 0.5 1.00 0.0 \n",
    "3 1 6.0 0.0 0.5 1.70 0.0 \n",
    "4 2 52.0 0.0 0.5 0.00 0.0 \n",
    "\n",
    " improvement_surcharge total_amount \n",
    "0 0.3 15.30 \n",
    "1 0.3 7.25 \n",
    "2 0.3 7.30 \n",
    "3 0.3 8.50 \n",
    "4 0.3 52.80 "
    ]
    },
    "execution_count": 2,
    "metadata": {},
    "output_type": "execute_result"
    }
    ],
    "source": [
    "import dask_cudf\n",
    "\n",
    "gdf = dask_cudf.read_csv('data/nyc/many/*.csv')\n",
    "gdf.head().to_pandas()"
    ]
    },
    {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
    "### Time a full-pass computation\n",
    "\n",
    "Most of the time here is spent reading data from disk and parsing it."
    ]
    },
    {
    "cell_type": "code",
    "execution_count": 4,
    "metadata": {},
    "outputs": [
    {
    "name": "stdout",
    "output_type": "stream",
    "text": [
    "CPU times: user 1.16 s, sys: 100 ms, total: 1.26 s\n",
    "Wall time: 4.68 s\n"
    ]
    },
    {
    "data": {
    "text/plain": [
    "184464740"
    ]
    },
    "execution_count": 4,
    "metadata": {},
    "output_type": "execute_result"
    }
    ],
    "source": [
    "%time gdf.passenger_count.sum().compute()"
    ]
    }
    ],
    "metadata": {
    "kernelspec": {
    "display_name": "Python [conda env:cudf_dev]",
    "language": "python",
    "name": "conda-env-cudf_dev-py"
    },
    "language_info": {
    "codemirror_mode": {
    "name": "ipython",
    "version": 3
    },
    "file_extension": ".py",
    "mimetype": "text/x-python",
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
    "version": "3.5.5"
    }
    },
    "nbformat": 4,
    "nbformat_minor": 2
    }