Skip to content

Instantly share code, notes, and snippets.

@youqingkui
Created September 8, 2016 03:04
Show Gist options
  • Select an option

  • Save youqingkui/b5029e3709fdbfdc9706709ce8ca61cf to your computer and use it in GitHub Desktop.

Select an option

Save youqingkui/b5029e3709fdbfdc9706709ce8ca61cf to your computer and use it in GitHub Desktop.

Revisions

  1. youqingkui created this gist Sep 8, 2016.
    115 changes: 115 additions & 0 deletions gevent-multil-process.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,115 @@
    #encoding=utf-8
    '''
    演示如何多进程的使用gevent,
    1、gevent和multiprocessing组合使用会有很多问题,
    所以多进程直接用subprocess.Popen,进程间不通过fork共享
    任何数据,完全独立运行,并通过socket通信
    2、进程间同步不能用multiprocessing.Event,
    因为wait()的时候会阻塞住线程,其它协程的代码无法执行,也
    不能使用gevent.event.Event(),因为它通过multiprocessing.Process
    共享到子进程后,在父进程set(),子进程wait()是不会收到信号的
    3、子进程内不能通过signal.signal(signal.SIGINT, signal.SIG_IGN)
    忽略ctrl+c,所以启动主进程时如果没设置后台运行,在ctrl+c时,主进程
    和子进程都会中止而不能优雅退出
    4、主进程和子进程的通信和同步使用gevent.socket来实现,子进程收到
    主进程断开连接事件(接受到零字节数据)时,自己优雅退出,相当于主进程
    发消息告诉子进程让子进程退出
    5、主进程启动时直接在后台运行,使用"nohup gevent-multil-process.py &"来运行,
    测试时可不用nohup命令,停止主进程时使用kill pid的方式,在主进程里
    会拦截SIGTERM信号,通知并等待子进程退出
    '''
    import gevent
    import gevent.socket as socket
    from gevent.event import Event
    import os
    import sys
    import subprocess
    import signal

    url = ('localhost', 8888)

    class Worker(object):
    '''
    子进程运行的代码,通过起一个协程来和主进程通信
    包括接受任务分配请求,退出信号(零字节包),及反馈任务执行进度
    然后主协程等待停止信号并中止进程(stop_event用于协程间同步)。
    '''
    def __init__(self, url):
    self.url = url
    self.stop_event = Event()
    gevent.spawn(self.communicate)
    self.stop_event.wait()
    print 'worker(%s):will stop' % os.getpid()
    def exec_task(self, task):
    print 'worker(%s):execute task:%s' % (os.getpid(), task.rstrip('\n'))
    def communicate(self):
    print 'worker(%s):started' % os.getpid()
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(self.url)
    fp = client.makefile()
    while True:
    line = fp.readline()
    if not line:
    self.stop_event.set()
    break
    '单独起一个协程去执行任务,防止通信协程阻塞'
    gevent.spawn(self.exec_task, line)

    class Master():
    '''
    主进程运行代码,启动单独协程监听一个端口以供子进程连接和通信用,
    通过subprocess.Popen启动CPU个数个子进程,注册SIGTERM信号以便在
    KILL自己时通知子进程退出,主协程等待停止事件并退出主
    '''
    def __init__(self, url):
    self.url = url
    self.workers = []
    self.stop_event = Event()

    gevent.spawn(self.communicate)
    gevent.sleep(0) #让communicate协程有机会执行,否则子进程会先启动

    self.process = [subprocess.Popen(('python',sys.argv[0],'worker'))
    for i in xrange(3)] #启动multiprocessing.cpucount-1个子进程

    gevent.signal(signal.SIGTERM, self.stop) #拦截kill信号

    gevent.spawn(self.test) #测试分发任务

    self.stop_event.wait()

    def communicate(self):
    print 'master(%s):started' % os.getpid()
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(url)
    server.listen(1024)
    while True:
    worker, addr = server.accept()
    print 'master(%s):new worker' % os.getpid()
    self.workers.append(worker)

    def stop(self):
    print 'master stop'
    for worker in self.workers:
    worker.close()
    for p in self.process:
    p.wait()
    self.stop_event.set()

    def test(self):
    import random
    while True:
    if not self.workers:
    gevent.sleep(1)
    continue
    task = str(random.randint(100,10000))
    worker = random.choice(self.workers)
    worker.send(task)
    worker.send('\n')
    gevent.sleep(1)

    if len(sys.argv) == 1:
    Master(url)
    else:
    Worker(url)