Skip to content

Instantly share code, notes, and snippets.

@frostming
Last active June 16, 2024 18:17
Show Gist options
  • Select an option

  • Save frostming/c9ef1c74ca28cf2f8fca3d4d15e9e245 to your computer and use it in GitHub Desktop.

Select an option

Save frostming/c9ef1c74ca28cf2f8fca3d4d15e9e245 to your computer and use it in GitHub Desktop.

Revisions

  1. frostming revised this gist Dec 10, 2020. 1 changed file with 174 additions and 31 deletions.
    205 changes: 174 additions & 31 deletions promise.py
    Original file line number Diff line number Diff line change
    @@ -2,12 +2,17 @@
    Python implementation of JavaScript's Promise interface
    with the power of asyncio.
    See https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Promise
    for the API specification.
    Authored by: Frost Ming <[email protected]>
    License: WTFPL
    """

    import asyncio
    from typing import Any, Callable, Optional
    import functools
    import time
    from typing import Any, Callable, Iterable, Optional


    ResolveFunc = Callable[[Any], Any]
    @@ -16,19 +21,20 @@
    PromiseCallBack = Callable[[ResolveFunc, RejectFunc], Any]


    def _noop_callback(resolve_func, reject_func):
    """This is a dummy callback that doesn't resolve anything nor reject.
    For .then() and .catch() chaining Promises.
    """
    pass
    class AggregateError(Exception):
    def __init__(self, errors: Iterable[Exception]) -> None:
    self.errors = list(errors)


    class Promise:
    """A class that implements the JavaScript's Promise interface, with asyncio!"""

    def __init__(self, callback: PromiseCallBack) -> None:
    self.callback = callback
    self._loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()
    try:
    self._loop = asyncio.get_event_loop()
    except RuntimeError:
    self._loop = asyncio.new_event_loop()
    # Commit the task to the event loop immediately after Promise is created
    self._future = self.__run()

    @@ -49,12 +55,28 @@ def __await__(self):
    return result

    def __run(self):
    def _resolve(r):
    def _handle():
    if not future.done():
    future.set_result(r)

    self._loop.call_soon_threadsafe(_handle)

    def _reject(e):
    def _handle():
    if not future.done():
    future.set_exception(e)

    self._loop.call_soon_threadsafe(_handle)

    future = self._loop.create_future()
    self._loop.call_soon_threadsafe(
    self.callback,
    lambda r: future.set_result(r),
    lambda e: future.set_exception(e),
    )
    handle = self._loop.run_in_executor(None, self.callback, _resolve, _reject)

    def _check_cancel(target):
    if target.cancelled():
    handle.get_loop().call_soon_threadsafe(handle.cancel)

    future.add_done_callback(_check_cancel)
    return future

    @classmethod
    @@ -71,62 +93,170 @@ def callback(_, reject_func):

    return cls(callback)

    def __chain_promise(self) -> "Promise":
    """Create a new promise, chaining the status of current promise."""
    this_future = self._future

    def _noop_callback(resolve_func, reject_func):
    pass

    def _check_cancel(future):
    source_loop, other_loop = future.get_loop(), self._loop
    if future.cancelled():
    if source_loop is other_loop:
    this_future.cancel()
    else:
    other_loop.call_soon_threadsafe(this_future.cancel)

    new_promise = Promise(_noop_callback)
    new_promise._future.add_done_callback(functools.partial(_check_cancel))
    return new_promise

    def then(
    self, resolve_func: ResolveFunc, reject_func: Optional[RejectFunc] = None
    ) -> "Promise":
    new_promise = Promise(_noop_callback)
    new_promise = self.__chain_promise()
    new_future = new_promise._future
    dest_loop = new_future.get_loop()

    def receive_result(future: asyncio.Future):
    if future.exception():
    if future.cancelled():
    return
    elif future.exception():
    if reject_func is None:
    new_future.set_exception(future.exception())
    dest_loop.call_soon_threadsafe(
    new_future.set_exception, future.exception()
    )
    else:
    try:
    result = reject_func(future.exception())
    except Exception as e:
    new_future.set_exception(e)
    dest_loop.call_soon_threadsafe(new_future.set_exception, e)
    else:
    new_future.set_result(result)
    dest_loop.call_soon_threadsafe(new_future.set_result, result)
    else:
    result = resolve_func(future.result())
    new_future.set_result(result)
    dest_loop.call_soon_threadsafe(new_future.set_result, result)

    self._future.add_done_callback(receive_result)
    if not self._future.done():
    self._future.add_done_callback(receive_result)
    else:
    receive_result(self._future)
    return new_promise

    def catch(self, callback: RejectFunc) -> "Promise":
    new_promise = Promise(_noop_callback)
    new_promise = self.__chain_promise()
    new_future = new_promise._future
    dest_loop = new_future.get_loop()

    def receive_result(future: asyncio.Future):
    if future.exception():
    if future.cancelled():
    return
    elif future.exception():
    try:
    result = callback(future.exception())
    except Exception as e:
    new_future.set_exception(e)
    dest_loop.call_soon_threadsafe(new_future.set_exception, e)
    else:
    new_future.set_result(result)
    dest_loop.call_soon_threadsafe(new_future.set_result, result)
    else:
    new_future.set_result(future.result())
    dest_loop.call_soon_threadsafe(new_future.set_result, future.result())

    self._future.add_done_callback(receive_result)
    if not self._future.done():
    self._future.add_done_callback(receive_result)
    else:
    receive_result(self._future)
    return new_promise

    def final(self, callback: FinallyFunc) -> "Promise":
    new_promise = Promise(_noop_callback)
    new_promise = self.__chain_promise()
    new_future = new_promise._future
    dest_loop = new_future.get_loop()

    def receive_result(future: asyncio.Future):
    if future.exception():
    new_future.set_exception(future.exception())
    if future.cancelled():
    return
    elif future.exception():
    dest_loop.call_soon_threadsafe(
    new_future.set_result, future.exception()
    )
    else:
    new_future.set_result(future.result())
    dest_loop.call_soon_threadsafe(new_future.set_result, future.result())
    callback()

    self._future.add_done_callback(receive_result)
    if not self._future.done():
    self._future.add_done_callback(receive_result)
    else:
    receive_result(self._future)
    return new_promise

    @classmethod
    def all(cls, promises: Iterable["Promise"]) -> "Promise":
    promises = list(promises)

    def callback(resolve, reject):
    def on_resolve(_):
    if any(not p._future.done() or p._future.exception() for p in promises):
    return
    resolve([p._future.result() for p in promises])

    def on_reject(err):
    for p in promises:
    if not p._future.done():
    p._future.get_loop().call_soon_threadsafe(p._future.cancel)
    reject(err)

    for p in promises:
    p.then(on_resolve, on_reject)

    return cls(callback)

    @classmethod
    def any(cls, promises: Iterable["Promise"]) -> "Promise":
    promises = list(promises)

    def callback(resolve, reject):
    def on_resolve(r):
    for p in promises:
    if not p._future.done():
    p._future.get_loop().call_soon_threadsafe(p._future.cancel)
    resolve(r)

    def on_reject(err):
    if any(
    not p._future.done() or not p._future.exception() for p in promises
    ):
    return
    errors = filter(None, (p._future.exception() for p in promises))
    reject(AggregateError(errors))

    for p in promises:
    p.then(on_resolve, on_reject)

    return cls(callback)

    @classmethod
    def race(cls, promises: Iterable["Promise"]) -> "Promise":
    promises = list(promises)

    def callback(resolve, reject):
    def on_resolve(r):
    for p in promises:
    if not p._future.done():
    p._future.get_loop().call_soon_threadsafe(p._future.cancel)
    resolve(r)

    def on_reject(err):
    for p in promises:
    if not p._future.done():
    p._future.get_loop().call_soon_threadsafe(p._future.cancel)
    reject(err)

    for p in promises:
    p.then(on_resolve, on_reject)

    return cls(callback)


    def is_even(number):
    def callback(resolve, reject):
    @@ -151,5 +281,18 @@ async def test():
    assert await is_even(4).then(str).then(lambda s: s.upper()) == "TRUE"


    async def main():

    promises = [Promise.resolve("foo")]

    def callback(resolve, reject):
    time.sleep(1)
    resolve("bar")

    promises.append(Promise(callback))
    print(await Promise.race(promises))


    if __name__ == "__main__":
    asyncio.run(test())
    # asyncio.run(test())
    asyncio.run(main())
  2. frostming created this gist Dec 10, 2020.
    155 changes: 155 additions & 0 deletions promise.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,155 @@
    """
    Python implementation of JavaScript's Promise interface
    with the power of asyncio.
    Authored by: Frost Ming <[email protected]>
    License: WTFPL
    """

    import asyncio
    from typing import Any, Callable, Optional


    ResolveFunc = Callable[[Any], Any]
    RejectFunc = Callable[[Exception], Any]
    FinallyFunc = Callable[[], Any]
    PromiseCallBack = Callable[[ResolveFunc, RejectFunc], Any]


    def _noop_callback(resolve_func, reject_func):
    """This is a dummy callback that doesn't resolve anything nor reject.
    For .then() and .catch() chaining Promises.
    """
    pass


    class Promise:
    """A class that implements the JavaScript's Promise interface, with asyncio!"""

    def __init__(self, callback: PromiseCallBack) -> None:
    self.callback = callback
    self._loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()
    # Commit the task to the event loop immediately after Promise is created
    self._future = self.__run()

    def __repr__(self) -> str:
    try:
    result = f" {self._future.result()!r}"
    except Exception:
    result = ""
    return f"<{self.__class__.__name__}[{self._future._state}]{result}>"

    def __await__(self):
    yield from self._future
    result = self._future.result()
    # Await all chaining promises
    if isinstance(result, Promise):
    return (yield from result.__await__())
    else:
    return result

    def __run(self):
    future = self._loop.create_future()
    self._loop.call_soon_threadsafe(
    self.callback,
    lambda r: future.set_result(r),
    lambda e: future.set_exception(e),
    )
    return future

    @classmethod
    def resolve(cls, result: Any) -> "Promise":
    def callback(resolve_func, _):
    resolve_func(result)

    return cls(callback)

    @classmethod
    def reject(cls, exc: Exception) -> "Promise":
    def callback(_, reject_func):
    reject_func(exc)

    return cls(callback)

    def then(
    self, resolve_func: ResolveFunc, reject_func: Optional[RejectFunc] = None
    ) -> "Promise":
    new_promise = Promise(_noop_callback)
    new_future = new_promise._future

    def receive_result(future: asyncio.Future):
    if future.exception():
    if reject_func is None:
    new_future.set_exception(future.exception())
    else:
    try:
    result = reject_func(future.exception())
    except Exception as e:
    new_future.set_exception(e)
    else:
    new_future.set_result(result)
    else:
    result = resolve_func(future.result())
    new_future.set_result(result)

    self._future.add_done_callback(receive_result)
    return new_promise

    def catch(self, callback: RejectFunc) -> "Promise":
    new_promise = Promise(_noop_callback)
    new_future = new_promise._future

    def receive_result(future: asyncio.Future):
    if future.exception():
    try:
    result = callback(future.exception())
    except Exception as e:
    new_future.set_exception(e)
    else:
    new_future.set_result(result)
    else:
    new_future.set_result(future.result())

    self._future.add_done_callback(receive_result)
    return new_promise

    def final(self, callback: FinallyFunc) -> "Promise":
    new_promise = Promise(_noop_callback)
    new_future = new_promise._future

    def receive_result(future: asyncio.Future):
    if future.exception():
    new_future.set_exception(future.exception())
    else:
    new_future.set_result(future.result())
    callback()

    self._future.add_done_callback(receive_result)
    return new_promise


    def is_even(number):
    def callback(resolve, reject):
    if not isinstance(number, int):
    reject(ValueError(f"Invalid number: {number!r}"))
    else:
    resolve(number % 2 == 0)

    return Promise(callback)


    async def test():
    assert await is_even(4)
    assert not (await is_even(5))
    err = None
    try:
    await is_even("4")
    except Exception as e:
    err = e
    assert str(err) == "Invalid number: '4'"

    assert await is_even(4).then(str).then(lambda s: s.upper()) == "TRUE"


    if __name__ == "__main__":
    asyncio.run(test())