@B3QL
Forked from claws/README.rst
Created July 26, 2019 20:43

Python asyncio SO_REUSEPORT OSX issue

Overview

My simple service discovery mechanism is built upon Python 3.4 and the asyncio module. It uses a UDP socket bound to a specified discovery port to allow applications to discover network services. System applications running on the same node bind to the same discovery port in order to receive discovery messages broadcast onto the network.

It does not work on OSX, because the current asyncio implementation gives developers no convenient way to set the SO_REUSEPORT socket option before the call to bind.
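Later Python releases added this access point to the standard library. Since Python 3.4.4, create_datagram_endpoint accepts a reuse_port keyword, which sets SO_REUSEPORT before binding. (Since 3.8.1, reuse_address is no longer supported for security reasons, so it is left out here.)

The following is a minimal sketch. It assumes Python 3.7+ and a platform that defines socket.SO_REUSEPORT, such as Linux or OSX. The helper name bind_two_endpoints and the loopback address are illustrative only.

```python
import asyncio
import socket


async def bind_two_endpoints(host="127.0.0.1"):
    """Bind two UDP endpoints to one address/port using the stock reuse_port flag."""
    loop = asyncio.get_running_loop()
    # Port 0 lets the OS choose a free port for the first endpoint.
    t1, _ = await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, local_addr=(host, 0), reuse_port=True)
    port = t1.get_extra_info("sockname")[1]
    # The second endpoint binds to exactly the same address/port pair.
    t2, _ = await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, local_addr=(host, port), reuse_port=True)
    ports = (port, t2.get_extra_info("sockname")[1])
    t1.close()
    t2.close()
    await asyncio.sleep(0)  # let the connection_lost callbacks run
    return ports


if __name__ == "__main__":
    if hasattr(socket, "SO_REUSEPORT"):
        print(asyncio.run(bind_two_endpoints()))
```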

The implementation of asyncio's create_datagram_endpoint method sets the SO_REUSEADDR socket option automatically prior to binding. On Linux this allows two (or more) UDP sockets to bind to exactly the same address and port combination. However, on OSX the SO_REUSEPORT socket option must also be set in order to provide the same functionality.
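The ordering requirement can be shown with plain sockets, independent of asyncio. The sketch below sets SO_REUSEADDR (what asyncio 3.4 sets automatically) and, where the platform defines it, SO_REUSEPORT. Both are set before bind(), and then two sockets are bound to the same address/port pair. The helper name make_udp_socket and the loopback address are assumptions made for illustration.

```python
import socket


def make_udp_socket(port, host="127.0.0.1", reuse_port=True):
    """Create a UDP socket whose reuse options are set before bind()."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # SO_REUSEADDR: per the text above, this alone is enough on Linux.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # SO_REUSEPORT: additionally required on OSX. Not every platform
    # defines it (e.g. Windows), hence the hasattr() check.
    if reuse_port and hasattr(socket, "SO_REUSEPORT"):
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    # The options must already be in place when bind() runs.
    try:
        sock.bind((host, port))
    except OSError:
        sock.close()
        raise
    return sock


if __name__ == "__main__":
    first = make_udp_socket(0)  # port 0: the OS picks a free port
    second = make_udp_socket(first.getsockname()[1])  # same address/port pair
    print(first.getsockname() == second.getsockname())
    first.close()
    second.close()
```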

The patch.py script in this gist attempts to implement the solution suggested in issue 23972 by overriding the event loop's create_datagram_endpoint method with a modified version. The modified method can set extra socket options, controlled by new keyword arguments (reuse_address, reuse_port and allow_broadcast). With this implementation, my simple service discovery mechanism works on OSX too.
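The same override technique can be written more compactly on a modern interpreter (assumed Python 3.7+). Here the overriding method builds and binds the socket itself, then hands it to the stock implementation through its sock parameter.

This is a hedged sketch, not the patch.py in this gist. The class ReusePortLoop and its so_reuseport keyword are hypothetical; the name avoids clashing with the stock reuse_port argument. The sketch also handles only IPv4 and local_addr.

```python
import asyncio
import socket


class ReusePortLoop(asyncio.SelectorEventLoop):
    """Hypothetical loop whose create_datagram_endpoint accepts so_reuseport."""

    async def create_datagram_endpoint(self, protocol_factory, local_addr=None,
                                       *, so_reuseport=False, **kwargs):
        if not so_reuseport:
            # Nothing extra requested: defer entirely to the stock method.
            return await super().create_datagram_endpoint(
                protocol_factory, local_addr=local_addr, **kwargs)
        if local_addr is None or not hasattr(socket, "SO_REUSEPORT"):
            raise ValueError("so_reuseport needs local_addr and SO_REUSEPORT")
        # Build the socket here so the option is set before bind().
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            sock.bind(local_addr)
        except OSError:
            sock.close()
            raise
        # The stock method accepts an already-bound socket via sock=.
        return await super().create_datagram_endpoint(
            protocol_factory, sock=sock, **kwargs)


async def main():
    loop = asyncio.get_running_loop()
    t1, _ = await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, local_addr=("127.0.0.1", 0), so_reuseport=True)
    port = t1.get_extra_info("sockname")[1]
    t2, _ = await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, local_addr=("127.0.0.1", port), so_reuseport=True)
    same = t1.get_extra_info("sockname") == t2.get_extra_info("sockname")
    t1.close()
    t2.close()
    await asyncio.sleep(0)  # let the connection_lost callbacks run
    return same


if __name__ == "__main__":
    loop = ReusePortLoop()
    try:
        print(loop.run_until_complete(main()))
    finally:
        loop.close()
```

Delegating through sock= keeps the override small. The trade-off is that options which conflict with sock= (family, remote_addr and so on) are rejected by the stock method instead of being handled.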

This issue is mentioned here too.

Also see here for a good overview of the socket options being used in this implementation.

Demo

The demo script, demo.py, attempts to create two UDP sockets that both bind to the same port. When it is run on OSX with the standard event loop, the demo is expected to fail when the second socket is bound. With the patched event loop, it is expected to complete. Once the sockets are established, each sends a message to the broadcast address. When a socket has received both messages (its own and the one from the other socket), it fires its wait_done Future. Once both Futures have fired, the demo script finishes.
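The wait_done mechanism can be tried in isolation. The sketch below makes one assumption to stay deterministic: a single endpoint sends both messages to its own loopback address, instead of two endpoints using the network broadcast address. It assumes Python 3.7+; the names CollectingProtocol and collect_demo are illustrative.

```python
import asyncio


class CollectingProtocol(asyncio.DatagramProtocol):
    """Resolve wait_done once every expected payload has been received."""

    def __init__(self, expected):
        self.expected = set(expected)
        # The protocol factory runs inside the loop, so a running loop exists.
        self.wait_done = asyncio.get_running_loop().create_future()
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        self.expected.discard(data)
        # done() guard: late or duplicate datagrams must not set it twice.
        if not self.expected and not self.wait_done.done():
            self.wait_done.set_result(True)


async def collect_demo():
    loop = asyncio.get_running_loop()
    transport, proto = await loop.create_datagram_endpoint(
        lambda: CollectingProtocol({b"Hello", b"World"}),
        local_addr=("127.0.0.1", 0))
    own_addr = transport.get_extra_info("sockname")
    # Loopback unicast to ourselves stands in for the broadcast send.
    transport.sendto(b"Hello", own_addr)
    transport.sendto(b"World", own_addr)
    try:
        return await asyncio.wait_for(proto.wait_done, timeout=5)
    finally:
        transport.close()


if __name__ == "__main__":
    print(asyncio.run(collect_demo()))
```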

Run the demo using the standard event loop:

$ python3 demo.py

When run on Linux this is expected to complete, but on OSX it is expected to fail with an OSError: [Errno 48] Address already in use.
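The errno value in that message is platform specific: EADDRINUSE is 48 on OSX but 98 on Linux. The sketch below reproduces the failure by binding two plain UDP sockets with no reuse options, which fails on both platforms. It then compares the error against errno.EADDRINUSE instead of a hard-coded number. The helper name bind_twice_without_reuse is hypothetical.

```python
import errno
import socket


def bind_twice_without_reuse(host="127.0.0.1"):
    """Return the errno raised when a second plain UDP socket binds the same port."""
    first = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    second = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        first.bind((host, 0))
        second.bind(first.getsockname())  # no reuse options on either socket
    except OSError as exc:
        return exc.errno
    finally:
        first.close()
        second.close()
    return None


if __name__ == "__main__":
    code = bind_twice_without_reuse()
    # Compare symbolically, because the numeric value differs per platform.
    print(code == errno.EADDRINUSE, errno.errorcode.get(code))
```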

Run the demo using the workaround event loop patched in:

$ python3 demo.py --workaround

When run on Linux and OSX this is expected to complete.

demo.py

'''
This example tests the event loop patch implementation by attempting to create
two UDP sockets that both bind to the same port.

When run on OSX with the standard event loop, the demo is expected to fail when
the second socket attempts to bind to the same address/port combination.
When run on OSX with the patched event loop, the demo is expected to complete.

Once the sockets are established they both send a message to the broadcast
address. When each socket has received both messages (its own and the one
from the other socket) it fires its wait_done Future.

Run the demo using the standard event loop:

.. code-block:: console

    $ python3 demo.py

When run on Linux this is expected to complete, but on OSX it is expected to
fail with an OSError: [Errno 48] Address already in use.

Run the demo using the workaround event loop patched in:

.. code-block:: console

    $ python3 demo.py --workaround

When run on Linux and OSX this is expected to complete.
'''
import argparse
import asyncio
import signal
import socket


class ExampleProtocol(asyncio.DatagramProtocol):
    '''
    A trivial protocol that each endpoint will use to:

    - print debug information; and,
    - fire the wait_done Future indicating that the test is complete
      after it receives two messages (i.e. b"Hello" and b"World").
    '''

    def __init__(self, name):
        self.name = name
        self.transport = None
        self.wait_ready = asyncio.Future()
        self.wait_done = asyncio.Future()
        self.wait_closed = asyncio.Future()
        self.expected_msgs = {b'Hello', b'World'}

    def connection_made(self, transport):
        print('{} connection_made'.format(self.name))
        self.transport = transport
        self.wait_ready.set_result(True)

    def connection_lost(self, exc):
        print('{} connection_lost{}'.format(
            self.name, ': {}'.format(exc) if exc else ''))
        self.wait_closed.set_result(True)

    def datagram_received(self, data, addr):
        print('{} datagram_received from {}: {}'.format(self.name, addr, data))
        self.expected_msgs.discard(data)
        # Guard against extra datagrams (e.g. duplicates or traffic from other
        # hosts) arriving after the Future has already been resolved.
        if not self.expected_msgs and not self.wait_done.done():
            self.wait_done.set_result(True)

    def sendto(self, data, address):
        print('{} sending {} to {}'.format(self.name, data, address))
        self.transport.sendto(data, address)


@asyncio.coroutine
def demo(loop, args):
    prot1 = ExampleProtocol('prot1')
    if args.workaround:
        _transport1, _p1 = yield from loop.create_datagram_endpoint(
            lambda: prot1, local_addr=(args.address, args.port),
            reuse_address=True, reuse_port=True,
            allow_broadcast=True)
    else:
        _transport1, _p1 = yield from loop.create_datagram_endpoint(
            lambda: prot1, local_addr=(args.address, args.port))

    # Bind another socket to the same address/port combination. This action
    # is expected to trigger the issue when run on the OSX platform using the
    # standard event loop. When the workaround is used this should succeed.
    prot2 = ExampleProtocol('prot2')
    if args.workaround:
        _transport2, _p2 = yield from loop.create_datagram_endpoint(
            lambda: prot2, local_addr=(args.address, args.port),
            reuse_address=True, reuse_port=True,
            allow_broadcast=True)
    else:
        _transport2, _p2 = yield from loop.create_datagram_endpoint(
            lambda: prot2, local_addr=(args.address, args.port))

    yield from asyncio.wait([prot1.wait_ready, prot2.wait_ready])

    if not args.workaround:
        # When using the standard (non-patched) loop the broadcast socket
        # option must be set manually.
        prot1.transport.get_extra_info('socket').setsockopt(
            socket.SOL_SOCKET, socket.SO_BROADCAST, True)
        prot2.transport.get_extra_info('socket').setsockopt(
            socket.SOL_SOCKET, socket.SO_BROADCAST, True)

    # Send a message to the broadcast address. All messages should be
    # received by each endpoint.
    target_addr = args.broadcast_addr
    prot1.sendto(b'Hello', (target_addr, args.port))
    prot2.sendto(b'World', (target_addr, args.port))

    yield from asyncio.wait([prot1.wait_done, prot2.wait_done])

    prot1.transport.close()
    prot2.transport.close()
    yield from asyncio.wait([prot1.wait_closed, prot2.wait_closed])
    print('done')


ARGS = argparse.ArgumentParser('asyncio patch demo')
ARGS.add_argument(
    '--workaround', action='store_true', default=False,
    help="Patch the loop before running the demo")
ARGS.add_argument(
    '--address', type=str, default='0.0.0.0',
    help="The address to use for the demonstration")
ARGS.add_argument(
    '--port', type=int, default=7000,
    help="The port to use for the demonstration")


if __name__ == '__main__':
    ip_addr = socket.gethostbyname(socket.gethostname())
    # crude guess of the network broadcast address (assumes a /24 subnet)
    subnet_broadcast_addr = '{}.255'.format('.'.join(ip_addr.split('.')[:-1]))

    args = ARGS.parse_args()
    args.broadcast_addr = subnet_broadcast_addr

    if args.workaround:
        import patch
        (patch, )  # silence unused import warning

    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGINT, loop.stop)
    loop.run_until_complete(demo(loop, args))
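The broadcast address guess in demo.py replaces the last octet with 255, which is only correct for a /24 subnet. The stdlib ipaddress module can compute it for any prefix length.

This sketch assumes you know the real prefix length (for example from the system's network configuration), since socket.gethostbyname does not report the netmask. The helper name broadcast_for is hypothetical.

```python
import ipaddress


def broadcast_for(ip_addr, prefixlen=24):
    """Return the IPv4 broadcast address for ip_addr/prefixlen as a string."""
    iface = ipaddress.IPv4Interface("{}/{}".format(ip_addr, prefixlen))
    return str(iface.network.broadcast_address)


print(broadcast_for("192.168.1.23"))   # 192.168.1.255
print(broadcast_for("10.1.2.3", 16))   # 10.1.255.255
```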

patch.py

'''
This module attempts to implement the solution suggested in:
https://bugs.python.org/issue23972
'''
import asyncio
import collections
import socket


class PatchedLoop(asyncio.SelectorEventLoop):
    '''
    The create_datagram_endpoint method below is a modified version of the
    original in asyncio/base_events.py. This version can set the SO_REUSEPORT
    socket option prior to the call to bind. This capability is controlled
    by some new keyword arguments (e.g. reuse_address, reuse_port, and
    allow_broadcast).
    '''

    @asyncio.coroutine
    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0,
                                 reuse_address=None, reuse_port=None,
                                 allow_broadcast=None):
        """Create datagram connection."""
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        else:
            # join addresses by (family, protocol)
            addr_infos = collections.OrderedDict()
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    assert isinstance(addr, tuple) and len(addr) == 2, (
                        '2-tuple is expected')

                    infos = yield from self.getaddrinfo(
                        *addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        exceptions = []

        for ((family, proto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=family, type=socket.SOCK_DGRAM, proto=proto)
                # Set the requested socket options before bind() is called.
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                if reuse_port:
                    # SO_REUSEPORT is not defined on every platform.
                    if hasattr(socket, 'SO_REUSEPORT'):
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_REUSEPORT, True)
                if allow_broadcast:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_BROADCAST, True)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    yield from self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            else:
                break
        else:
            raise exceptions[0]

        protocol = protocol_factory()
        transport = self._make_datagram_transport(sock, protocol, r_addr)
        return transport, protocol
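

# Now patch asyncio so we can create UDP sockets that are capable of binding
# to the same port on OSX. The default policy only uses _loop_factory when it
# creates a new loop, so this must run before the first loop is created.
asyncio.SelectorEventLoop = PatchedLoop
asyncio.DefaultEventLoopPolicy._loop_factory = PatchedLoop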
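Assigning to asyncio.DefaultEventLoopPolicy._loop_factory relies on a private attribute. A less invasive option is to instantiate the loop class explicitly and drive it with run_until_complete.

This is a sketch that assumes Python 3.7+. PatchedLoop here is an empty stand-in for the class defined above, and run_with_loop is a hypothetical helper.

```python
import asyncio


class PatchedLoop(asyncio.SelectorEventLoop):
    """Stand-in for the PatchedLoop defined in patch.py."""


def run_with_loop(loop_class, coro_factory):
    """Run coro_factory() to completion on a fresh instance of loop_class."""
    loop = loop_class()
    try:
        return loop.run_until_complete(coro_factory())
    finally:
        loop.close()


async def which_loop():
    # Report the class of the loop this coroutine is running on.
    return type(asyncio.get_running_loop()).__name__


print(run_with_loop(PatchedLoop, which_loop))  # PatchedLoop
```

On Python 3.12 and later, asyncio.run(which_loop(), loop_factory=PatchedLoop) achieves the same without the helper.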