Skip to content

Instantly share code, notes, and snippets.

@prismv
Created December 22, 2015 23:13
Show Gist options
  • Save prismv/ddaa3ec394d5c6f8b578 to your computer and use it in GitHub Desktop.
Save prismv/ddaa3ec394d5c6f8b578 to your computer and use it in GitHub Desktop.

Revisions

  1. prismv created this gist Dec 22, 2015.
    70 changes: 70 additions & 0 deletions _test_dask_error.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,70 @@
    # coding: utf-8
    import sys, traceback
    import numpy as np
    import pandas as pd
    import bcolz
    import dask.dataframe as dd

    # Marker string printed by doit() when a computation finishes without raising.
    OK = '_OKAY_'

    def doit(df_dd):
        """Compute *df_dd* and report the outcome on stdout.

        Calls ``df_dd.compute()`` and discards the result. On success the
        module-level ``OK`` marker is printed. If the computation raises an
        ordinary exception, its traceback is printed to stdout and the
        exception is swallowed so the caller can move on to the next case.

        Only ``Exception`` subclasses are handled: ``KeyboardInterrupt`` and
        ``SystemExit`` propagate, so a long-running compute can still be
        aborted from the keyboard instead of being reported as a failure.

        Returns None.
        """
        try:
            df_dd.compute()
        except Exception:
            # Send the traceback to stdout (not stderr) so it interleaves
            # with the section headers printed by the caller.
            traceback.print_exc(file=sys.stdout)
        else:
            print(OK)

    # Reproduction script: build the same two-column complex frame from
    # several dask sources (bcolz-backed, and in-memory pandas with different
    # extents / partition counts) and report which of them compute cleanly.

    def _complex_pairs(frame):
        """Build the ``z1``/``z2`` test frame from columns c0..c3 of *frame*.

        Columns ``(c0, c1)`` and ``(c2, c3)`` are each multiplied by
        ``[1, 1j]`` and summed along axis 1 (intended as ``c0 + 1j*c1`` and
        ``c2 + 1j*c3``), then the two results are concatenated column-wise.

        Returns a ``(z1, z2, combined)`` tuple: the two intermediate series
        and the concatenated two-column dask frame.
        """
        weights = np.r_[1, 1j]
        z1 = (frame[['c0', 'c1']] * weights).sum(axis=1)
        z2 = (frame[['c2', 'c3']] * weights).sum(axis=1)
        combined = dd.concat([z1.to_frame('z1'), z2.to_frame('z2')], axis=1)
        return z1, z2, combined

    #
    print('\n# data read from bcolz')

    ctable = bcolz.open('test.bcolz', mode='r')
    df_dd = dd.from_bcolz(ctable, chunksize=int(1E6))

    z1, z2, df_dd_out = _complex_pairs(df_dd)

    doit(df_dd_out)

    #
    print('\n# data read from memory, npartitions>1')

    ctable_mem = ctable[:int(1E6)]
    df_dd_mem = dd.from_pandas(pd.DataFrame.from_records(ctable_mem), npartitions=10)

    z1m, z2m, df_dd_out_mem = _complex_pairs(df_dd_mem)

    doit(df_dd_out_mem)

    #
    print('\n# data read from memory, larger extent, npartitions>1')
    ctable_mem_b = ctable[:int(1E7)] # larger
    df_dd_mem_b = dd.from_pandas(pd.DataFrame.from_records(ctable_mem_b), npartitions=10)

    z1m_b, z2m_b, df_dd_out_mem_b = _complex_pairs(df_dd_mem_b)

    doit(df_dd_out_mem_b)

    #
    print('\n# data read from memory, npartitions=1')
    # Same records as the previous case, but held in a single partition.
    df_dd_mem_c = dd.from_pandas(pd.DataFrame.from_records(ctable_mem_b), npartitions=1)

    z1m_c, z2m_c, df_dd_out_mem_c = _complex_pairs(df_dd_mem_c)

    doit(df_dd_out_mem_c)

    # check also get_division
    # The .head() results are discarded; each call only checks whether that
    # division can be materialised. Trailing notes are the original author's
    # observations. The final call is not wrapped in doit(), so if it raises
    # as noted the script ends with that traceback.
    df_dd_mem.get_division(0).head() # ok
    z1m_b.get_division(0).head() # ok
    df_dd_out_mem_b.get_division(9).head() # last division works
    df_dd_out_mem_b.get_division(0).head() # other divisions raise same dask.async.IndexError