# Dask DataFrame and cuDF on NYC Taxi CSV data

### Start Dask Cluster on an Eight-GPU DGX Machine

```python
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

# LocalCUDACluster starts one worker process per visible GPU.
cluster = LocalCUDACluster()
client = Client(cluster)
client
```

| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:33058 | Workers: 8 |
| Dashboard: http://127.0.0.1:8787/status | Cores: 8 |
|  | Memory: 540.96 GB |
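`LocalCUDACluster` started eight workers, one per GPU. To our understanding, `dask_cuda` pins each worker to its own GPU by launching every worker process with a rotated `CUDA_VISIBLE_DEVICES` list, so worker *i* sees device *i* first. The stdlib sketch below reproduces only that rotation; `rotated_visible_devices` is a hypothetical name, and the sketch neither starts processes nor touches a GPU.

```python
def rotated_visible_devices(i, visible):
    """Hypothetical helper: rotate the device list so device `i` comes first.

    A process launched with CUDA_VISIBLE_DEVICES set to this string sees the
    first listed device as its device 0, i.e. its default GPU.
    """
    visible = list(visible)
    rotated = visible[i:] + visible[:i]
    return ",".join(str(d) for d in rotated)


n_gpus = 8  # the DGX machine in this notebook has eight GPUs
for worker in range(n_gpus):
    print(f"worker {worker}: CUDA_VISIBLE_DEVICES={rotated_visible_devices(worker, range(n_gpus))}")
```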


### Previously, we ran the following to shard the files more finely for `cudf.read_csv`

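```python
import dask.dataframe as dd

# Read the original monthly 2017 files with pandas-backed Dask,
# parsing the pickup and dropoff columns as datetimes.
pdf = dd.read_csv('data/nyc/yellow_tripdata_2017-*.csv',
                  parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

# Rewrite the data as 100 smaller CSV files: the '*' is replaced by a name
# derived from each partition's number, and index=False omits the index column.
pdf.repartition(npartitions=100).to_csv('data/nyc/many/*.csv', index=False)
```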
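The `repartition(npartitions=100)` call spreads the rows over 100 partitions, and `to_csv` then writes one file per partition, so `cudf.read_csv` gets many small inputs instead of a few large ones. The stdlib sketch below shows only the idea of dividing rows into near-equal contiguous chunks; `split_evenly` is a hypothetical helper and is not how Dask implements `repartition` internally.

```python
def split_evenly(rows, npartitions):
    """Hypothetical helper: split `rows` into `npartitions` contiguous chunks
    whose sizes differ by at most one."""
    if npartitions < 1:
        raise ValueError("npartitions must be >= 1")
    base, extra = divmod(len(rows), npartitions)
    chunks, start = [], 0
    for i in range(npartitions):
        size = base + (1 if i < extra else 0)  # the first `extra` chunks get one more row
        chunks.append(rows[start:start + size])
        start += size
    return chunks


rows = list(range(1003))  # stand-in for table rows
chunks = split_evenly(rows, 100)
print(len(chunks), [len(c) for c in chunks[:4]])
```

Keeping the chunks contiguous preserves row order, so concatenating the shards in partition order recovers the original table.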

### Read the CSV files into a GPU-backed Dask DataFrame

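```python
import dask_cudf

# Lazily read all shards into a Dask DataFrame backed by cudf (GPU) partitions.
gdf = dask_cudf.read_csv('data/nyc/many/*.csv')
# head() returns a cudf DataFrame; convert it to pandas for display.
gdf.head().to_pandas()
```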

|   | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1969-12-31 23:59:59 | 1969-12-31 23:59:59 | 1 | 3.3 | 1 | -2049400382 | 263 | 161 | 1 | 12.5 | 0.0 | 0.5 | 2.0 | 0.0 | 0.3 | 15.3 |
| 1 | 1 | 1969-12-31 23:59:59 | 1969-12-31 23:59:59 | 1 | 0.9 | 1 | -2049400382 | 186 | 234 | 1 | 5.0 | 0.0 | 0.5 | 1.45 | 0.0 | 0.3 | 7.25 |
| 2 | 1 | 1969-12-31 23:59:59 | 1969-12-31 23:59:59 | 1 | 1.1 | 1 | -2049400382 | 164 | 161 | 1 | 5.5 | 0.0 | 0.5 | 1.0 | 0.0 | 0.3 | 7.3 |
| 3 | 1 | 1969-12-31 23:59:59 | 1969-12-31 23:59:59 | 1 | 1.1 | 1 | -2049400382 | 236 | 75 | 1 | 6.0 | 0.0 | 0.5 | 1.7 | 0.0 | 0.3 | 8.5 |
| 4 | 2 | 1969-12-31 23:59:59 | 1969-12-31 23:59:59 | 1 | 0.02 | 2 | -2049400382 | 249 | 234 | 2 | 52.0 | 0.0 | 0.5 | 0.0 | 0.0 | 0.3 | 52.8 |


### Time a full-pass computation

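This runs on all eight GPU workers. Summing a single column still requires a full pass over the CSV files, so most of the time here is spent reading the data from disk and parsing it.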
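Conceptually, `passenger_count.sum()` is a map-reduce: each partition is parsed and summed on its own, and the partial sums are then added together. The stdlib sketch below mirrors that shape on two tiny in-memory CSV strings; `partition_sum`, `full_pass_sum`, and the sample rows are hypothetical, and Python's `csv` module stands in for the GPU parser in `cudf`.

```python
import csv
import io


def partition_sum(csv_text, column):
    """Map step: parse one CSV partition and sum a single integer column."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return sum(int(row[column]) for row in reader)


def full_pass_sum(partitions, column):
    """Reduce step: add up the per-partition partial sums."""
    return sum(partition_sum(p, column) for p in partitions)


# Hypothetical partitions; each shard carries its own header row.
partitions = [
    "VendorID,passenger_count,trip_distance\n1,1,3.3\n1,2,0.9\n",
    "VendorID,passenger_count,trip_distance\n2,5,1.1\n",
]
print(full_pass_sum(partitions, "passenger_count"))
```

Every row is still parsed in full even though only one column contributes to the result.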

```python
%time gdf.passenger_count.sum().compute()
```

```
CPU times: user 1.16 s, sys: 100 ms, total: 1.26 s
Wall time: 4.68 s

184464740
```

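### Single GPU

Passing `scheduler='single-threaded'` to `compute()` bypasses the distributed cluster and runs every task one after another in the notebook process, so the whole computation uses a single GPU.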

```python
%time gdf.passenger_count.sum().compute(scheduler='single-threaded')
```

```
CPU times: user 7.5 s, sys: 4.66 s, total: 12.2 s
Wall time: 10.9 s

184464740
```
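As a loose stdlib analogy (not Dask's scheduler), the sketch below runs the same per-partition tasks either sequentially in the calling thread, as `scheduler='single-threaded'` does, or on a `concurrent.futures.ThreadPoolExecutor`. The result is identical; only the execution strategy changes. `run_tasks`, `partial_sum`, and the toy partitions are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def partial_sum(partition):
    """One task: reduce a single partition to a partial result."""
    return sum(partition)


def run_tasks(partitions, n_threads=1):
    """Run one task per partition and combine the partial results.

    n_threads=1 runs every task in the calling thread, one after another;
    larger values hand the tasks to a pool of worker threads.
    """
    if n_threads == 1:
        partials = [partial_sum(p) for p in partitions]
    else:
        with ThreadPoolExecutor(max_workers=n_threads) as pool:
            partials = list(pool.map(partial_sum, partitions))
    return sum(partials)


# Ten toy partitions holding the integers 0..99.
partitions = [list(range(start, start + 10)) for start in range(0, 100, 10)]
print(run_tasks(partitions), run_tasks(partitions, n_threads=8))
```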

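### Single CPU

For comparison, the same shards are read with pandas-backed `dask.dataframe` and summed with the single-threaded scheduler.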

```python
import dask.dataframe as dd

# Same shards, but each partition is a pandas DataFrame parsed on the CPU.
df = dd.read_csv('data/nyc/many/*.csv')
type(df.head())
```

```
pandas.core.frame.DataFrame
```

```python
%time df.passenger_count.sum().compute(scheduler='single-threaded')
```

```
CPU times: user 34min 9s, sys: 21.5 s, total: 34min 30s
Wall time: 3min 14s

184464740
```

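### Eight CPUs, one per process

Without a `scheduler=` argument, `compute()` uses the active `client`, whose `LocalCUDACluster` has eight single-threaded worker processes. Here those workers run pandas tasks on the CPU.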

```python
%time df.passenger_count.sum().compute()
```

```
CPU times: user 10.8 s, sys: 988 ms, total: 11.8 s
Wall time: 57.5 s

184464740
```

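### Eighty CPUs with a balance of threads and processes

Next, we replace the GPU cluster with a CPU-only local cluster of ten worker processes, each running eight threads, for 80 threads in total.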

```python
# Shut down the GPU cluster before starting a CPU-only one.
client.close()
cluster.close()
```

```python
client = Client(n_workers=10, threads_per_worker=8)
client
```

| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:45873 | Workers: 10 |
| Dashboard: http://127.0.0.1:8787/status | Cores: 80 |
|  | Memory: 540.96 GB |


```python
%time df.passenger_count.sum().compute()
```

```
CPU times: user 7.55 s, sys: 692 ms, total: 8.24 s
Wall time: 34.9 s

184464740
```
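For a side-by-side view, the wall-clock times recorded above can be converted into speedups over the single-threaded CPU run. The snippet below only does arithmetic on the numbers printed in this notebook (3min 14s is 194 s); it takes no new measurements, and each figure comes from a single run, so the ratios are rough.

```python
# Wall-clock times copied from the %time outputs above, in seconds.
wall_times = {
    "8 GPUs (dask_cudf)": 4.68,
    "1 GPU (single-threaded)": 10.9,
    "1 CPU (single-threaded)": 3 * 60 + 14,
    "8 CPU processes": 57.5,
    "80 CPU threads (10 x 8)": 34.9,
}

baseline = wall_times["1 CPU (single-threaded)"]
speedups = {name: baseline / t for name, t in wall_times.items()}

# Print the runs from fastest to slowest.
for name, s in sorted(speedups.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{name:24} {wall_times[name]:7.2f} s {s:6.1f}x")

gpu_vs_cpu = wall_times["80 CPU threads (10 x 8)"] / wall_times["8 GPUs (dask_cudf)"]
print(f"8 GPUs vs 80 CPU threads: {gpu_vs_cpu:.1f}x")
```

On these numbers, the eight-GPU run is roughly 41x faster than the single-threaded CPU run and about 7.5x faster than the 80-thread CPU cluster.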