Learning Python: gevent and an analysis of the gevent scheduling flow

gevent

 

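greenlet already implements coroutines, but switching between them still has to be done by hand. If that feels like too much trouble, don't worry: Python has a more powerful module, gevent, which builds on greenlet and switches between tasks automatically.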

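  gevent is a very widely used networking library. It combines libev, an efficient IO polling library, with coroutines implemented by greenlet, which gives gevent excellent performance, especially in web applications. This article describes gevent's scheduling flow: how gevent wraps and uses greenlet, and how greenlet cooperates with libev. It assumes some familiarity with greenlet; see the earlier article on greenlet. The version of gevent analysed here is 1.2; you can check your version with gevent.version_info.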

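The principle is this: when a greenlet reaches an IO operation (input/output, such as network or file access), for example a network request, gevent automatically switches to another greenlet. Once the IO operation completes, gevent switches back at a suitable moment and the original greenlet continues.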

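Introduction to gevent:

  gevent is a networking library based on coroutines (greenlet). Its event polling is built on libev (earlier versions used libevent), and its API follows the same concepts as the Python standard library (events, queues, and so on). One particularly interesting feature is monkey patching, which turns blocking operations in the Python standard library, such as socket reads and writes, into asynchronous ones.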

   
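gevent grew out of eventlet and claims a simpler implementation, a more convenient API and better performance than its predecessor. Many open-source web servers use gevent, such as gunicorn and paste, and gevent can also serve as a Python web server on its own. A benchmark of common WSGI servers (see the references) shows gevent performing very well under both HTTP/1.0 and HTTP/1.1. The figure below shows the results under HTTP/1.1, the version in common use today:

  Figure 1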

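  The secret of gevent's efficiency is greenlet plus libev. greenlet was covered in an earlier post. gevent uses greenlet in a deliberately restricted way: it only ever switches between two levels of coroutines, which is simple and hard to get wrong. libev handles events by non-blocking polling, for example epoll on Unix. Early versions of gevent used libevent and later replaced it with libev, because libev "provides fewer core features in pursuit of better efficiency". A performance comparison of libev and libevent:

Figure 2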

  

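IO operations are slow and often leave a program waiting. With gevent switching coroutines for us automatically, there is always some greenlet running instead of waiting on IO.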

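greenlet recap:

  To understand gevent's scheduling flow, the most important thing is a basic understanding of greenlet. Here are the points I consider most important:

  1. Every greenlet.greenlet instance has a parent (it can be specified; by default it is the greenlet in which the new greenlet.greenlet was created). When a greenlet.greenlet instance finishes, either normally or by raising an exception, execution switches back to its parent.
  2. greenlet.greenlet can be subclassed. The subclass implements a run method, which is invoked when the greenlet's switch method is called.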
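
Both points can be checked with a few lines against the raw greenlet library. This is a minimal sketch, not taken from the gevent source; the class name Worker and the events list are made up for illustration.

```python
from greenlet import greenlet, getcurrent

events = []

class Worker(greenlet):
    # Subclasses override run(); the first switch() into the greenlet calls it.
    def run(self, label):
        events.append("run " + label)
        return label.upper()

main = getcurrent()
worker = Worker()  # no parent given, so the parent is the creating greenlet
events.append("parent is main: %s" % (worker.parent is main))

# When run() returns, control goes back to the parent (main here), and the
# return value of run() becomes the return value of switch().
result = worker.switch("job")
events.append("result " + result)
events.append("dead: %s" % worker.dead)
print(events)
```

The arguments given to the first switch() are passed on to run(), which is how gevent later hands the user's callable its arguments.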

  In gevent, two classes inherit from greenlet.greenlet: gevent.hub.Hub and gevent.greenlet.Greenlet. In the rest of this article, greenlet.greenlet always refers to the class from the native greenlet library, while greenlet (or Greenlet) refers to gevent's wrapper around it.

 

Installation:

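gevent scheduling flow:

  First, the conclusions in summary; the following sections then work through them step by step with examples and source code.

  Every thread that uses gevent has a hub, and as mentioned above the hub is a greenlet.greenlet instance (of the subclass gevent.hub.Hub). The hub is created lazily, the first time it is needed, so its parent is the main greenlet. After that, the parent of every Greenlet instance (note: Greenlet is a subclass of greenlet.greenlet) is set to the hub. The hub runs the event loop provided by libev to process the tasks that the Greenlets represent, and when a Greenlet instance finishes (normally or with an exception), execution switches back to the hub.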
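
The parent relationships described above can be observed directly. The following is a small sketch that assumes it runs in the main greenlet of a thread that has not created a hub yet; the function name task is illustrative.

```python
import gevent
from greenlet import getcurrent

main = getcurrent()
hub = gevent.get_hub()  # created on first use for the current thread

def task():
    # Runs inside the spawned Greenlet and reports that Greenlet's parent.
    return getcurrent().parent

g = gevent.spawn(task)
g.join()

hub_parent_is_main = hub.parent is main
greenlet_parent_is_hub = g.value is hub
print(hub_parent_is_main, greenlet_parent_is_hub)
```

Because every Greenlet's parent is the hub, a finished Greenlet always falls back into the event loop rather than into whichever greenlet happened to spawn it.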

pip3 install gevent

gevent scheduling example 1:

  Let's start with the simplest possible code:

>>> import gevent
>>> gevent.sleep(1)  
>>>

  The code above is trivial, yet it exercises the whole core of gevent. Let's analyse it together with the source.

  First, the sleep function (gevent.hub.sleep):

1 def sleep(seconds=0, ref=True):
2     hub = get_hub()
3     loop = hub.loop
4     if seconds <= 0:
5         waiter = Waiter()
6         loop.run_callback(waiter.switch)
7         waiter.get()
8     else:
9         hub.wait(loop.timer(seconds, ref=ref))

 

   It first obtains the hub (line 2) and then waits on the hub for the timer event (line 9). The source of get_hub (gevent.hub.get_hub) is:

def get_hub(*args, **kwargs):
    """
    Return the hub for the current thread.

    """
    hub = _threadlocal.hub
    if hub is None:
        hubtype = get_hub_class()
        hub = _threadlocal.hub = hubtype(*args, **kwargs)
    return hub

 

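  As you can see, the hub is unique within a thread. As mentioned before, greenlets are per-thread as well: each thread has its own greenlet stack. hubtype defaults to gevent.hub.Hub, and the hub's initialiser (__init__) creates the loop attribute, which by default is the Python wrapper around libev.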
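
The per-thread nature of the hub is easy to confirm. The sketch below uses an ordinary (unpatched) threading.Thread; the names worker and other are illustrative only.

```python
import threading
import gevent

main_hub = gevent.get_hub()
same_in_thread = gevent.get_hub() is main_hub  # repeated calls return one object

other = {}

def worker():
    # The first get_hub() call in this thread creates a separate hub for it.
    other["is_main_hub"] = gevent.get_hub() is main_hub

t = threading.Thread(target=worker)
t.start()
t.join()
print(same_in_thread, other["is_main_hub"])
```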

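  Back to the definition of sleep: hub.wait(loop.timer(seconds, ref=ref)). The hub.wait function is very important. Every blocking operation, such as a timer or IO, ends up calling it. Its job in one sentence: switch from the current coroutine to the hub, and switch back from the hub once the event behind the watcher is ready. The source of wait (gevent.hub.Hub.wait) is: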

 1     def wait(self, watcher):
 2         """
 3         Wait until the *watcher* (which should not be started) is ready.
 4 
 5         """
 6         waiter = Waiter()
 7         unique = object()
 8         watcher.start(waiter.switch, unique)
 9         try:
10             result = waiter.get()
11             if result is not unique:
12                 raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique))
13         finally:
14             watcher.stop()

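  The watcher parameter is the loop.timer instance. Its Cython definition lives in corecext.pyx; for our purposes it is enough to think of it as a timer event. The code above creates a Waiter (gevent.hub.Waiter) object. What is it for? The class docstring explains it clearly: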

Waiter.__doc__  

A low level communication utility for greenlets.

Waiter is a wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them somewhat safer:

* switching will occur only if the waiting greenlet is executing :meth:`get` method currently;
* any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
* if :meth:`switch`/:meth:`throw` is called before the receiver calls :meth:`get`, then :class:`Waiter`
  will store the value/exception. The following :meth:`get` will return the value/raise the exception

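   In short, Waiter wraps the switch and throw functions of greenlet.greenlet, and is used to store a greenlet's return value or to capture an exception raised inside a greenlet. Recall that in native greenlet, when a greenlet raises an exception, that exception propagates to its parent greenlet.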
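
The two situations the docstring describes can be tried out from the main greenlet. This is a sketch built only from calls that appear in the listings of this article (Waiter.switch, Waiter.get, loop.timer, watcher.start/stop); the variable names are illustrative.

```python
import gevent
from gevent.hub import Waiter

# Case 1: switch() is called before get(). Nobody is waiting yet, so the
# Waiter stores the value and the later get() returns it immediately.
early = Waiter()
early.switch("stored")
first = early.get()

# Case 2: get() is called first. The caller parks in the hub until a timer
# watcher fires and calls late.switch("from timer") from the hub greenlet.
late = Waiter()
timer = gevent.get_hub().loop.timer(0.05)
timer.start(late.switch, "from timer")
try:
    second = late.get()
finally:
    timer.stop()
print(first, second)
```

Case 2 is the same pattern Hub.wait uses: start a watcher whose callback is waiter.switch, then call waiter.get.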

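  Back in Hub.wait, line 8, watcher.start(waiter.switch, unique), registers a callback: after the given time (1s) the callback waiter.switch will be invoked. Note that waiter.switch has not run at this point. Line 10 then calls waiter.get. Here is that get function (gevent.hub.Waiter.get):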

 1     def get(self):
 2         """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
 3         if self._exception is not _NONE:
 4             if self._exception is None:
 5                 return self.value
 6             else:
 7                 getcurrent().throw(*self._exception)
 8         else:
 9             if self.greenlet is not None:
10                 raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
 11             self.greenlet = getcurrent() # remember the current coroutine; used later when the hub switches back
 12             try:
 13                 return self.hub.switch() # switch to the hub
14             finally:
15                 self.greenlet = None

 

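  The core logic is in lines 11 to 15. On line 11, getcurrent returns the current greenlet (in this test code that is the main greenlet, the very first greenlet) and assigns it to waiter.greenlet. Line 13 then switches to the hub. As the second point of the greenlet recap said, a subclass of greenlet.greenlet overrides the run method, and calling switch on the subclass ends up in that run method. Hub's run method is implemented as follows: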

    def run(self):
        """
        Entry-point to running the loop. This method is called automatically
        when the hub greenlet is scheduled; do not call it directly.

        :raises LoopExit: If the loop finishes running. This means
           that there are no other scheduled greenlets, and no active
           watchers or servers. In some situations, this indicates a
           programming error.
        """
        assert self is getcurrent(), 'Do not call Hub.run() directly'
        while True:
            loop = self.loop
            loop.error_handler = self
            try:
                loop.run()
            finally:
                loop.error_handler = None  # break the refcount cycle
            self.parent.throw(LoopExit('This operation would block forever', self))

 

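  loop is of course the libev event loop. As the docstring says, the loop in principle runs forever; if it finishes, that means no events are being watched any more (IO, timers and so on). Hub.wait registered a timer earlier, so inside run, once the time is up, the timer's callback is invoked, which is the waiter.switch from before. Let's look at that function (gevent.hub.Waiter.switch):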

 1     def switch(self, value=None):
 2         """Switch to the greenlet if one's available. Otherwise store the value."""
 3         greenlet = self.greenlet
 4         if greenlet is None:
 5             self.value = value
 6             self._exception = None
 7         else:
 8             assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
 9             switch = greenlet.switch
10             try:
11                 switch(value)
12             except:
13                 self.hub.handle_error(switch, *sys.exc_info())

 

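  The core code is in lines 8 to 13. Line 8 guarantees that this function is only ever called from the hub coroutine, which is natural because it is always invoked from within Hub.run. Line 11 switches to the waiter.greenlet coroutine; as noted in the discussion of waiter.get, waiter.greenlet is the main greenlet. Note that this switch returns to the point where the main greenlet was switched out (that is, where it was suspended): the self.hub.switch() call on line 13 of Waiter.get, which was reached from line 10 of Hub.wait. From there the main greenlet resumes and carries on.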
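
The LoopExit case mentioned in the Hub.run docstring can be provoked on purpose. The sketch below assumes nothing else is scheduled on the hub and no watcher is active; it waits on a gevent.event.Event that nobody will ever set, so the loop runs out of work and the hub throws LoopExit into its parent, the main greenlet.

```python
import gevent
from gevent.event import Event
from gevent.hub import LoopExit

never_set = Event()
try:
    # Switches to the hub. With no timers, IO watchers or callbacks pending,
    # loop.run() returns and Hub.run throws LoopExit into the parent greenlet.
    never_set.wait()
    outcome = "returned"
except LoopExit:
    outcome = "LoopExit"
print(outcome)
```

In a real program this exception usually points to a bug: something is waiting for an event that no greenlet is left to produce.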

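   Summary: what sleep does is simple. It triggers a blocking operation, which leads to a call to hub.wait; execution switches from the current greenlet.greenlet to the Hub, and after the timeout the hub switches back to the previous greenlet, which continues. The example shows the general pattern: gevent wraps every blocking operation in a Watcher, switches from the coroutine that issued the blocking operation to the Hub, and switches from the Hub back to that coroutine once the operation has completed.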
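
One consequence of this pattern is that several greenlets can wait at the same time. In the following sketch (the function name nap and the 0.2s duration are arbitrary) two greenlets each sleep for 0.2 seconds, yet the total elapsed time stays close to 0.2 seconds rather than 0.4, because each sleep only parks its greenlet and hands control back to the hub.

```python
import time
import gevent

def nap(seconds):
    gevent.sleep(seconds)  # yields to the hub; a timer watcher resumes us
    return seconds

start = time.monotonic()
jobs = [gevent.spawn(nap, 0.2), gevent.spawn(nap, 0.2)]
gevent.joinall(jobs)
elapsed = time.monotonic() - start
print("elapsed: %.2fs" % elapsed)
```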

  1. Using gevent

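gevent scheduling example 2:

  The example above makes the scheduling flow clear, but it does not really show the benefit of cooperative scheduling in gevent. Next, an example from the gevent tutorial: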

import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

# output
# Running in foo
# Explicit context to bar
# Explicit context switch to foo again
# Implicit context switch back to bar

 

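  The output shows the lines of foo and bar interleaved, so execution clearly switched at each gevent.sleep call. gevent.sleep was covered above, so here we focus on the spawn and joinall functions.

  gevent.spawn is essentially a call to the class method spawn of gevent.greenlet.Greenlet: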

    @classmethod
    def spawn(cls, *args, **kwargs):
        g = cls(*args, **kwargs)
        g.start()
        return g

  This class method calls two Greenlet functions, __init__ and start. The most important part of __init__ is this:

    def __init__(self, run=None, *args, **kwargs):
        greenlet.__init__(self, None, get_hub())  # the parent of every new greenlet instance is set to the hub

        if run is not None:
            self._run = run

  The definition of start is also very simple (gevent.greenlet.Greenlet.start):

    def start(self):
        """Schedule the greenlet to run in this loop iteration"""
        if self._start_event is None:
            self._start_event = self.parent.loop.run_callback(self.switch)

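  This registers self.switch as a callback on hub.loop. Note that Greenlet.switch eventually calls Greenlet._run, which is the callable passed to spawn (foo or bar). At this point the callback is only registered; event polling has not started yet. gevent.joinall is what starts the event polling and waits for the results.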
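
That spawn only schedules the greenlet, and that nothing runs until control reaches the hub, can be seen in a few lines. This is a sketch; the names task and ran are illustrative.

```python
import gevent

ran = []

def task(name):
    ran.append(name)
    return name * 2

g = gevent.spawn(task, "a")
before = list(ran)  # still empty: spawn only registered g.switch as a callback
g.join()            # switches to the hub, which runs the callback and thus task
after = list(ran)
print(before, after, g.value)
```

Any call that yields to the hub, such as gevent.sleep(0), would start the greenlet just as well as join does; join additionally waits until it has finished.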

  The joinall function eventually calls down into gevent.hub.iwait:

 1 def iwait(objects, timeout=None, count=None):
 2     """
 3     Iteratively yield *objects* as they are ready, until all (or *count*) are ready
 4     or *timeout* expired.
 5     """
 6     # QQQ would be nice to support iterable here that can be generated slowly (why?)
 7     if objects is None:
 8         yield get_hub().join(timeout=timeout)
 9         return
10 
11     count = len(objects) if count is None else min(count, len(objects))
 12     waiter = _MultipleWaiter() # _MultipleWaiter is a subclass of Waiter
13     switch = waiter.switch
14 
15     if timeout is not None:
16         timer = get_hub().loop.timer(timeout, priority=-1)
17         timer.start(switch, _NONE)
18 
19     try:
20         for obj in objects:
 21             obj.rawlink(switch) # registers the callback with hub.loop
22  
23         for idx in xrange(count):
24             print 'for in iwait', idx
 25             item = waiter.get() # switches to the hub here
26             print 'come here ', item, getcurrent()
27             waiter.clear()
28             if item is _NONE:
29                 return
30             yield item
31     finally:
32         if timeout is not None:
33             timer.stop()
34         for obj in objects:
35             unlink = getattr(obj, 'unlink', None)
36             if unlink:
37                 try:
38                     unlink(switch)
39                 except:
40                     traceback.print_exc()

 

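  The loop starting at line 23 of iwait then calls waiter.get once per object. The waiter here is an instance of _MultipleWaiter(Waiter), whose get function ends up in Waiter.get. Waiter.get was covered in detail above; in short, it switches to the hub. Using greenlet's tracing facility we can observe the entire sequence of greenlet.greenlet switches. The modified code is: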

import gevent
import greenlet

def callback(event, args):
    # for 'switch' and 'throw' events, args is the pair (origin, target)
    print(event, args[0], '===:>>>>', args[1])

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

print('main greenlet info: ', greenlet.getcurrent())
print('hub info', gevent.get_hub())
oldtrace = greenlet.settrace(callback)

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
greenlet.settrace(oldtrace)

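  The switch sequence and the reason for each switch are shown in the figure below:

  Figure 3

  Summary: gevent.spawn creates a new Greenlet and registers it on the hub's loop; execution switches to the hub when gevent.joinall or Greenlet.join is called.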

 

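  This article used two simple examples, together with the source code, to analyse gevent's coroutine scheduling flow. gevent is very convenient to use, especially in a web server, where the application can enjoy its benefits with essentially no changes. My main reason for reading the gevent source was to understand how gevent wraps and uses greenlet. greenlet is powerful, powerful enough to be error-prone, whereas gevent guarantees that switching only happens between two levels of coroutines. That design is worth borrowing.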

 

references:

http://www.cnblogs.com/xybaby/p/6337944.html

http://www.gevent.org/

https://pypi.python.org/pypi/greenlet

http://software.schmorp.de/pkg/libev.html

http://libevent.org/

http://eventlet.net/

http://nichol.as/benchmark-of-python-web-servers

http://libev.schmorp.de/bench.html

http://sdiehl.github.io/gevent-tutorial/

import gevent

def f(n):
    for i in range(n):
        print(gevent.getcurrent(), i)

g1 = gevent.spawn(f, 5)
g2 = gevent.spawn(f, 5)
g3 = gevent.spawn(f, 5)
g1.join()
g2.join()
g3.join()

Output:

<Greenlet at 0x10e49f550: f(5)> 0
<Greenlet at 0x10e49f550: f(5)> 1
<Greenlet at 0x10e49f550: f(5)> 2
<Greenlet at 0x10e49f550: f(5)> 3
<Greenlet at 0x10e49f550: f(5)> 4
<Greenlet at 0x10e49f910: f(5)> 0
<Greenlet at 0x10e49f910: f(5)> 1
<Greenlet at 0x10e49f910: f(5)> 2
<Greenlet at 0x10e49f910: f(5)> 3
<Greenlet at 0x10e49f910: f(5)> 4
<Greenlet at 0x10e49f4b0: f(5)> 0
<Greenlet at 0x10e49f4b0: f(5)> 1
<Greenlet at 0x10e49f4b0: f(5)> 2
<Greenlet at 0x10e49f4b0: f(5)> 3
<Greenlet at 0x10e49f4b0: f(5)> 4
As you can see, the three greenlets run one after another rather than taking turns.

  2. Switching execution with gevent

import gevent

def f(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        # simulate a time-consuming operation; note that this is gevent.sleep,
        # not sleep from the time module
        gevent.sleep(1)

g1 = gevent.spawn(f, 5)
g2 = gevent.spawn(f, 5)
g3 = gevent.spawn(f, 5)
g1.join()
g2.join()
g3.join()

Output:

<Greenlet at 0x7fa70ffa1c30: f(5)> 0
<Greenlet at 0x7fa70ffa1870: f(5)> 0
<Greenlet at 0x7fa70ffa1eb0: f(5)> 0
<Greenlet at 0x7fa70ffa1c30: f(5)> 1
<Greenlet at 0x7fa70ffa1870: f(5)> 1
<Greenlet at 0x7fa70ffa1eb0: f(5)> 1
<Greenlet at 0x7fa70ffa1c30: f(5)> 2
<Greenlet at 0x7fa70ffa1870: f(5)> 2
<Greenlet at 0x7fa70ffa1eb0: f(5)> 2
<Greenlet at 0x7fa70ffa1c30: f(5)> 3
<Greenlet at 0x7fa70ffa1870: f(5)> 3
<Greenlet at 0x7fa70ffa1eb0: f(5)> 3
<Greenlet at 0x7fa70ffa1c30: f(5)> 4
<Greenlet at 0x7fa70ffa1870: f(5)> 4
<Greenlet at 0x7fa70ffa1eb0: f(5)> 4

  3. Monkey-patching the program

from gevent import monkey
import gevent
import random
import time

def coroutine_work(coroutine_name):
    for i in range(10):
        print(coroutine_name, i)
        time.sleep(random.random())

gevent.joinall([
    gevent.spawn(coroutine_work, "work1"),
    gevent.spawn(coroutine_work, "work2")
])

Output:

work1 0
work1 1
work1 2
work1 3
work1 4
work1 5
work1 6
work1 7
work1 8
work1 9
work2 0
work2 1
work2 2
work2 3
work2 4
work2 5
work2 6
work2 7
work2 8
work2 9
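
With monkey.patch_all() applied:

from gevent import monkey
import gevent
import random
import time

# Needed when the program performs time-consuming (blocking) operations:
# replaces the blocking standard-library code the program uses with
# gevent's own cooperative implementations.
monkey.patch_all()

def coroutine_work(coroutine_name):
    for i in range(10):
        print(coroutine_name, i)
        time.sleep(random.random())

gevent.joinall([
    gevent.spawn(coroutine_work, "work1"),
    gevent.spawn(coroutine_work, "work2")
])

Output: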

work1 0
work2 0
work1 1
work1 2
work1 3
work2 1
work1 4
work2 2
work1 5
work2 3
work1 6
work1 7
work1 8
work2 4
work2 5
work1 9
work2 6
work2 7
work2 8
work2 9
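
To check whether the patch is in effect, gevent.monkey offers is_module_patched. The sketch below patches only the time module with monkey.patch_time() to keep the side effects small; the function name work and the 0.01s delay are illustrative.

```python
import time
from gevent import monkey

monkey.patch_time()  # patch only the time module for this sketch
patched = monkey.is_module_patched("time")

import gevent

def work(label, log):
    for i in range(3):
        log.append((label, i))
        time.sleep(0.01)  # now cooperative: yields to the hub instead of blocking

log = []
gevent.joinall([gevent.spawn(work, "a", log), gevent.spawn(work, "b", log)])
labels = [label for label, _ in log]
print(patched, labels)
```

Because time.sleep now yields to the hub, the second worker gets to run as soon as the first one sleeps, the same interleaving seen in the output above.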
