import dask
import dask.array as da
import dask.dataframe as dd
import numpy as np
import sparse


@dask.delayed(pure=True)
def corr_on_chunked(chunk1, chunk2, corr_thresh=0.9):
    """Threshold the pairwise dot products of two row blocks (lazy dask task).

    Args:
        chunk1: 2-D array-like block of rows (left operand of the product).
        chunk2: 2-D array-like block of rows; its transpose is the right
            operand, so entry ``[i, j]`` is ``dot(chunk1[i], chunk2[j])``.
        corr_thresh: Strict lower bound an entry must exceed to be kept.

    Returns:
        A ``sparse.COO`` boolean array of shape
        ``(len(chunk1), len(chunk2))`` that is True where the dot product is
        greater than ``corr_thresh``. Because of the ``dask.delayed``
        decorator, calling this function returns a delayed object rather
        than the array itself.
    """
    return sparse.COO.from_numpy(np.dot(chunk1, chunk2.T) > corr_thresh)


def chunked_corr_sparse_dask(data, chunksize=5000, corr_thresh=0.9):
    """Build a thresholded row-by-row correlation matrix block by block.

    Each row of ``data`` is mean-centred and scaled to unit Euclidean norm,
    so the dot product of two rows equals their Pearson correlation. The
    full ``rows x rows`` matrix is evaluated in ``chunksize``-sized blocks
    through dask, thresholded, and only the strict upper triangle is kept.

    Adapted from
    https://stackoverflow.com/questions/24717513/python-numpy-corrcoef-memory-error

    Args:
        data: 2-D array-like with one observation vector per row. It is not
            modified; normalisation happens on a floating-point copy.
        chunksize: Maximum number of rows per block (positive integer).
        corr_thresh: Correlation a pair of rows must strictly exceed to be
            marked in the result.

    Returns:
        The result of ``sparse.COO.tocsr()`` on the strictly upper-triangular
        boolean matrix: entry ``[i, j]`` (``i < j``) is True when rows ``i``
        and ``j`` correlate above ``corr_thresh``. The original author
        describes this as a directed adjacency matrix (old->young).

    Raises:
        ValueError: If ``data`` is not 2-D, has no rows, or ``chunksize`` is
            smaller than 1.

    Note:
        Rows with zero variance normalise to NaN (NumPy emits a runtime
        warning) and therefore never exceed the threshold.
    """
    values = np.asarray(data)
    if values.ndim != 2:
        raise ValueError("data must be 2-dimensional (rows x features)")
    if chunksize < 1:
        raise ValueError("chunksize must be a positive integer")
    numrows = values.shape[0]
    if numrows == 0:
        raise ValueError("data must contain at least one row")

    # Integer/bool/object input cannot hold centred, scaled values.
    if not np.issubdtype(values.dtype, np.inexact):
        values = values.astype(float)

    # Out-of-place subtraction creates the working copy, so the caller's
    # array is left untouched; the division can then safely be in-place.
    normed = values - values.mean(axis=1, keepdims=True)
    normed /= np.sqrt(np.sum(normed ** 2, axis=1, keepdims=True))

    starts = range(0, numrows, chunksize)
    block_rows = []
    for r in starts:
        row_chunk = normed[r:r + chunksize]
        blocks = []
        for c in starts:
            col_chunk = normed[c:c + chunksize]
            delayed_block = corr_on_chunked(
                row_chunk, col_chunk, corr_thresh=corr_thresh
            )
            blocks.append(da.from_delayed(
                delayed_block,
                dtype='bool',
                # Use the real slice lengths: the trailing blocks are
                # smaller than chunksize when numrows is not a multiple
                # of it.
                shape=(row_chunk.shape[0], col_chunk.shape[0]),
            ))
        block_rows.append(da.hstack(blocks))

    res = da.vstack(block_rows).compute()
    # k=1 drops the diagonal (self-correlation) and the mirrored lower half.
    res = sparse.triu(res, k=1)
    return res.tocsr()