Skip to content

Instantly share code, notes, and snippets.

@gforsyth
Created February 19, 2021 21:55
Show Gist options
  • Save gforsyth/f3425d6bd37e6f0b68ba46dbf8121efd to your computer and use it in GitHub Desktop.
Save gforsyth/f3425d6bd37e6f0b68ba46dbf8121efd to your computer and use it in GitHub Desktop.

Revisions

  1. gforsyth created this gist Feb 19, 2021.
    146 changes: 146 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,146 @@
    distributed.worker - WARNING - Compute Failed
    Function: subgraph_callable
    args: ( id name x y
    timestamp
    2000-01-01 00:00:00 996 George 0.828987 -0.929548
    2000-01-01 00:00:01 1000 Jerry -0.110702 0.714628
    2000-01-01 00:00:02 1018 Wendy 0.626610 -0.723174
    2000-01-01 00:00:03 998 Norbert -0.249345 0.829807
    2000-01-01 00:00:04 983 Patricia -0.614237 0.045530
    ... ... ... ... ...
    2000-01-01 23:59:55 968 George 0.607650 -0.601929
    2000-01-01 23:59:56 1015 Ursula -0.131636 -0.947113
    2000-01-01 23:59:57 997 Quinn 0.864542 0.200530
    2000-01-01 23:59:58 990 Patricia 0.675835 -0.233165
    2000-01-01 23:59:59 952 Charlie 0.656843 0.612066

    [86400 rows x 4 columns], "('x',)")
    kwargs: {}
    Exception: KeyError("('x',)")

    ---------------------------------------------------------------------------
    KeyError Traceback (most recent call last)
    <ipython-input-6-4e0da72f8a21> in <module>
    ----> 1 ddf.head()

    ~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
    1040 Whether to compute the result, default is True.
    1041 """
    -> 1042 return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
    1043
    1044 def _head(self, n, npartitions, compute, safe):

    ~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/dask/dataframe/core.py in _head(self, n, npartitions, compute, safe)
    1073
    1074 if compute:
    -> 1075 result = result.compute()
    1076 return result
    1077

    ~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    279 dask.base.compute
    280 """
    --> 281 (result,) = compute(self, traverse=False, **kwargs)
    282 return result
    283

    ~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    561 postcomputes.append(x.__dask_postcompute__())
    562
    --> 563 results = schedule(dsk, keys, **kwargs)
    564 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    565

    ~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
    2653 should_rejoin = False
    2654 try:
    -> 2655 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
    2656 finally:
    2657 for f in futures.values():

    ~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
    1962 else:
    1963 local_worker = None
    -> 1964 return self.sync(
    1965 self._gather,
    1966 futures,

    ~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    836 return future
    837 else:
    --> 838 return sync(
    839 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    840 )

    ~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338 if error[0]:
    339 typ, exc, tb = error[0]
    --> 340 raise exc.with_traceback(tb)
    341 else:
    342 return result[0]

    ~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/distributed/utils.py in f()
    322 if callback_timeout is not None:
    323 future = asyncio.wait_for(future, callback_timeout)
    --> 324 result[0] = yield future
    325 except Exception as exc:
    326 error[0] = sys.exc_info()

    ~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760
    761 try:
    --> 762 value = future.result()
    763 except Exception:
    764 exc_info = sys.exc_info()

    ~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
    1827 exc = CancelledError(key)
    1828 else:
    -> 1829 raise exception.with_traceback(traceback)
    1830 raise exc
    1831 if errors == "skip":

    ~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/dask/optimization.py in __call__()
    961 if not len(args) == len(self.inkeys):
    962 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
    --> 963 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    964
    965 def __reduce__(self):

    ~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/dask/core.py in get()
    149 for key in toposort(dsk):
    150 task = dsk[key]
    --> 151 result = _execute_task(task, cache)
    152 cache[key] = result
    153 result = _execute_task(out, cache)

    ~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/dask/core.py in _execute_task()
    119 # temporaries by their reference count and can execute certain
    120 # operations in-place.
    --> 121 return func(*(_execute_task(a, cache) for a in args))
    122 elif not ishashable(arg):
    123 return arg

    ~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/pandas/core/frame.py in __getitem__()
    2798 if self.columns.nlevels > 1:
    2799 return self._getitem_multilevel(key)
    -> 2800 indexer = self.columns.get_loc(key)
    2801 if is_integer(indexer):
    2802 indexer = [indexer]

    ~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/pandas/core/indexes/base.py in get_loc()
    2646 return self._engine.get_loc(key)
    2647 except KeyError:
    -> 2648 return self._engine.get_loc(self._maybe_cast_indexer(key))
    2649 indexer = self.get_indexer([key], method=method, tolerance=tolerance)
    2650 if indexer.ndim > 1 or indexer.size > 1:

    pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()

    pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()

    pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()

    pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()

    KeyError: "('x',)"