Skip to content

Instantly share code, notes, and snippets.

@ShengRang
Created August 10, 2016 07:41
Show Gist options
  • Save ShengRang/131c6bc03e9d8287ecb8e61fafc2af97 to your computer and use it in GitHub Desktop.
Save ShengRang/131c6bc03e9d8287ecb8e61fafc2af97 to your computer and use it in GitHub Desktop.
coroutine scheduler
#!/usr/bin/env python
# encoding: utf-8
from functools import wraps, partial
from collections import deque
from select import select
import socket
class YieldEvent(object):
    """Base class for the objects a task yields to hand control back.

    Subclasses override the hooks they need; the versions here only raise.
    """

    def handle_yield(self, sched, task):
        """Hook called with the scheduler and the task that yielded this event.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def handle_resume(self, sched):
        """Hook called with the scheduler when the awaited condition is met.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError
class NormalYield(YieldEvent):
    """Plain cooperative yield: the task gives up its turn but is not blocked."""

    def handle_yield(self, sched, task):
        """Return True to report that ``task`` is still runnable."""
        still_runnable = True
        return still_runnable
class WriteSocket(YieldEvent):
    """Event yielded by a task that wants to send ``data`` on ``sock``."""

    def __init__(self, sock, data):
        self.sock, self.data = sock, data

    def handle_yield(self, sched, task):
        """Register this event as write-waiting and remember the task.

        Returns None (falsy), i.e. the task is reported as blocked.
        """
        sched.add_write_waiting(self.sock, self)
        self.task = task

    def handle_resume(self, sched):
        """Send the data, store send()'s result for the task, and re-queue it."""
        sent = self.sock.send(self.data)
        self.task.send_val = sent
        sched.add_ready(self.task)
class ReadSocket(YieldEvent):
    """Event yielded by a task that wants up to ``nbytes`` bytes from ``sock``."""

    def __init__(self, sock, nbytes):
        self.sock, self.nbytes = sock, nbytes

    def handle_yield(self, sched, task):
        """Register this event as read-waiting and remember the task.

        Returns None (falsy), i.e. the task is reported as blocked.
        """
        sched.add_read_waiting(self.sock, self)
        self.task = task

    def handle_resume(self, sched):
        """Receive the data, store it for the task, and re-queue the task."""
        received = self.sock.recv(self.nbytes)
        self.task.send_val = received
        sched.add_ready(self.task)
class AcceptSocket(YieldEvent):
    """Event yielded by a task that wants to accept a connection on ``sock``."""

    def __init__(self, sock):
        self.sock = sock

    def handle_yield(self, sched, task):
        """Register this event as read-waiting and remember the task.

        Returns None (falsy), i.e. the task is reported as blocked.
        """
        sched.add_read_waiting(self.sock, self)
        self.task = task

    def handle_resume(self, sched):
        """Call accept(), store its result for the task, and re-queue the task."""
        accepted = self.sock.accept()
        self.task.send_val = accepted
        sched.add_ready(self.task)
class Socket(object):
    """Wrapper whose methods build yieldable events instead of doing the I/O.

    A task is expected to ``yield`` the returned event.
    """

    def __init__(self, sock):
        self.sock = sock

    def accept(self):
        """Build an AcceptSocket event for the wrapped socket."""
        event = AcceptSocket(self.sock)
        return event

    def recv(self, nbytes):
        """Build a ReadSocket event asking for up to ``nbytes`` bytes."""
        event = ReadSocket(self.sock, nbytes)
        return event

    def send(self, data):
        """Build a WriteSocket event carrying ``data``."""
        event = WriteSocket(self.sock, data)
        return event
class Task(object):
    """A generator-based coroutine plus the bookkeeping needed to schedule it."""

    # Next id to hand out; bumped once for every Task created.
    task_id_alloc = 0

    def __init__(self, func):
        """Create a task from ``func``.

        Args:
            func: zero-argument callable that returns a generator; it is
                called immediately to create the generator.
        """
        self.task_id = Task.task_id_alloc
        Task.task_id_alloc += 1
        self.target = func()
        self.func = func
        # Value delivered to the generator on its next resume.
        self.send_val = None

    def run(self):
        """Resume the generator and return the next object it yields.

        The pending ``send_val`` is delivered exactly once and then reset to
        None. Without the reset, a result stored by an I/O event would be
        re-sent on every later resume that did not set a new value.

        Raises:
            StopIteration: when the generator finishes.
        """
        value, self.send_val = self.send_val, None
        return self.target.send(value)

    def exit(self):
        """Close the underlying generator."""
        self.target.close()
