Skip to content

Instantly share code, notes, and snippets.

@smurfix
Last active January 3, 2025 07:36
Show Gist options
  • Select an option

  • Save smurfix/d528056259496d1b7ada1c20b9aee6fb to your computer and use it in GitHub Desktop.

Select an option

Save smurfix/d528056259496d1b7ada1c20b9aee6fb to your computer and use it in GitHub Desktop.

Revisions

  1. smurfix revised this gist Feb 9, 2020. 1 changed file with 6 additions and 2 deletions.
    8 changes: 6 additions & 2 deletions result_taskgroup.py
    Original file line number Diff line number Diff line change
    @@ -13,8 +13,12 @@ async def __aenter__(self):
    await tg.__aenter__()
    return self

    def __aexit__(self, *tb):
    return self._taskgroup.__aexit__(*tb)
    async def __aexit__(self, *tb):
    try:
    res = await self._taskgroup.__aexit__(*tb)
    return res
    finally:
    del self._taskgroup

    async def _run_one(self, pos, proc, a):
    self.result[pos] = await proc(*a)
  2. smurfix created this gist Feb 9, 2020.
    45 changes: 45 additions & 0 deletions result_taskgroup.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,45 @@
    import anyio
    from contextlib import asynccontextmanager

    class NotYet(RuntimeError):
    pass

    class ResultGatheringTaskgroup:
    def __init__(self):
    self.result = []

    async def __aenter__(self):
    self._taskgroup = tg = anyio.create_task_group()
    await tg.__aenter__()
    return self

    def __aexit__(self, *tb):
    return self._taskgroup.__aexit__(*tb)

    async def _run_one(self, pos, proc, a):
    self.result[pos] = await proc(*a)

    async def spawn(self, proc, *a):
    pos = len(self.result)
    self.result.append(NotYet)
    await self._taskgroup.spawn(self._run_one, pos, proc, a)
    return pos

    def get_result(self, pos):
    res = self.result[pos]
    if res is NotYet:
    raise NotYet()
    return res

    if __name__ == "__main__":
    async def test():
    async def s(x):
    await anyio.sleep(x)
    return 3*x
    async with ResultGatheringTaskgroup() as rg:
    a = await rg.spawn(s,0.125)
    b = await rg.spawn(s,0.25)
    assert rg.get_result(a) == 0.375
    assert rg.get_result(b) == 0.75
    assert rg.result == [0.375,0.75]
    anyio.run(test)