Created
December 22, 2015 23:13
-
-
Save prismv/ddaa3ec394d5c6f8b578 to your computer and use it in GitHub Desktop.
Revisions
-
prismv created this gist
Dec 22, 2015. There are no files selected for viewing
# coding: utf-8
"""Exercise ``dd.concat`` on complex columns derived from a bcolz table.

Four variants of the same computation are run and each prints either
``_OKAY_`` or the traceback of the failure:

1. data read lazily from the on-disk bcolz ctable,
2. the first 1E6 rows loaded in memory, split into 10 partitions,
3. the first 1E7 rows loaded in memory, split into 10 partitions,
4. the same 1E7 rows in a single partition.

The trailing ``get_division`` calls probe individual partitions of the
intermediate and concatenated frames.
"""
# NOTE(review): in the scraped page this code was preceded by GitHub viewer
# boilerplate ("This file contains hidden or bidirectional Unicode text ...")
# and the diff header "@@ -0,0 +1,70 @@"; neither is part of the script.
# NOTE(review): the flattened source is ambiguous about whether the section
# banners were live ``print`` calls or commented out; they are reconstructed
# here as live calls so each result can be told apart -- confirm.

import sys
import traceback

import numpy as np
import pandas as pd
import bcolz
import dask.dataframe as dd

OK = '_OKAY_'


def doit(df_dd):
    """Compute *df_dd* and report the outcome on stdout.

    Prints ``OK`` when ``df_dd.compute()`` succeeds; otherwise prints the
    traceback to stdout and returns normally so the next case still runs.
    """
    try:
        # Only success/failure matters here, so the result is discarded.
        df_dd.compute()
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that Ctrl-C
        # (KeyboardInterrupt) and SystemExit still abort the script.
        traceback.print_exc(file=sys.stdout)
    else:
        print(OK)


def _complex_columns(frame):
    """Build the complex series and their concatenation for *frame*.

    Each pair of columns is scaled by ``[1, 1j]`` and summed row-wise, i.e.
    ``z1 = c0 + 1j*c1`` and ``z2 = c2 + 1j*c3``.

    Returns ``(z1, z2, out)`` where ``out`` is the two-column frame produced
    by ``dd.concat`` with columns named ``'z1'`` and ``'z2'``.
    """
    z1 = (frame[['c0', 'c1']] * np.r_[1, 1j]).sum(axis=1)
    z2 = (frame[['c2', 'c3']] * np.r_[1, 1j]).sum(axis=1)
    out = dd.concat([z1.to_frame('z1'), z2.to_frame('z2')], axis=1)
    return z1, z2, out


#
print('\n# data read from bcolz')

ctable = bcolz.open('test.bcolz', mode='r')
df_dd = dd.from_bcolz(ctable, chunksize=int(1E6))

z1, z2, df_dd_out = _complex_columns(df_dd)

doit(df_dd_out)

#
print('\n# data read from memory, npartitions>1')

ctable_mem = ctable[:int(1E6)]
df_dd_mem = dd.from_pandas(pd.DataFrame.from_records(ctable_mem),
                           npartitions=10)

z1m, z2m, df_dd_out_mem = _complex_columns(df_dd_mem)

doit(df_dd_out_mem)

#
print('\n# data read from memory, larger extent, npartitions>1')

ctable_mem_b = ctable[:int(1E7)]  # larger
df_dd_mem_b = dd.from_pandas(pd.DataFrame.from_records(ctable_mem_b),
                             npartitions=10)

z1m_b, z2m_b, df_dd_out_mem_b = _complex_columns(df_dd_mem_b)

doit(df_dd_out_mem_b)

#
print('\n# data read from memory, npartitions=1')

df_dd_mem_c = dd.from_pandas(pd.DataFrame.from_records(ctable_mem_b),
                             npartitions=1)

z1m_c, z2m_c, df_dd_out_mem_c = _complex_columns(df_dd_mem_c)

doit(df_dd_out_mem_c)

# check also get_division
df_dd_mem.get_division(0).head()  # ok
z1m_b.get_division(0).head()  # ok
df_dd_out_mem_b.get_division(9).head()  # last division works
df_dd_out_mem_b.get_division(0).head()  # other divisions raise same dask.async.IndexError