class Scheduler(object):
    """Round-robin scheduler for generator tasks with select()-based I/O waits."""

    def __init__(self):
        self._ready = deque()          # tasks that can run right now
        self._task_map = dict()        # task_id -> Task, for registered tasks
        self._read_waiting = dict()    # socket -> event waiting for readability
        self._write_waiting = dict()   # socket -> event waiting for writability

    def add_ready(self, task):
        """Append ``task`` to the end of the run queue."""
        self._ready.append(task)

    def add_task(self, task):
        """Register ``task`` by id and queue it to run."""
        self._task_map[task.task_id] = task
        self.add_ready(task)

    def add_target(self, target):
        """Wrap ``target`` in a Task and add it."""
        self.add_task(Task(target))

    def remove_task(self, task):
        """Close ``task`` and forget it.

        Tasks that were queued with add_ready() only, and so were never
        registered in the task map, are tolerated.
        """
        task.exit()
        self._task_map.pop(task.task_id, None)

    def add_read_waiting(self, socket, event):
        """Record ``event`` as waiting for ``socket`` to become readable."""
        self._read_waiting[socket] = event

    def add_write_waiting(self, socket, event):
        """Record ``event`` as waiting for ``socket`` to become writable."""
        self._write_waiting[socket] = event

    def _iopoll(self):
        """Block in select() and resume every event whose socket is ready."""
        rset, wset, _ = select(list(self._read_waiting),
                               list(self._write_waiting), [])
        for r_socket in rset:
            self._read_waiting.pop(r_socket).handle_resume(self)
        for w_socket in wset:
            self._write_waiting.pop(w_socket).handle_resume(self)

    def main_loop(self):
        """Run tasks until nothing is ready and nothing is waiting on I/O.

        Returns once the scheduler is idle, rather than calling select()
        on three empty sets.

        Raises:
            RuntimeError: if a task yields something that is not a YieldEvent.
        """
        while self._ready or self._read_waiting or self._write_waiting:
            if not self._ready:
                self._iopoll()
                # Re-check the queue: polling may not have resumed anything.
                continue
            task = self._ready.popleft()
            # Only the generator's own completion counts as "task finished".
            try:
                res = task.run()
            except StopIteration:
                self.remove_task(task)
                continue
            if not isinstance(res, YieldEvent):
                raise RuntimeError('wtf')
            if res.handle_yield(self, task):
                # A truthy result means the task is not blocked.
                self.add_ready(task)
def target1():
    """Demo task: print 0 through 4, yielding a NormalYield after each number."""
    step = 0
    while step < 5:
        print(step)
        yield NormalYield()
        step += 1
class EchoServer(object):
    """Line-oriented TCP echo server written as scheduler tasks."""

    def __init__(self, port=8888):
        """Remember the TCP port to listen on (default 8888)."""
        self.port = port

    def server_loop(self):
        """Accept connections forever, spawning one handler task per client.

        Generator task. New handlers are added to the module-level ``sched``
        scheduler. The listening socket is closed when the generator is
        closed or fails.
        """
        _sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            _sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            _sock.bind(('0.0.0.0', self.port))
            _sock.listen(128)
            while True:
                client_sock, addr = (yield AcceptSocket(_sock))
                print('connection from %s' % (addr,))
                sched.add_target(partial(self.client_handler, client_sock))
        finally:
            _sock.close()

    def client_handler(self, sock):
        """Echo each received line back to the client (without the newline).

        Generator task. Stops when a read yields an empty line: either the
        peer closed the connection or it sent a bare newline. The client
        socket is always closed, even if the generator is closed early or
        an exception is thrown into it.
        """
        s = Socket(sock)
        try:
            while True:
                # Read one byte at a time up to, but excluding, the newline.
                chunks = []
                while True:
                    c = yield s.recv(1)
                    if not c or c == b'\n':
                        break
                    chunks.append(c)
                line = b''.join(chunks)
                if not line:
                    break
                # send() may write only part of the data; loop until done.
                while line:
                    nsent = yield s.send(line)
                    line = line[nsent:]
        finally:
            sock.close()
            print('close client handler')
# Script entry: build the server and the scheduler, register both demo
# tasks, then run the loop.
echo_server = EchoServer()
# EchoServer.server_loop looks up this module-level name to spawn handlers.
sched = Scheduler()
sched.add_target(target1)
sched.add_target(echo_server.server_loop)
sched.main_loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment