Created
August 10, 2016 07:41
-
-
Save ShengRang/131c6bc03e9d8287ecb8e61fafc2af97 to your computer and use it in GitHub Desktop.
coroutine scheduler
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| # encoding: utf-8 | |
| from functools import wraps, partial | |
| from collections import deque | |
| from select import select | |
| import socket | |
| class YieldEvent(object): | |
| def handle_yield(self, sched, task): | |
| raise NotImplementedError() | |
| def handle_resume(self, sched): | |
| raise NotImplementedError() | |
| class NormalYield(YieldEvent): | |
| def handle_yield(self, sched, task): | |
| return True | |
| class WriteSocket(YieldEvent): | |
| def __init__(self, sock, data): | |
| self.sock = sock | |
| self.data = data | |
| def handle_yield(self, sched, task): | |
| sched.add_write_waiting(self.sock, self) | |
| self.task = task | |
| def handle_resume(self, sched): | |
| self.task.send_val = self.sock.send(self.data) | |
| sched.add_ready(self.task) | |
| class ReadSocket(YieldEvent): | |
| def __init__(self, sock, nbytes): | |
| self.sock = sock | |
| self.nbytes = nbytes | |
| def handle_yield(self, sched, task): | |
| sched.add_read_waiting(self.sock, self) | |
| self.task = task | |
| def handle_resume(self, sched): | |
| self.task.send_val = self.sock.recv(self.nbytes) | |
| sched.add_ready(self.task) | |
| class AcceptSocket(YieldEvent): | |
| def __init__(self, sock): | |
| self.sock = sock | |
| def handle_yield(self, sched, task): | |
| sched.add_read_waiting(self.sock, self) | |
| self.task = task | |
| def handle_resume(self, sched): | |
| self.task.send_val = self.sock.accept() | |
| sched.add_ready(self.task) | |
| class Socket(object): | |
| def __init__(self, sock): | |
| self.sock = sock | |
| def accept(self): | |
| return AcceptSocket(self.sock) | |
| def recv(self, nbytes): | |
| return ReadSocket(self.sock, nbytes) | |
| def send(self, data): | |
| return WriteSocket(self.sock, data) | |
| class Task(object): | |
| task_id_alloc = 0 | |
| def __init__(self, func): | |
| self.task_id = Task.task_id_alloc | |
| Task.task_id_alloc += 1 | |
| self.target = func() | |
| self.func = func | |
| self.send_val = None | |
| def run(self): | |
| return self.target.send(self.send_val) | |
| def exit(self): | |
| self.target.close() | |
| class Scheduler(object): | |
| def __init__(self): | |
| self._ready = deque() | |
| self._task_map = dict() | |
| self._read_waiting = dict() | |
| self._write_waiting = dict() | |
| def add_ready(self, task): | |
| self._ready.append(task) | |
| def add_task(self, task): | |
| self._task_map[task.task_id] = task | |
| self.add_ready(task) | |
| def add_target(self, target): | |
| self.add_task(Task(target)) | |
| def remove_task(self, task): | |
| task.exit() | |
| del self._task_map[task.task_id] | |
| def add_read_waiting(self, socket, event): | |
| self._read_waiting[socket] = event | |
| def add_write_waiting(self, socket, event): | |
| self._write_waiting[socket] = event | |
| def _iopoll(self): | |
| rset, wset, xset = select(self._read_waiting, self._write_waiting, []) | |
| for r_socket in rset: | |
| event = self._read_waiting.pop(r_socket) | |
| event.handle_resume(self) | |
| for w_socket in wset: | |
| event = self._write_waiting.pop(w_socket) | |
| event.handle_resume(self) | |
| def main_loop(self): | |
| while True: | |
| if not self._ready: | |
| self._iopoll() | |
| task = self._ready.popleft() | |
| try: | |
| res = task.run() | |
| if isinstance(res, YieldEvent): | |
| if res.handle_yield(self, task): | |
| # return True if task is not block | |
| self.add_ready(task) | |
| else: | |
| raise RuntimeError('wtf') | |
| except StopIteration: | |
| self.remove_task(task) | |
| def target1(): | |
| for i in range(5): | |
| print i | |
| yield NormalYield() | |
| class EchoServer(object): | |
| def __init__(self, port=8888): | |
| self.port = port | |
| def server_loop(self): | |
| _sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| _sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
| _sock.bind(('0.0.0.0', self.port)) | |
| _sock.listen(128) | |
| while True: | |
| client_sock, addr = (yield AcceptSocket(_sock)) | |
| print 'connection from', addr | |
| sched.add_target(partial(self.client_handler, client_sock)) | |
| def client_handler(self, sock): | |
| s = Socket(sock) | |
| line = '' | |
| while True: | |
| while True: | |
| c = yield s.recv(1) | |
| if not c or c == '\n': | |
| break | |
| line = line + c | |
| if not line: | |
| break | |
| while line: | |
| nsent = yield s.send(line) | |
| line = line[nsent:] | |
| sock.close() | |
| print 'close client handler' | |
| sched = Scheduler() | |
| sched.add_target(target1) | |
| echo_server = EchoServer() | |
| sched.add_target(echo_server.server_loop) | |
| sched.main_loop() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment