Skip to content

Instantly share code, notes, and snippets.

@lsevero
Forked from ddelange/executor.py
Created September 23, 2021 02:38
Show Gist options
  • Select an option

  • Save lsevero/2e63179a4e1a832b16e3e1227234e6ee to your computer and use it in GitHub Desktop.

Select an option

Save lsevero/2e63179a4e1a832b16e3e1227234e6ee to your computer and use it in GitHub Desktop.

Revisions

  1. @ddelange ddelange revised this gist May 11, 2021. 1 changed file with 32 additions and 8 deletions.
    40 changes: 32 additions & 8 deletions run_in_executor.py
    Original file line number Diff line number Diff line change
    @@ -1,5 +1,6 @@
    import asyncio
    from functools import partial
    from functools import wraps, partial


    def run_in_executor(fn=None, *, executor=None):
    """Make a sync function async. By default uses ThreadPoolExecutor."""
    @@ -11,29 +12,52 @@ def run_in_executor(fn=None, *, executor=None):
    @wraps(fn)
    async def wrapped(*args, **kwargs):
    """Wrap function in a run_in_executor."""
    loop = asyncio.get_running_loop()

    _fn = functools.partial(fn, *args, **kwargs)
    _fn = partial(fn, *args, **kwargs)

    fut = loop.run_in_executor(executor, _fn)
    if hasattr(executor, "coro_apply"):
    # support aioprocessing.pool.AioPool
    fut = executor.coro_apply(_fn)
    else:
    fut = asyncio.get_running_loop().run_in_executor(executor, _fn)

    return await fut

    return wrapped


    # without brackets
    @run_in_executor
    def test1():
    print(1)

    await test1()


    # with brackets
    @run_in_executor()
    def test2():
    print(2)

    await test2()

    # from concurrent.futures import ThreadPoolExecutor
    @run_in_executor(executor=ThreadPoolExecutor(1))

    # with explicit ThreadPoolExecutor
    from concurrent.futures import ThreadPoolExecutor

    @run_in_executor(executor=ThreadPoolExecutor(4))
    def test3():
    print(3)
    await test3()

    await test3()


    # with explicit AioPool, patched using multiprocess (uses dill for universal pickling)
    from aioprocessing.pool import AioPool
    from multiprocess import Pool
    setattr(AioPool, "delegate", Pool)

    @run_in_executor(executor=AioPool(4))
    def test4():
    print(4)

    await test4()
  2. @ddelange ddelange created this gist Apr 27, 2021.
    39 changes: 39 additions & 0 deletions run_in_executor.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,39 @@
    import asyncio
    from functools import partial

    def run_in_executor(fn=None, *, executor=None):
    """Make a sync function async. By default uses ThreadPoolExecutor."""
    if fn is None:
    # allow using this decorator with brackets, e.g.
    # @run_in_executor(executor=ThreadPoolExecutor(1))
    return partial(run_in_executor, executor=executor)

    @wraps(fn)
    async def wrapped(*args, **kwargs):
    """Wrap function in a run_in_executor."""
    loop = asyncio.get_running_loop()

    _fn = functools.partial(fn, *args, **kwargs)

    fut = loop.run_in_executor(executor, _fn)

    return await fut

    return wrapped


    @run_in_executor
    def test1():
    print(1)
    await test1()

    @run_in_executor()
    def test2():
    print(2)
    await test2()

    # from concurrent.futures import ThreadPoolExecutor
    @run_in_executor(executor=ThreadPoolExecutor(1))
    def test3():
    print(3)
    await test3()