'''
This example tests the event loop patch implementation by attempting to
create two UDP sockets that both bind to the same port.

When run on OSX with the standard event loop the demo is expected to fail
when the second socket is attempting to bind to the same address/port
combination. When run on OSX with the patched event loop the demo is
expected to complete.

Once the sockets are established they will both send a message to the
broadcast address. When both sockets receive two messages (their own and
the one from the other socket) they fire their wait_done Future.

Run the demo using the standard event loop:

.. code-block:: python3

    $ python3 demo.py

When run on Linux this is expected to complete but on OSX it is expected to
fail with an OSError: [Errno 48] Address already in use.

Run the demo using the workaround event loop patched in:

.. code-block:: python3

    $ python3 demo.py --workaround

When run on Linux and OSX this is expected to complete.
'''

import argparse
import asyncio
import signal
import socket


class ExampleProtocol(asyncio.DatagramProtocol):
    '''
    A trivial protocol that each endpoint will use to:

    - print debug information; and,
    - fire the wait_done Future indicating that the test is complete after
      it receives two messages (i.e. b"Hello" and b"World").
    '''

    def __init__(self, name, loop=None):
        '''
        :param name: label used as a prefix in the debug output.
        :param loop: optional event loop to bind the futures to. When left
          as None the futures bind to the current event loop, exactly as a
          bare ``asyncio.Future()`` does.
        '''
        self.name = name
        # Resolved by connection_made.
        self.wait_ready = asyncio.Future(loop=loop)
        # Resolved once every expected message has been received.
        self.wait_done = asyncio.Future(loop=loop)
        # Resolved by connection_lost.
        self.wait_closed = asyncio.Future(loop=loop)
        # Messages still outstanding; entries are discarded on receipt.
        self.expected_msgs = {b'Hello', b'World'}

    def connection_made(self, transport):
        ''' Store the transport and signal that the endpoint is ready. '''
        print('{} connection_made'.format(self.name))
        self.transport = transport
        self.wait_ready.set_result(True)

    def connection_lost(self, exc):
        ''' Report the closure and signal that the endpoint is closed. '''
        print('{} connection_lost{}'.format(
            self.name, ': {}'.format(exc) if exc else ''))
        # Guarded so a repeated callback cannot raise InvalidStateError.
        if not self.wait_closed.done():
            self.wait_closed.set_result(True)

    def datagram_received(self, data, addr):
        ''' Tick off a received message; fire wait_done when none remain. '''
        print('{} datagram_received from {}: {}'.format(self.name, addr, data))
        self.expected_msgs.discard(data)
        # The port is shared and receives broadcasts, so datagrams may keep
        # arriving after the test has completed. Only resolve the future
        # once, otherwise set_result raises InvalidStateError.
        if not self.expected_msgs and not self.wait_done.done():
            self.wait_done.set_result(True)

    def sendto(self, data, address):
        ''' Print debug information and send data to address. '''
        print('{} sending {} to {}'.format(self.name, data, address))
        self.transport.sendto(data, address)


async def demo(loop, args):
    '''
    Bind two UDP endpoints to the same address/port, have each broadcast one
    message and wait until both endpoints have seen both messages.

    :param loop: the event loop used to create the datagram endpoints.
    :param args: parsed arguments providing ``workaround``, ``address``,
      ``port`` and ``broadcast_addr``.
    :raises Exception: whatever ``create_datagram_endpoint`` raises when the
      second bind fails; the first transport is closed before re-raising.
    '''
    local_addr = (args.address, args.port)

    # The socket reuse/broadcast options are only passed when the
    # workaround is requested.
    endpoint_opts = {}
    if args.workaround:
        endpoint_opts = dict(
            reuse_address=True, reuse_port=True, allow_broadcast=True)

    prot1 = ExampleProtocol('prot1', loop=loop)
    transport1, _ = await loop.create_datagram_endpoint(
        lambda: prot1, local_addr=local_addr, **endpoint_opts)

    # Bind another socket to the same address/port combination. This action
    # is expected to trigger the issue when run on OSX platform using the
    # standard event loop. When the workaround is used this should succeed.
    prot2 = ExampleProtocol('prot2', loop=loop)
    try:
        await loop.create_datagram_endpoint(
            lambda: prot2, local_addr=local_addr, **endpoint_opts)
    except Exception:
        # Do not leak the first socket, whichever mode is in use, and
        # re-raise the original error with its traceback intact.
        transport1.close()
        raise

    await asyncio.wait([prot1.wait_ready, prot2.wait_ready])

    if not args.workaround:
        # When using the standard (non-patched) loop the broadcast socket
        # option must be set manually.
        for prot in (prot1, prot2):
            prot.transport.get_extra_info('socket').setsockopt(
                socket.SOL_SOCKET, socket.SO_BROADCAST, True)

    # Send a message to the broadcast address. All messages should be
    # received by each endpoint.
    target = (args.broadcast_addr, args.port)
    prot1.sendto(b'Hello', target)
    prot2.sendto(b'World', target)

    await asyncio.wait([prot1.wait_done, prot2.wait_done])

    prot1.transport.close()
    prot2.transport.close()

    await asyncio.wait([prot1.wait_closed, prot2.wait_closed])

    print('done')


ARGS = argparse.ArgumentParser('asyncio patch demo')
ARGS.add_argument(
    '--workaround', action='store_true', default=False,
    help="Patch the loop before running the demo")
ARGS.add_argument(
    '--address', type=str, default='0.0.0.0',
    help="The address to use for the demonstration")
ARGS.add_argument(
    '--port', type=int, default=7000,
    help="The port to use for the demonstration")


if __name__ == '__main__':
    # Parse first so that --help and usage errors do not depend on the
    # hostname lookup below.
    args = ARGS.parse_args()

    ip_addr = socket.gethostbyname(socket.gethostname())
    # crude guess of the network broadcast address
    subnet_broadcast_addr = '{}.255'.format('.'.join(ip_addr.split('.')[:-1]))
    args.broadcast_addr = subnet_broadcast_addr

    if args.workaround:
        import patch
        (patch, )  # silence unused import warning

    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGINT, loop.stop)
    try:
        loop.run_until_complete(demo(loop, args))
    finally:
        loop.close()