Skip to content

Instantly share code, notes, and snippets.

@B3QL
Forked from claws/README.rst
Created July 26, 2019 20:43
Show Gist options
  • Select an option

  • Save B3QL/35ecbaac8ab3fdfc00ca2d348e7b5cf7 to your computer and use it in GitHub Desktop.

Select an option

Save B3QL/35ecbaac8ab3fdfc00ca2d348e7b5cf7 to your computer and use it in GitHub Desktop.

Revisions

  1. @claws claws revised this gist Jul 14, 2015. 1 changed file with 14 additions and 1 deletion.
    15 changes: 14 additions & 1 deletion README.rst
    Original file line number Diff line number Diff line change
    @@ -25,4 +25,17 @@ The demo script, ``demo.py`` attempts to create two UDP sockets that both bind t

    Run the demo using the standard event loop:

    .. code-block:
    .. code-block:: python3
    $ python3 demo.py
    When run on Linux this is expected to complete but on OSX it is expected to fail with a ``OSError: [Errno 48] Address already in use``.


    Run the demo using the workaround event loop patched in:

    .. code-block:: python3
    $ python3 demo.py --workaround
    When run on Linux and OSX this is expected to complete.
  2. @claws claws revised this gist Jul 14, 2015. 2 changed files with 12 additions and 17 deletions.
    15 changes: 1 addition & 14 deletions README.rst
    Original file line number Diff line number Diff line change
    @@ -25,17 +25,4 @@ The demo script, ``demo.py`` attempts to create two UDP sockets that both bind t

    Run the demo using the standard event loop:

    .. code-block:: python3
    $ python3 demo.py
    When run on Linux this is expected to complete but on OSX it is expected to fail with a ``OSError: [Errno 48] Address already in use``.


    Run the demo using the workaround event loop patched in:

    .. code-block:: python3
    $ python3 demo.py --workaround
    When run on Linux and OSX this is expected to complete.
    .. code-block:
    14 changes: 11 additions & 3 deletions patch.py
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,3 @@

    '''
    This module attempts to implement the solution suggested in:
    @@ -143,7 +142,16 @@ def create_datagram_endpoint(self, protocol_factory,
    return transport, protocol


    class PatchedEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    _loop_factory = PatchedLoop

    # Now patch asyncio so we can create UDP sockets that are capable of binding
    # to the same port OSX.
    # to the same port on OSX.
    asyncio.SelectorEventLoop = PatchedLoop
    asyncio.DefaultEventLoopPolicy._loop_factory = PatchedLoop

    # Explicitly set the event loop policy to ensure the patched event
    # loop is used. This patch file should really be imported before any
    # calls to asyncio.get_event_loop().
    #
    asyncio.DefaultEventLoopPolicy = PatchedEventLoopPolicy
    asyncio.set_event_loop_policy(PatchedEventLoopPolicy())
  3. @claws claws revised this gist Jul 13, 2015. 3 changed files with 124 additions and 74 deletions.
    1 change: 0 additions & 1 deletion README.rst
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,3 @@

    Python asyncio SO_REUSEPORT OSX issue
    =====================================

    14 changes: 11 additions & 3 deletions demo.py
    Original file line number Diff line number Diff line change
    @@ -98,8 +98,12 @@ def demo(loop, args):
    reuse_address=True, reuse_port=True,
    allow_broadcast=True)
    else:
    _transport2, _p2 = yield from loop.create_datagram_endpoint(
    lambda: prot2, local_addr=(args.address, args.port))
    try:
    _transport2, _p2 = yield from loop.create_datagram_endpoint(
    lambda: prot2, local_addr=(args.address, args.port))
    except Exception as exc:
    _transport1.close()
    raise exc from exc

    yield from asyncio.wait([prot1.wait_ready, prot2.wait_ready])

    @@ -154,4 +158,8 @@ def demo(loop, args):

    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGINT, loop.stop)
    loop.run_until_complete(demo(loop, args))

    try:
    loop.run_until_complete(demo(loop, args))
    finally:
    loop.close()
    183 changes: 113 additions & 70 deletions patch.py
    Original file line number Diff line number Diff line change
    @@ -9,94 +9,137 @@
    import asyncio
    import collections
    import socket
    import sys

    from asyncio import futures
    from asyncio.log import logger

    version = sys.version_info
    PY_3_5_plus = version > (3, 5, )


    class PatchedLoop(asyncio.SelectorEventLoop):
    '''
    The create_datagram_endpoint method below is a modified version of the
    original in asyncio/base_events.py. This version can set the SO_REUSEPORT
    socket option prior to the call to bind. This capability is controlled
    by some new keyword arguments (e.g. reuse_address, reuse_port, and
    allow_broadcast).
    original in Python 3.6.0.a0 asyncio/base_events.py. However my local
    version of Python is 3.4.1 so the later (3.5+) additions are wrapped in
    a PY_3_5_plus check.
    This version can set the SO_REUSEPORT socket option prior to the call to
    bind. This capability is controlled by some new keyword arguments (e.g.
    reuse_address, reuse_port, and allow_broadcast).
    '''

    @asyncio.coroutine
    def create_datagram_endpoint(self, protocol_factory,
    local_addr=None, remote_addr=None, *,
    family=0, proto=0, flags=0,
    reuse_address=None, reuse_port=None,
    allow_broadcast=None):
    allow_broadcast=None, sock=None):
    """Create datagram connection."""
    if not (local_addr or remote_addr):
    if family == 0:
    raise ValueError('unexpected address family')
    addr_pairs_info = (((family, proto), (None, None)),)
    else:
    # join addresss by (family, protocol)
    addr_infos = collections.OrderedDict()
    for idx, addr in ((0, local_addr), (1, remote_addr)):
    if addr is not None:
    assert isinstance(addr, tuple) and len(addr) == 2, (
    '2-tuple is expected')

    infos = yield from self.getaddrinfo(
    *addr, family=family, type=socket.SOCK_DGRAM,
    proto=proto, flags=flags)
    if not infos:
    raise OSError('getaddrinfo() returned empty list')

    for fam, _, pro, _, address in infos:
    key = (fam, pro)
    if key not in addr_infos:
    addr_infos[key] = [None, None]
    addr_infos[key][idx] = address

    # each addr has to have info for each (family, proto) pair
    addr_pairs_info = [
    (key, addr_pair) for key, addr_pair in addr_infos.items()
    if not ((local_addr and addr_pair[0] is None) or
    (remote_addr and addr_pair[1] is None))]

    if not addr_pairs_info:
    raise ValueError('can not get address information')

    exceptions = []

    for ((family, proto),
    (local_address, remote_address)) in addr_pairs_info:
    sock = None
    r_addr = None
    try:
    sock = socket.socket(
    family=family, type=socket.SOCK_DGRAM, proto=proto)
    if reuse_address:
    sock.setsockopt(
    socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    if reuse_port:
    if 'SO_REUSEPORT' in vars(socket):
    if sock is None:
    if not (local_addr or remote_addr):
    if family == 0:
    raise ValueError('unexpected address family')
    addr_pairs_info = (((family, proto), (None, None)),)
    else:
    # join address by (family, protocol)
    addr_infos = collections.OrderedDict()
    for idx, addr in ((0, local_addr), (1, remote_addr)):
    if addr is not None:
    assert isinstance(addr, tuple) and len(addr) == 2, (
    '2-tuple is expected')

    infos = yield from self.getaddrinfo(
    *addr, family=family, type=socket.SOCK_DGRAM,
    proto=proto, flags=flags)
    if not infos:
    raise OSError('getaddrinfo() returned empty list')

    for fam, _, pro, _, address in infos:
    key = (fam, pro)
    if key not in addr_infos:
    addr_infos[key] = [None, None]
    addr_infos[key][idx] = address

    # each addr has to have info for each (family, proto) pair
    addr_pairs_info = [
    (key, addr_pair) for key, addr_pair in addr_infos.items()
    if not ((local_addr and addr_pair[0] is None) or
    (remote_addr and addr_pair[1] is None))]

    if not addr_pairs_info:
    raise ValueError('can not get address information')

    exceptions = []

    for ((family, proto),
    (local_address, remote_address)) in addr_pairs_info:
    sock = None
    r_addr = None
    try:
    sock = socket.socket(
    family=family, type=socket.SOCK_DGRAM, proto=proto)
    if reuse_address:
    sock.setsockopt(
    socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    if reuse_port:
    if 'SO_REUSEPORT' in vars(socket):
    sock.setsockopt(
    socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    if allow_broadcast:
    sock.setsockopt(
    socket.SOL_SOCKET, socket.SO_REUSEPORT, True)
    if allow_broadcast:
    sock.setsockopt(
    socket.SOL_SOCKET, socket.SO_BROADCAST, True)
    sock.setblocking(False)

    if local_addr:
    sock.bind(local_address)
    if remote_addr:
    yield from self.sock_connect(sock, remote_address)
    r_addr = remote_address
    except OSError as exc:
    if sock is not None:
    sock.close()
    exceptions.append(exc)
    socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    sock.setblocking(False)

    if local_addr:
    sock.bind(local_address)
    if remote_addr:
    yield from self.sock_connect(sock, remote_address)
    r_addr = remote_address
    except OSError as exc:
    if sock is not None:
    sock.close()
    exceptions.append(exc)
    except:
    if sock is not None:
    sock.close()
    raise
    else:
    break
    else:
    break
    raise exceptions[0]
    else:
    raise exceptions[0]
    if local_addr or remote_addr:
    raise ValueError(
    'local_addr/remote_addr and sock can not be specified '
    'at the same time')

    protocol = protocol_factory()
    transport = self._make_datagram_transport(sock, protocol, r_addr)
    if PY_3_5_plus:
    waiter = futures.Future(loop=self)
    transport = self._make_datagram_transport(sock, protocol, r_addr,
    waiter) # python 3.6
    else:
    transport = self._make_datagram_transport(sock, protocol, r_addr)

    if self._debug:
    if local_addr:
    logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
    "created: (%r, %r)",
    local_addr, remote_addr, transport, protocol)
    else:
    logger.debug("Datagram endpoint remote_addr=%r created: "
    "(%r, %r)",
    remote_addr, transport, protocol)

    if PY_3_5_plus:
    try:
    yield from waiter
    except:
    transport.close()
    raise

    return transport, protocol


  4. @claws claws created this gist Jul 13, 2015.
    42 changes: 42 additions & 0 deletions README.rst
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,42 @@

    Python asyncio SO_REUSEPORT OSX issue
    =====================================

    Overview
    --------

    My simple service discovery mechanism is built upon Python 3.4 and the ``asyncio`` module. It uses a UDP socket bound to a specified *discovery* port to allow applications to discover network services. System applications running on the same node bind to the same *discovery* port in order to receive discovery messages broadcast onto the network.

    It is not working on OSX. The current asyncio implementation does not provide a convenient developer access point for setting the `SO_REUSEPORT` socket option prior to the call to bind.

    The implementation of asyncio's ``create_datagram_endpoint`` method sets the ``SO_REUSEADDR`` socket option automatically prior to binding. On Linux this allows two (or more) UDP sockets to bind to exactly the same address and port combination. However, on OSX the ``SO_REUSEPORT`` socket option must also be set in order to provide the same functionality.

    The ``patch.py`` script in this gist attempts to implement the solution suggested in `issue 23972 <https://bugs.python.org/issue23972>`_ by overriding the event loop's ``create_datagram_endpoint`` method with a modified version. This modified method can set extra socket options based on some new method arguments. This implementation allows my simple service discovery mechanism to work on OSX too.

    This issue is mentioned `here <https://github.com/joyent/libuv/issues/769>`_ too.

    Also see `here <http://stackoverflow.com/questions/14388706/socket-options-so-reuseaddr-and-so-reuseport-how-do-they-differ-do-they-mean-t>`_ for a good overview of the socket options being used in this implementation.


    Demo
    ----

    The demo script, ``demo.py`` attempts to create two UDP sockets that both bind to the same port. When it is run on OSX with the standard event loop the demo is expected to fail when the second socket is being bound. When it is run on OSX with the patched event loop, the demo is expected to complete. Once the sockets are established they will both send a message to the broadcast address. When both sockets receive two messages (their own and the one from the other socket) they fire a wait_done Future which finishes the demo script.


    Run the demo using the standard event loop:

    .. code-block:: python3
    $ python3 demo.py
    When run on Linux this is expected to complete but on OSX it is expected to fail with a ``OSError: [Errno 48] Address already in use``.


    Run the demo using the workaround event loop patched in:

    .. code-block:: python3
    $ python3 demo.py --workaround
    When run on Linux and OSX this is expected to complete.
    157 changes: 157 additions & 0 deletions demo.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,157 @@
    '''
    This example tests the event loop patch implementation by attempting to create
    two UDP sockets that both bind to the same port.
    When run on OSX with the standard event loop the demo is expected to fail when
    the second socket is attempting to bind to the same address/port combination.
    When run on OSX with the patched event loop the demo is expected to complete.
    Once the sockets are established they will both send a message to the
    broadcast address. When both sockets receive two messages (their own and the
    one from the other socket) they fire their wait_done Future.
    Run the demo using the standard event loop:
    .. code-block:: python3
    python3 demo.py
    When run on Linux this is expected to complete but on OSX it is expected to
    fail with a OSError: [Errno 48] Address already in use.
    Run the demo using the workaround event loop patched in:
    .. code-block:: python3
    $ python3 demo.py --workaround
    When run on Linux and OSX this is expected to complete.
    '''

    import argparse
    import asyncio
    import signal
    import socket


    class ExampleProtocol(asyncio.DatagramProtocol):
    '''
    A trivial protocol that each endpoint will use to:
    - print debug information; and,
    - fire the wait_done Future indicating that the test is complete
    after it receives two messages (i.e. b"Hello" and b"World").
    '''

    def __init__(self, name):
    self.name = name
    self.wait_ready = asyncio.Future()
    self.wait_done = asyncio.Future()
    self.wait_closed = asyncio.Future()
    self.expected_msgs = set([b'Hello', b'World'])

    def connection_made(self, transport):
    print('{} connection_made'.format(self.name))
    self.transport = transport
    self.wait_ready.set_result(True)

    def connection_lost(self, exc):
    print('{} connection_lost{}'.format(
    self.name, ': {}'.format(exc) if exc else ''))
    self.wait_closed.set_result(True)

    def datagram_received(self, data, addr):
    print('{} datagram_received from {}: {}'.format(self.name, addr, data))
    self.expected_msgs.discard(data)
    if not self.expected_msgs:
    self.wait_done.set_result(True)

    def sendto(self, data, address):
    print('{} sending {} to {}'.format(self.name, data, address))
    self.transport.sendto(data, address)


    @asyncio.coroutine
    def demo(loop, args):

    prot1 = ExampleProtocol('prot1')
    if args.workaround:
    _transport1, _p1 = yield from loop.create_datagram_endpoint(
    lambda: prot1, local_addr=(args.address, args.port),
    reuse_address=True, reuse_port=True,
    allow_broadcast=True)
    else:
    _transport1, _p1 = yield from loop.create_datagram_endpoint(
    lambda: prot1, local_addr=(args.address, args.port))

    # Bind another socket to the same address/port combination. This action
    # is expected to trigger the issue when run on OSX platform using the
    # standard event loop. When the workaround is used this should succeed.
    prot2 = ExampleProtocol('prot2')
    if args.workaround:
    _transport2, _p2 = yield from loop.create_datagram_endpoint(
    lambda: prot2, local_addr=(args.address, args.port),
    reuse_address=True, reuse_port=True,
    allow_broadcast=True)
    else:
    _transport2, _p2 = yield from loop.create_datagram_endpoint(
    lambda: prot2, local_addr=(args.address, args.port))

    yield from asyncio.wait([prot1.wait_ready, prot2.wait_ready])

    if not args.workaround:
    # When using the standard (non-patched) loop the broadcast socket
    # option must be set manually.
    prot1.transport.get_extra_info('socket').setsockopt(
    socket.SOL_SOCKET, socket.SO_BROADCAST, True)
    prot2.transport.get_extra_info('socket').setsockopt(
    socket.SOL_SOCKET, socket.SO_BROADCAST, True)

    # Send a message to the broadcast address. All messages should be
    # received by each endpoint.
    target_addr = args.broadcast_addr
    prot1.sendto(b'Hello', (target_addr, args.port))
    prot2.sendto(b'World', (target_addr, args.port))

    yield from asyncio.wait([prot1.wait_done, prot2.wait_done])

    prot1.transport.close()
    prot2.transport.close()

    yield from asyncio.wait([prot1.wait_closed, prot2.wait_closed])

    print('done')


    ARGS = argparse.ArgumentParser('asyncio patch demo')
    ARGS.add_argument(
    '--workaround', action='store_true', default=False,
    help="Patch the loop before running the demo")
    ARGS.add_argument(
    '--address', type=str, default='0.0.0.0',
    help="The address to use for the demonstration")
    ARGS.add_argument(
    '--port', type=int, default=7000,
    help="The port to use for the demonstration")


    if __name__ == '__main__':

    ip_addr = socket.gethostbyname(socket.gethostname())
    # crude guess of the network broadcast address
    subnet_broadcast_addr = '{}.255'.format('.'.join(ip_addr.split('.')[:-1]))

    args = ARGS.parse_args()
    args.broadcast_addr = subnet_broadcast_addr

    if args.workaround:
    import patch
    (patch, ) # silence unused import warning

    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGINT, loop.stop)
    loop.run_until_complete(demo(loop, args))
    106 changes: 106 additions & 0 deletions patch.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,106 @@

    '''
    This module attempts to implement the solution suggested in:
    https://bugs.python.org/issue23972
    '''

    import asyncio
    import collections
    import socket


    class PatchedLoop(asyncio.SelectorEventLoop):
    '''
    The create_datagram_endpoint method below is a modified version of the
    original in asyncio/base_events.py. This version can set the SO_REUSEPORT
    socket option prior to the call to bind. This capability is controlled
    by some new keyword arguments (e.g. reuse_address, reuse_port, and
    allow_broadcast).
    '''

    @asyncio.coroutine
    def create_datagram_endpoint(self, protocol_factory,
    local_addr=None, remote_addr=None, *,
    family=0, proto=0, flags=0,
    reuse_address=None, reuse_port=None,
    allow_broadcast=None):
    """Create datagram connection."""
    if not (local_addr or remote_addr):
    if family == 0:
    raise ValueError('unexpected address family')
    addr_pairs_info = (((family, proto), (None, None)),)
    else:
    # join addresss by (family, protocol)
    addr_infos = collections.OrderedDict()
    for idx, addr in ((0, local_addr), (1, remote_addr)):
    if addr is not None:
    assert isinstance(addr, tuple) and len(addr) == 2, (
    '2-tuple is expected')

    infos = yield from self.getaddrinfo(
    *addr, family=family, type=socket.SOCK_DGRAM,
    proto=proto, flags=flags)
    if not infos:
    raise OSError('getaddrinfo() returned empty list')

    for fam, _, pro, _, address in infos:
    key = (fam, pro)
    if key not in addr_infos:
    addr_infos[key] = [None, None]
    addr_infos[key][idx] = address

    # each addr has to have info for each (family, proto) pair
    addr_pairs_info = [
    (key, addr_pair) for key, addr_pair in addr_infos.items()
    if not ((local_addr and addr_pair[0] is None) or
    (remote_addr and addr_pair[1] is None))]

    if not addr_pairs_info:
    raise ValueError('can not get address information')

    exceptions = []

    for ((family, proto),
    (local_address, remote_address)) in addr_pairs_info:
    sock = None
    r_addr = None
    try:
    sock = socket.socket(
    family=family, type=socket.SOCK_DGRAM, proto=proto)
    if reuse_address:
    sock.setsockopt(
    socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    if reuse_port:
    if 'SO_REUSEPORT' in vars(socket):
    sock.setsockopt(
    socket.SOL_SOCKET, socket.SO_REUSEPORT, True)
    if allow_broadcast:
    sock.setsockopt(
    socket.SOL_SOCKET, socket.SO_BROADCAST, True)
    sock.setblocking(False)

    if local_addr:
    sock.bind(local_address)
    if remote_addr:
    yield from self.sock_connect(sock, remote_address)
    r_addr = remote_address
    except OSError as exc:
    if sock is not None:
    sock.close()
    exceptions.append(exc)
    else:
    break
    else:
    raise exceptions[0]

    protocol = protocol_factory()
    transport = self._make_datagram_transport(sock, protocol, r_addr)
    return transport, protocol


    # Now patch asyncio so we can create UDP sockets that are capable of binding
    # to the same port OSX.
    asyncio.SelectorEventLoop = PatchedLoop
    asyncio.DefaultEventLoopPolicy._loop_factory = PatchedLoop