Skip to content

Instantly share code, notes, and snippets.

@KubaO
Last active December 9, 2024 01:16
Show Gist options
  • Select an option

  • Save KubaO/7aa1f571c60e4bccdf683e339dbff9f4 to your computer and use it in GitHub Desktop.

Select an option

Save KubaO/7aa1f571c60e4bccdf683e339dbff9f4 to your computer and use it in GitHub Desktop.

Revisions

  1. KubaO revised this gist Sep 10, 2024. 1 changed file with 100 additions and 3 deletions.
    103 changes: 100 additions & 3 deletions winmsgasyncio.py
    Original file line number Diff line number Diff line change
    @@ -3,6 +3,8 @@
    Usage:
    asyncio.set_event_loop_policy(winmsgasyncio.MsgProactorEventLoopPolicy())
    To integerate with tkinter, use tk_sync_mainloop(root) or await tk_async_mainloop(root).
    """

    # SPDX-License-Identifier: MIT
    @@ -11,14 +13,16 @@
    'MsgProactorEventLoopPolicy',
    'MsgProactorEventLoop',
    'MsgIocpProactor',
    'tk_sync_mainloop',
    'tk_async_mainloop',
    )

    import asyncio
    import asyncio.events
    import ctypes
    import logging
    import math
    import sys
    # import sys
    import traceback
    from ctypes.wintypes import HANDLE, MSG, BOOL, LPMSG, HWND, UINT, LPARAM, DWORD, LPHANDLE
    from typing import Callable
    @@ -68,7 +72,8 @@ def __init__(self, *args, **kwargs):
    self._handles[0] = self._iocp
    self._dumpstack: bool = True
    self._counter = 0
    self.message_hook: Callable[[MSG], bool] | None = None
    self.pre_message_hook: Callable[[MSG], bool] | None = None
    self.post_message_hook: Callable[[MSG], None] | None = None

    def _poll(self, timeout=None):
    logger.debug("_poll enter")
    @@ -99,12 +104,16 @@ def _poll(self, timeout=None):
    status = GetQueuedCompletionStatus(self._iocp, ms)
    elif rv == WAIT_OBJECT_0 + 1:
    while rc := PeekMessageW(self._msg, 0, 0, 0, PM_REMOVE):
    if not self.message_hook or not self.message_hook(
    if not self.pre_message_hook or not self.pre_message_hook(
    self._msg):
    logger.debug(f"MSG=0x{self._msg.message:x}")
    TranslateMessage(self._msg)
    DispatchMessageW(self._msg)
    if self.post_message_hook:
    self.post_message_hook(self._msg)
    if self._msg.message == WM_QUIT:
    logger.debug("WM_QUIT!")
    print(id(asyncio.get_event_loop()))
    asyncio.get_event_loop().stop()
    elif rv == WAIT_TIMEOUT:
    logger.debug("MsgWaitForMultipleObjectsEx WAIT_TIMEOUT")
    @@ -163,9 +172,97 @@ def __init__(self, proactor=None):
    if proactor is None:
    proactor = MsgIocpProactor()
    super().__init__(proactor)
    self.Tk_GetNumMainWindows = None
    self._tk_initialized = False

    def _tk_init(self):
    if not self._tk_initialized:
    tkdll = ctypes.WinDLL("tk86t.dll")
    self.Tk_GetNumMainWindows = tkdll.Tk_GetNumMainWindows
    self._tk_initialized = True

    def tk_sync_mainloop(self, root):
    logger.info("tk_sync_mainloop")
    self._tk_init()

    def message_hook(msg) -> bool:
    while root.tk.dooneevent(_tkinter.DONT_WAIT):
    pass
    if self.Tk_GetNumMainWindows() == 0:
    self.stop()
    return False

    post_hook = self._proactor.post_message_hook
    self._proactor.post_message_hook = message_hook
    self.run_forever()
    self._proactor.post_message_hook = post_hook

    async def tk_async_mainloop(self, root):
    logger.info("tk_async_mainloop")
    self._tk_init()
    future = self.create_future()

    def post_message_hook(msg) -> bool:
    while root.tk.dooneevent(_tkinter.DONT_WAIT):
    pass
    if self.Tk_GetNumMainWindows() == 0:
    future.set_result(True)
    return True

    post_hook = self._proactor.post_message_hook

    def release_hook(msg):
    self._proactor.post_message_hook = post_hook

    self._proactor.post_message_hook = post_message_hook
    future.add_done_callback(release_hook)
    return await future


    class MsgProactorEventLoopPolicy(
    asyncio.events.BaseDefaultEventLoopPolicy):
    _loop_factory = MsgProactorEventLoop


    def tk_sync_mainloop(root, close=True):
    loop = asyncio.new_event_loop()
    loop.tk_sync_mainloop(root)
    if close:
    loop.close()


    async def tk_async_mainloop(root):
    await asyncio.get_running_loop().tk_async_mainloop(root)


    if __name__ == '__main__':
    from tkinter import Tk, Label, Button, _tkinter
    import logging
    import random

    logging.basicConfig(level=logging.INFO)
    asyncio.set_event_loop_policy(MsgProactorEventLoopPolicy())

    root = Tk()
    text = "This is Tcl/Tk %s" % root.globalgetvar('tk_patchLevel')
    text += "\nThis should be a cedilla: \xe7"
    label = Label(root, text=text)
    label.pack()
    test = Button(root, text="Click me!",
    command=lambda root=root: root.test.configure(
    text="[%s]" % root.test['text']))
    test.pack()
    root.test = test

    quit = Button(root, text="QUIT", command=root.destroy)
    quit.pack()

    root.iconify()
    root.update()
    root.deiconify()

    if bool(random.getrandbits(1)):
    tk_sync_mainloop(root)
    else:
    asyncio.run(tk_async_mainloop(root))

  2. KubaO revised this gist Sep 10, 2024. 1 changed file with 6 additions and 1 deletion.
    7 changes: 6 additions & 1 deletion winmsgasyncio.py
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,9 @@
    """An event loop policy that integrates Windows message loop with IOCP-based async processing."""
    """
    An event loop policy that integrates Windows message loop with IOCP-based async processing.
    Usage:
    asyncio.set_event_loop_policy(winmsgasyncio.MsgProactorEventLoopPolicy())
    """

    # SPDX-License-Identifier: MIT

  3. KubaO revised this gist Sep 10, 2024. 1 changed file with 8 additions and 4 deletions.
    12 changes: 8 additions & 4 deletions winmsgasyncio.py
    Original file line number Diff line number Diff line change
    @@ -4,6 +4,7 @@

    __all__ = (
    'MsgProactorEventLoopPolicy',
    'MsgProactorEventLoop',
    'MsgIocpProactor',
    )

    @@ -32,6 +33,7 @@

    LRESULT = LPARAM


    def _winfn(lib, name, *args):
    fun = getattr(lib, name)
    globals()[name] = ctypes.WINFUNCTYPE(*args)(fun)
    @@ -41,7 +43,8 @@ def _winfn(lib, name, *args):
    _winfn(_user32, 'PeekMessageW', BOOL, LPMSG, HWND, UINT, UINT, UINT)
    _winfn(_user32, 'TranslateMessage', BOOL, LPMSG)
    _winfn(_user32, 'DispatchMessageW', LRESULT, LPMSG)
    _winfn(_user32, 'MsgWaitForMultipleObjectsEx', DWORD, DWORD, LPHANDLE, DWORD, DWORD, DWORD)
    _winfn(_user32, 'MsgWaitForMultipleObjectsEx', DWORD,
    DWORD, LPHANDLE, DWORD, DWORD, DWORD)

    logger = logging.getLogger(__name__)

    @@ -54,12 +57,13 @@ def debug_stack(limit=None):

    class MsgIocpProactor(asyncio.IocpProactor):
    def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self._msg = MSG()
    self._handles = (HANDLE * 1)()
    self._handles[0] = self._iocp
    self._dumpstack: bool = True
    self._counter = 0
    self.message_hook: Callable[[MSG], bool] | None = None
    super().__init__(*args, **kwargs)

    def _poll(self, timeout=None):
    logger.debug("_poll enter")
    @@ -79,7 +83,6 @@ def _poll(self, timeout=None):
    if ms >= INFINITE:
    raise ValueError("timeout too big")

    self._handles[0] = self._iocp
    while True:
    status = None
    rv = MsgWaitForMultipleObjectsEx(
    @@ -91,7 +94,8 @@ def _poll(self, timeout=None):
    status = GetQueuedCompletionStatus(self._iocp, ms)
    elif rv == WAIT_OBJECT_0 + 1:
    while rc := PeekMessageW(self._msg, 0, 0, 0, PM_REMOVE):
    if not self.message_hook or not self.message_hook(self._msg):
    if not self.message_hook or not self.message_hook(
    self._msg):
    TranslateMessage(self._msg)
    DispatchMessageW(self._msg)
    if self._msg.message == WM_QUIT:
  4. KubaO revised this gist Sep 10, 2024. 1 changed file with 26 additions and 13 deletions.
    39 changes: 26 additions & 13 deletions winmsgasyncio.py
    Original file line number Diff line number Diff line change
    @@ -14,9 +14,10 @@
    import math
    import sys
    import traceback
    from _winapi import INFINITE, CloseHandle
    from ctypes.wintypes import HANDLE, MSG
    from ctypes.wintypes import HANDLE, MSG, BOOL, LPMSG, HWND, UINT, LPARAM, DWORD, LPHANDLE
    from typing import Callable
    from _overlapped import INVALID_HANDLE_VALUE, GetQueuedCompletionStatus
    from _winapi import INFINITE, CloseHandle

    WAIT_OBJECT_0 = 0
    WAIT_IO_COMPLETION = 192
    @@ -29,11 +30,18 @@
    WS_OVERLAPPED = 0
    WM_QUIT = 18

    LRESULT = LPARAM

    def _winfn(lib, name, *args):
    fun = getattr(lib, name)
    globals()[name] = ctypes.WINFUNCTYPE(*args)(fun)


    _user32 = ctypes.WinDLL("USER32")
    PeekMessage = _user32.PeekMessageW
    TranslateMessage = _user32.TranslateMessage
    DispatchMessage = _user32.DispatchMessageW
    MsgWaitForMultipleObjectsEx = _user32.MsgWaitForMultipleObjectsEx
    _winfn(_user32, 'PeekMessageW', BOOL, LPMSG, HWND, UINT, UINT, UINT)
    _winfn(_user32, 'TranslateMessage', BOOL, LPMSG)
    _winfn(_user32, 'DispatchMessageW', LRESULT, LPMSG)
    _winfn(_user32, 'MsgWaitForMultipleObjectsEx', DWORD, DWORD, LPHANDLE, DWORD, DWORD, DWORD)

    logger = logging.getLogger(__name__)

    @@ -48,7 +56,9 @@ class MsgIocpProactor(asyncio.IocpProactor):
    def __init__(self, *args, **kwargs):
    self._msg = MSG()
    self._handles = (HANDLE * 1)()
    self._dumpstack = logger.getEffectiveLevel() <= logging.DEBUG
    self._dumpstack: bool = True
    self._counter = 0
    self.message_hook: Callable[[MSG], bool] | None = None
    super().__init__(*args, **kwargs)

    def _poll(self, timeout=None):
    @@ -74,16 +84,19 @@ def _poll(self, timeout=None):
    status = None
    rv = MsgWaitForMultipleObjectsEx(
    1, self._handles, ms, QS_ALLINPUT, MWMO_ALERTABLE | MWMO_INPUTAVAILABLE)
    logger.debug(f"* {self._counter}")
    self._counter += 1
    if rv == WAIT_OBJECT_0 or rv == WAIT_IO_COMPLETION:
    logger.debug(f"MsgWaitForMultipleObjectsEx IOC {rv}")
    status = GetQueuedCompletionStatus(self._iocp, ms)
    elif rv == WAIT_OBJECT_0 + 1:
    while rc := PeekMessage(self._msg, 0, 0, 0, PM_REMOVE):
    TranslateMessage(self._msg)
    DispatchMessage(self._msg)
    if self._msg.message == WM_QUIT:
    logger.debug("WM_QUIT!")
    asyncio.get_event_loop().stop()
    while rc := PeekMessageW(self._msg, 0, 0, 0, PM_REMOVE):
    if not self.message_hook or not self.message_hook(self._msg):
    TranslateMessage(self._msg)
    DispatchMessageW(self._msg)
    if self._msg.message == WM_QUIT:
    logger.debug("WM_QUIT!")
    asyncio.get_event_loop().stop()
    elif rv == WAIT_TIMEOUT:
    logger.debug("MsgWaitForMultipleObjectsEx WAIT_TIMEOUT")
    elif rv == WAIT_FAILED:
  5. KubaO renamed this gist Sep 9, 2024. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  6. KubaO revised this gist Sep 9, 2024. 1 changed file with 66 additions and 151 deletions.
    217 changes: 66 additions & 151 deletions rttest.py
    Original file line number Diff line number Diff line change
    @@ -1,176 +1,98 @@
    import asyncio
    import asyncio.events
    import asyncio.windows_events
    import functools
    from ctypes import sizeof, byref, pointer

    from win32more import FAILED
    from win32more.Windows.UI.Popups import MessageDialog
    from win32more.Windows.Win32.Foundation import (
    HANDLE,
    WAIT_OBJECT_0,
    WAIT_IO_COMPLETION,
    WAIT_TIMEOUT,
    WAIT_FAILED,
    )
    from win32more.Windows.Win32.System.LibraryLoader import GetModuleHandle
    from win32more.Windows.Win32.System.WinRT import (
    RO_INIT_MULTITHREADED,
    RoInitialize,
    RoUninitialize,
    )
    from win32more.Windows.Win32.UI.Input.KeyboardAndMouse import SetActiveWindow
    from win32more.Windows.Win32.UI.Shell import IInitializeWithWindow
    from win32more.Windows.Win32.UI.WindowsAndMessaging import (
    CW_USEDEFAULT,
    MSG,
    MWMO_ALERTABLE,
    MWMO_INPUTAVAILABLE,
    PM_REMOVE,
    QS_ALLINPUT,
    WNDCLASS,
    WS_OVERLAPPED,
    WM_QUIT,
    CreateWindowEx,
    DefWindowProc,
    DispatchMessage,
    GetMessage,
    MsgWaitForMultipleObjectsEx,
    PeekMessage,
    PostQuitMessage,
    RegisterClass,
    TranslateMessage,
    )

    from win32more.Windows.Win32.UI.HiDpi import DPI_AWARENESS_CONTEXT_PER_MONITOR_AWARE_V2, SetProcessDpiAwarenessContext


    def create_owner_window():
    CLASS_NAME = "Owner Window"
    hInstance = GetModuleHandle(None)

    wc = WNDCLASS()
    wc.cbSize = sizeof(WNDCLASS)
    wc.lpfnWndProc = DefWindowProc
    wc.hInstance = hInstance
    wc.lpszClassName = CLASS_NAME

    atom = RegisterClass(wc)
    if not atom:
    raise WinError()

    hwnd = CreateWindowEx(
    0,
    CLASS_NAME,
    "",
    WS_OVERLAPPED,
    CW_USEDEFAULT,
    CW_USEDEFAULT,
    CW_USEDEFAULT,
    CW_USEDEFAULT,
    0,
    0,
    hInstance,
    0)
    if not hwnd:
    raise WinError()

    # workaround to avoid that dialog window appear in background.
    SetActiveWindow(hwnd)

    return hwnd
    """An event loop policy that integrates Windows message loop with IOCP-based async processing."""

    # SPDX-License-Identifier: MIT

    async def winrt_messagedialog(owner_window):
    dialog = MessageDialog(
    "This is WinRT MessageDialog",
    "WinRT MessageDialog")
    dialog.as_(IInitializeWithWindow).Initialize(owner_window)
    uicommand = await dialog.ShowAsync()
    print(uicommand, uicommand.Label)


    def initialize():
    r = SetProcessDpiAwarenessContext(
    DPI_AWARENESS_CONTEXT_PER_MONITOR_AWARE_V2)
    if FAILED(r):
    raise WinError(r)

    hr = RoInitialize(RO_INIT_MULTITHREADED)
    if FAILED(hr):
    raise WinError(hr)


    def shutdown():
    RoUninitialize()

    __all__ = (
    'MsgProactorEventLoopPolicy',
    'MsgIocpProactor',
    )

    async def main():
    task = asyncio.create_task(winrt_messagedialog(create_owner_window()))
    task.add_done_callback(lambda _: PostQuitMessage(0))
    await task
    import asyncio
    import asyncio.events
    import ctypes
    import logging
    import math
    import sys
    import traceback
    from _winapi import INFINITE, CloseHandle
    from ctypes.wintypes import HANDLE, MSG
    from _overlapped import INVALID_HANDLE_VALUE, GetQueuedCompletionStatus

    WAIT_OBJECT_0 = 0
    WAIT_IO_COMPLETION = 192
    WAIT_TIMEOUT = 258
    WAIT_FAILED = 4294967295
    MWMO_ALERTABLE = 2
    MWMO_INPUTAVAILABLE = 4
    PM_REMOVE = 1
    QS_ALLINPUT = 1279
    WS_OVERLAPPED = 0
    WM_QUIT = 18

    _user32 = ctypes.WinDLL("USER32")
    PeekMessage = _user32.PeekMessageW
    TranslateMessage = _user32.TranslateMessage
    DispatchMessage = _user32.DispatchMessageW
    MsgWaitForMultipleObjectsEx = _user32.MsgWaitForMultipleObjectsEx

    logger = logging.getLogger(__name__)


    def debug_stack(limit=None):
    stack = traceback.extract_stack(limit=limit)
    for item in traceback.StackSummary.from_list(stack).format():
    logger.debug(item)


    class MsgIocpProactor(asyncio.IocpProactor):
    import math
    import traceback
    import _overlapped
    import _winapi
    INFINITE = _winapi.INFINITE
    GetQueuedCompletionStatus = _overlapped.GetQueuedCompletionStatus

    def __init__(self, *args, **kwargs):
    self.__msg = MSG()
    self.__handles = (HANDLE * 1)()
    self.__dumpstack = True
    self._msg = MSG()
    self._handles = (HANDLE * 1)()
    self._dumpstack = logger.getEffectiveLevel() <= logging.DEBUG
    super().__init__(*args, **kwargs)

    def _poll(self, timeout=None):
    print("_poll enter")
    logger.debug("_poll enter")

    if self.__dumpstack:
    MsgIocpProactor.traceback.print_stack(limit=6)
    self.__dumpstack = False
    if self._dumpstack:
    debug_stack(limit=6)
    self._dumpstack = False

    if timeout is None:
    ms = MsgIocpProactor.INFINITE
    ms = INFINITE
    elif timeout < 0:
    raise ValueError("negative timeout")
    else:
    # MsgWaitForMultipleObjectsEx() has a resolution of 1 millisecond,
    # round away from zero to wait *at least* timeout seconds.
    ms = MsgIocpProactor.math.ceil(timeout * 1e3)
    if ms >= MsgIocpProactor.INFINITE:
    ms = math.ceil(timeout * 1e3)
    if ms >= INFINITE:
    raise ValueError("timeout too big")

    self.__handles[0] = self._iocp
    self._handles[0] = self._iocp
    while True:
    # START
    status = None
    rv = MsgWaitForMultipleObjectsEx(
    1, self.__handles, ms, QS_ALLINPUT, MWMO_ALERTABLE | MWMO_INPUTAVAILABLE)
    1, self._handles, ms, QS_ALLINPUT, MWMO_ALERTABLE | MWMO_INPUTAVAILABLE)
    if rv == WAIT_OBJECT_0 or rv == WAIT_IO_COMPLETION:
    print(f"MsgWaitForMultipleObjectsEx IOC {rv}")
    status = MsgIocpProactor.GetQueuedCompletionStatus(
    self._iocp, ms)
    logger.debug(f"MsgWaitForMultipleObjectsEx IOC {rv}")
    status = GetQueuedCompletionStatus(self._iocp, ms)
    elif rv == WAIT_OBJECT_0 + 1:
    while rc := PeekMessage(self.__msg, 0, 0, 0, PM_REMOVE):
    TranslateMessage(self.__msg)
    DispatchMessage(self.__msg)
    if self.__msg.message == WM_QUIT:
    print("WM_QUIT!")
    while rc := PeekMessage(self._msg, 0, 0, 0, PM_REMOVE):
    TranslateMessage(self._msg)
    DispatchMessage(self._msg)
    if self._msg.message == WM_QUIT:
    logger.debug("WM_QUIT!")
    asyncio.get_event_loop().stop()
    elif rv == WAIT_TIMEOUT:
    print("MsgWaitForMultipleObjectsEx WAIT_TIMEOUT")
    logger.debug("MsgWaitForMultipleObjectsEx WAIT_TIMEOUT")
    elif rv == WAIT_FAILED:
    print("MsgWaitForMultipleObjectsEx WAIT_FAILED")
    logger.debug("MsgWaitForMultipleObjectsEx WAIT_FAILED")
    else:
    print(f"MsgWaitForMultipleObjectsEx returned {rv}")
    # END
    logger.debug(f"MsgWaitForMultipleObjectsEx returned {rv}")

    if status is None:
    break
    ms = 0

    err, transferred, key, address = status
    try:
    @@ -186,8 +108,8 @@ def _poll(self, timeout=None):

    # key is either zero, or it is used to return a pipe
    # handle which should be closed to avoid a leak.
    if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
    _winapi.CloseHandle(key)
    if key not in (0, INVALID_HANDLE_VALUE):
    CloseHandle(key)
    continue

    if obj in self._stopped_serving:
    @@ -211,7 +133,7 @@ def _poll(self, timeout=None):
    self._cache.pop(ov.address, None)
    self._unregistered.clear()

    print("_poll exit")
    logger.debug("_poll exit")


    class MsgProactorEventLoop(asyncio.ProactorEventLoop):
    @@ -225,10 +147,3 @@ class MsgProactorEventLoopPolicy(
    asyncio.events.BaseDefaultEventLoopPolicy):
    _loop_factory = MsgProactorEventLoop


    if __name__ == "__main__":
    asyncio.set_event_loop_policy(MsgProactorEventLoopPolicy())
    initialize()
    asyncio.run(main())
    shutdown()

  7. KubaO revised this gist Sep 9, 2024. 1 changed file with 31 additions and 21 deletions.
    52 changes: 31 additions & 21 deletions rttest.py
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,6 @@
    import asyncio
    import asyncio.events
    import asyncio.windows_events
    import functools
    from ctypes import sizeof, byref, pointer

    @@ -104,40 +106,41 @@ def shutdown():


    async def main():
    wrap_poll()
    task = asyncio.create_task(winrt_messagedialog(create_owner_window()))
    task.add_done_callback(lambda _: PostQuitMessage(0))
    await task


    def wrap_poll(proactor=None):
    import asyncio.windows_events
    class MsgIocpProactor(asyncio.IocpProactor):
    import math
    import traceback
    import _overlapped
    import _winapi
    INFINITE = _winapi.INFINITE
    GetQueuedCompletionStatus = _overlapped.GetQueuedCompletionStatus

    if not proactor:
    event_loop = asyncio.get_event_loop()
    print(event_loop)
    proactor = event_loop._proactor
    def __init__(self, *args, **kwargs):
    self.__msg = MSG()
    self.__handles = (HANDLE * 1)()
    self.__dumpstack = True
    super().__init__(*args, **kwargs)

    def _poll_msg(self, timeout=None):
    print("_poll_msg enter")
    def _poll(self, timeout=None):
    print("_poll enter")

    if self.__dumpstack:
    traceback.print_stack(limit=6)
    MsgIocpProactor.traceback.print_stack(limit=6)
    self.__dumpstack = False

    if timeout is None:
    ms = _winapi.INFINITE
    ms = MsgIocpProactor.INFINITE
    elif timeout < 0:
    raise ValueError("negative timeout")
    else:
    # MsgWaitForMultipleObjectsEx() has a resolution of 1 millisecond,
    # round away from zero to wait *at least* timeout seconds.
    ms = math.ceil(timeout * 1e3)
    if ms >= _winapi.INFINITE:
    ms = MsgIocpProactor.math.ceil(timeout * 1e3)
    if ms >= MsgIocpProactor.INFINITE:
    raise ValueError("timeout too big")

    self.__handles[0] = self._iocp
    @@ -148,7 +151,8 @@ def _poll_msg(self, timeout=None):
    1, self.__handles, ms, QS_ALLINPUT, MWMO_ALERTABLE | MWMO_INPUTAVAILABLE)
    if rv == WAIT_OBJECT_0 or rv == WAIT_IO_COMPLETION:
    print(f"MsgWaitForMultipleObjectsEx IOC {rv}")
    status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
    status = MsgIocpProactor.GetQueuedCompletionStatus(
    self._iocp, ms)
    elif rv == WAIT_OBJECT_0 + 1:
    while rc := PeekMessage(self.__msg, 0, 0, 0, PM_REMOVE):
    TranslateMessage(self.__msg)
    @@ -207,17 +211,23 @@ def _poll_msg(self, timeout=None):
    self._cache.pop(ov.address, None)
    self._unregistered.clear()

    print("_poll_msg exit")
    print("_poll exit")

    print(proactor)
    if isinstance(proactor, asyncio.windows_events.IocpProactor):
    proactor._poll = functools.partial(_poll_msg, proactor)
    proactor.__msg = MSG()
    proactor.__handles = (HANDLE * 1)()
    proactor.__dumpstack = True

    class MsgProactorEventLoop(asyncio.ProactorEventLoop):
    def __init__(self, proactor=None):
    if proactor is None:
    proactor = MsgIocpProactor()
    super().__init__(proactor)


    class MsgProactorEventLoopPolicy(
    asyncio.events.BaseDefaultEventLoopPolicy):
    _loop_factory = MsgProactorEventLoop


    if __name__ == "__main__":
    asyncio.set_event_loop_policy(MsgProactorEventLoopPolicy())
    initialize()
    asyncio.run(main())
    shutdown()
  8. KubaO revised this gist Sep 9, 2024. 1 changed file with 9 additions and 11 deletions.
    20 changes: 9 additions & 11 deletions rttest.py
    Original file line number Diff line number Diff line change
    @@ -1,5 +1,4 @@
    import asyncio
    import asyncio.windows_events
    import functools
    from ctypes import sizeof, byref, pointer

    @@ -105,26 +104,24 @@ def shutdown():


    async def main():
    print(asyncio.events.get_running_loop())
    instrument_asyncio()

    wrap_poll()
    task = asyncio.create_task(winrt_messagedialog(create_owner_window()))
    task.add_done_callback(lambda _: PostQuitMessage(0))
    await task


    def instrument_asyncio():
    event_loop = asyncio.get_event_loop()
    proactor = event_loop._proactor
    wrap_poll(proactor)


    def wrap_poll(proactor):
    def wrap_poll(proactor=None):
    import asyncio.windows_events
    import math
    import traceback
    import _overlapped
    import _winapi

    if not proactor:
    event_loop = asyncio.get_event_loop()
    print(event_loop)
    proactor = event_loop._proactor

    def _poll_msg(self, timeout=None):
    print("_poll_msg enter")

    @@ -224,3 +221,4 @@ def _poll_msg(self, timeout=None):
    initialize()
    asyncio.run(main())
    shutdown()

  9. KubaO revised this gist Sep 9, 2024. 1 changed file with 99 additions and 78 deletions.
    177 changes: 99 additions & 78 deletions rttest.py
    Original file line number Diff line number Diff line change
    @@ -1,18 +1,9 @@
    import asyncio
    import asyncio.windows_events
    import _overlapped
    import functools
    from ctypes import sizeof, byref, pointer

    from win32more.mddbootstrap import (
    WINDOWSAPPSDK_RELEASE_MAJORMINOR,
    WINDOWSAPPSDK_RELEASE_VERSION_SHORTTAG_W,
    WINDOWSAPPSDK_RUNTIME_VERSION_UINT64,
    MddBootstrapInitialize2,
    MddBootstrapInitializeOptions_OnNoMatch_ShowUI,
    MddBootstrapShutdown,
    )

    from win32more import FAILED, WinError, UInt32
    from win32more import FAILED
    from win32more.Windows.UI.Popups import MessageDialog
    from win32more.Windows.Win32.Foundation import (
    HANDLE,
    @@ -21,7 +12,6 @@
    WAIT_TIMEOUT,
    WAIT_FAILED,
    )
    from win32more.Windows.Win32.Storage.Packaging.Appx import PACKAGE_VERSION
    from win32more.Windows.Win32.System.LibraryLoader import GetModuleHandle
    from win32more.Windows.Win32.System.WinRT import (
    RO_INIT_MULTITHREADED,
    @@ -33,23 +23,22 @@
    from win32more.Windows.Win32.UI.WindowsAndMessaging import (
    CW_USEDEFAULT,
    MSG,
    WNDCLASS,
    WS_OVERLAPPED,
    QS_ALLINPUT,
    MWMO_ALERTABLE,
    MWMO_INPUTAVAILABLE,
    PM_REMOVE,
    QS_ALLINPUT,
    WNDCLASS,
    WS_OVERLAPPED,
    WM_QUIT,
    CreateWindowEx,
    DefWindowProc,
    DispatchMessage,
    GetMessage,
    MsgWaitForMultipleObjectsEx,
    PeekMessage,
    PostQuitMessage,
    RegisterClass,
    SetTimer,
    TranslateMessage,
    MsgWaitForMultipleObjectsEx,
    )

    from win32more.Windows.Win32.UI.HiDpi import DPI_AWARENESS_CONTEXT_PER_MONITOR_AWARE_V2, SetProcessDpiAwarenessContext
    @@ -127,79 +116,111 @@ async def main():
    def instrument_asyncio():
    event_loop = asyncio.get_event_loop()
    proactor = event_loop._proactor
    print(proactor)
    wrap_poll(proactor)
    wrap_overlapped()


    def wrap_poll(proactor):
    import math
    import traceback
    _poll = proactor._poll
    print(_poll)
    import _overlapped
    import _winapi

    def _poll_msg(self, timeout=None):
    print("_poll_msg enter")

    def _poll_wrapper(timeout=None):
    print("_poll enter")
    if wrap_poll.dumpstack:
    if self.__dumpstack:
    traceback.print_stack(limit=6)
    wrap_poll.dumpstack = False
    _poll(timeout)
    print("_poll exit")
    proactor._poll = _poll_wrapper


    wrap_poll.dumpstack = True


    def wrap_overlapped():
    print(_overlapped.GetQueuedCompletionStatus)
    _GetQueuedCompletionStatus = _overlapped.GetQueuedCompletionStatus

    def GetQueuedCompletionStatus_wrapper(port: HANDLE, msecs: UInt32):
    print(f"GetQueuedCompletionStatus entered for {msecs} ms")

    msg = None
    result = None
    nCount = 1
    handles = (HANDLE * nCount)()
    handles[0] = port

    rv = MsgWaitForMultipleObjectsEx(
    nCount,
    handles,
    msecs,
    QS_ALLINPUT,
    MWMO_ALERTABLE | MWMO_INPUTAVAILABLE)
    if rv == WAIT_OBJECT_0 or rv == WAIT_IO_COMPLETION:
    print(f"MsgWait... {rv}")
    result = _GetQueuedCompletionStatus(port, msecs)
    elif rv == WAIT_OBJECT_0 + nCount:
    msg = MSG()
    hadmessages = False
    while PeekMessage(msg, 0, 0, 0, PM_REMOVE):
    hadmessages = True
    TranslateMessage(msg)
    DispatchMessage(msg)
    if msg.message == WM_QUIT:
    print("WM_QUIT!")
    asyncio.get_event_loop().stop()
    print("STOPPED")
    break
    print(hadmessages)
    elif rv == WAIT_TIMEOUT:
    print("MsgWaitForMultipleObjectsEx WAIT_TIMEOUT")
    elif rv == WAIT_FAILED:
    print("MsgWaitForMultipleObjectsEx WAIT_FAILED")
    else:
    print(f"MsgWaitForMultipleObjectsEx returned {rv}")
    self.__dumpstack = False

    print("GetQueuedCompletionStatus exit")
    return result
    if timeout is None:
    ms = _winapi.INFINITE
    elif timeout < 0:
    raise ValueError("negative timeout")
    else:
    # MsgWaitForMultipleObjectsEx() has a resolution of 1 millisecond,
    # round away from zero to wait *at least* timeout seconds.
    ms = math.ceil(timeout * 1e3)
    if ms >= _winapi.INFINITE:
    raise ValueError("timeout too big")

    self.__handles[0] = self._iocp
    while True:
    # START
    status = None
    rv = MsgWaitForMultipleObjectsEx(
    1, self.__handles, ms, QS_ALLINPUT, MWMO_ALERTABLE | MWMO_INPUTAVAILABLE)
    if rv == WAIT_OBJECT_0 or rv == WAIT_IO_COMPLETION:
    print(f"MsgWaitForMultipleObjectsEx IOC {rv}")
    status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
    elif rv == WAIT_OBJECT_0 + 1:
    while rc := PeekMessage(self.__msg, 0, 0, 0, PM_REMOVE):
    TranslateMessage(self.__msg)
    DispatchMessage(self.__msg)
    if self.__msg.message == WM_QUIT:
    print("WM_QUIT!")
    asyncio.get_event_loop().stop()
    elif rv == WAIT_TIMEOUT:
    print("MsgWaitForMultipleObjectsEx WAIT_TIMEOUT")
    elif rv == WAIT_FAILED:
    print("MsgWaitForMultipleObjectsEx WAIT_FAILED")
    else:
    print(f"MsgWaitForMultipleObjectsEx returned {rv}")
    # END

    if status is None:
    break
    ms = 0

    err, transferred, key, address = status
    try:
    f, ov, obj, callback = self._cache.pop(address)
    except KeyError:
    if self._loop.get_debug():
    self._loop.call_exception_handler({
    'message': ('GetQueuedCompletionStatus() returned an '
    'unexpected event'),
    'status': ('err=%s transferred=%s key=%#x address=%#x'
    % (err, transferred, key, address)),
    })

    # key is either zero, or it is used to return a pipe
    # handle which should be closed to avoid a leak.
    if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
    _winapi.CloseHandle(key)
    continue

    if obj in self._stopped_serving:
    f.cancel()
    # Don't call the callback if _register() already read the result or
    # if the overlapped has been cancelled
    elif not f.done():
    try:
    value = callback(transferred, key, ov)
    except OSError as e:
    f.set_exception(e)
    self._results.append(f)
    else:
    f.set_result(value)
    self._results.append(f)
    finally:
    f = None

    # Remove unregistered futures
    for ov in self._unregistered:
    self._cache.pop(ov.address, None)
    self._unregistered.clear()

    print("_poll_msg exit")

    _overlapped.GetQueuedCompletionStatus = GetQueuedCompletionStatus_wrapper
    print(proactor)
    if isinstance(proactor, asyncio.windows_events.IocpProactor):
    proactor._poll = functools.partial(_poll_msg, proactor)
    proactor.__msg = MSG()
    proactor.__handles = (HANDLE * 1)()
    proactor.__dumpstack = True


    if __name__ == "__main__":
    initialize()
    asyncio.run(main())
    shutdown()

  10. KubaO created this gist Sep 9, 2024.
    205 changes: 205 additions & 0 deletions rttest.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,205 @@
    import asyncio
    import asyncio.windows_events
    import _overlapped
    from ctypes import sizeof, byref, pointer

    from win32more.mddbootstrap import (
    WINDOWSAPPSDK_RELEASE_MAJORMINOR,
    WINDOWSAPPSDK_RELEASE_VERSION_SHORTTAG_W,
    WINDOWSAPPSDK_RUNTIME_VERSION_UINT64,
    MddBootstrapInitialize2,
    MddBootstrapInitializeOptions_OnNoMatch_ShowUI,
    MddBootstrapShutdown,
    )

    from win32more import FAILED, WinError, UInt32
    from win32more.Windows.UI.Popups import MessageDialog
    from win32more.Windows.Win32.Foundation import (
    HANDLE,
    WAIT_OBJECT_0,
    WAIT_IO_COMPLETION,
    WAIT_TIMEOUT,
    WAIT_FAILED,
    )
    from win32more.Windows.Win32.Storage.Packaging.Appx import PACKAGE_VERSION
    from win32more.Windows.Win32.System.LibraryLoader import GetModuleHandle
    from win32more.Windows.Win32.System.WinRT import (
    RO_INIT_MULTITHREADED,
    RoInitialize,
    RoUninitialize,
    )
    from win32more.Windows.Win32.UI.Input.KeyboardAndMouse import SetActiveWindow
    from win32more.Windows.Win32.UI.Shell import IInitializeWithWindow
    from win32more.Windows.Win32.UI.WindowsAndMessaging import (
    CW_USEDEFAULT,
    MSG,
    WNDCLASS,
    WS_OVERLAPPED,
    QS_ALLINPUT,
    MWMO_ALERTABLE,
    MWMO_INPUTAVAILABLE,
    PM_REMOVE,
    WM_QUIT,
    CreateWindowEx,
    DefWindowProc,
    DispatchMessage,
    GetMessage,
    PeekMessage,
    PostQuitMessage,
    RegisterClass,
    SetTimer,
    TranslateMessage,
    MsgWaitForMultipleObjectsEx,
    )

    from win32more.Windows.Win32.UI.HiDpi import DPI_AWARENESS_CONTEXT_PER_MONITOR_AWARE_V2, SetProcessDpiAwarenessContext


    def create_owner_window():
    CLASS_NAME = "Owner Window"
    hInstance = GetModuleHandle(None)

    wc = WNDCLASS()
    wc.cbSize = sizeof(WNDCLASS)
    wc.lpfnWndProc = DefWindowProc
    wc.hInstance = hInstance
    wc.lpszClassName = CLASS_NAME

    atom = RegisterClass(wc)
    if not atom:
    raise WinError()

    hwnd = CreateWindowEx(
    0,
    CLASS_NAME,
    "",
    WS_OVERLAPPED,
    CW_USEDEFAULT,
    CW_USEDEFAULT,
    CW_USEDEFAULT,
    CW_USEDEFAULT,
    0,
    0,
    hInstance,
    0)
    if not hwnd:
    raise WinError()

    # workaround to avoid that dialog window appear in background.
    SetActiveWindow(hwnd)

    return hwnd


    async def winrt_messagedialog(owner_window):
    dialog = MessageDialog(
    "This is WinRT MessageDialog",
    "WinRT MessageDialog")
    dialog.as_(IInitializeWithWindow).Initialize(owner_window)
    uicommand = await dialog.ShowAsync()
    print(uicommand, uicommand.Label)


    def initialize():
    r = SetProcessDpiAwarenessContext(
    DPI_AWARENESS_CONTEXT_PER_MONITOR_AWARE_V2)
    if FAILED(r):
    raise WinError(r)

    hr = RoInitialize(RO_INIT_MULTITHREADED)
    if FAILED(hr):
    raise WinError(hr)


    def shutdown():
    RoUninitialize()


    async def main():
    print(asyncio.events.get_running_loop())
    instrument_asyncio()

    task = asyncio.create_task(winrt_messagedialog(create_owner_window()))
    task.add_done_callback(lambda _: PostQuitMessage(0))
    await task


    def instrument_asyncio():
    event_loop = asyncio.get_event_loop()
    proactor = event_loop._proactor
    print(proactor)
    wrap_poll(proactor)
    wrap_overlapped()


    def wrap_poll(proactor):
    import traceback
    _poll = proactor._poll
    print(_poll)

    def _poll_wrapper(timeout=None):
    print("_poll enter")
    if wrap_poll.dumpstack:
    traceback.print_stack(limit=6)
    wrap_poll.dumpstack = False
    _poll(timeout)
    print("_poll exit")
    proactor._poll = _poll_wrapper


    wrap_poll.dumpstack = True


    def wrap_overlapped():
    print(_overlapped.GetQueuedCompletionStatus)
    _GetQueuedCompletionStatus = _overlapped.GetQueuedCompletionStatus

    def GetQueuedCompletionStatus_wrapper(port: HANDLE, msecs: UInt32):
    print(f"GetQueuedCompletionStatus entered for {msecs} ms")

    msg = None
    result = None
    nCount = 1
    handles = (HANDLE * nCount)()
    handles[0] = port

    rv = MsgWaitForMultipleObjectsEx(
    nCount,
    handles,
    msecs,
    QS_ALLINPUT,
    MWMO_ALERTABLE | MWMO_INPUTAVAILABLE)
    if rv == WAIT_OBJECT_0 or rv == WAIT_IO_COMPLETION:
    print(f"MsgWait... {rv}")
    result = _GetQueuedCompletionStatus(port, msecs)
    elif rv == WAIT_OBJECT_0 + nCount:
    msg = MSG()
    hadmessages = False
    while PeekMessage(msg, 0, 0, 0, PM_REMOVE):
    hadmessages = True
    TranslateMessage(msg)
    DispatchMessage(msg)
    if msg.message == WM_QUIT:
    print("WM_QUIT!")
    asyncio.get_event_loop().stop()
    print("STOPPED")
    break
    print(hadmessages)
    elif rv == WAIT_TIMEOUT:
    print("MsgWaitForMultipleObjectsEx WAIT_TIMEOUT")
    elif rv == WAIT_FAILED:
    print("MsgWaitForMultipleObjectsEx WAIT_FAILED")
    else:
    print(f"MsgWaitForMultipleObjectsEx returned {rv}")

    print("GetQueuedCompletionStatus exit")
    return result

    _overlapped.GetQueuedCompletionStatus = GetQueuedCompletionStatus_wrapper


    if __name__ == "__main__":
    initialize()
    asyncio.run(main())
    shutdown()