Python語言是由Guido van Rossum大牛在1989年發明,它是當今世界受歡迎的計算機編程語言之一,也是一門“學了有用、學了能用、學會能久用”的計算生態語言。
為此,CSDN作為國內大的IT中文社區,特向廣大Python愛好者開設了Python學習班,幫助大家在學習的道路上少走彎路,事半功倍。3月16號晚上8點,我們特邀請知名Python技術專家陳舸老師在班級里舉行分享活動。
陳舸,8年開發經驗,曾就職華為、烽火通信,目前創業中。技術涉獵廣泛,嵌入式開發,Linux,Python,iOS,Web均有涉及。《Python Cookbook第三版》譯者,《Linux/Unix系統編程手冊 下卷》以及《算法精解 C語言描述》合作譯者。
以下為昨晚的分享內容:
大家好,今天給大家介紹Python中的協程(coroutine),讓大家對協程能有一個基本的認識。本文將從迭代器、生成器的基礎講起,通過生成器實現協程,后將簡單介紹Python3.5中新增關鍵字async/await對協程的支持。本次分享中的代碼示例如不加特別說明將兼容Python2和Python3。使用Python2的同學可以通過from future import print_function來導入print函數。
大家都知道Python里有一個for語句。我們可以用for來循環迭代一個序列。Python中可迭代的對象有很多,比如我們所熟悉的:
迭代列表
for x in [1,2,3,4,5]: print(x)
迭代字典
languages = {'Java':'James Gosling', 'Python':'Guido van Rossum'} for lang, author in languages.items():
print("{0} create {1}".format(author, lang))
迭代文本
for line in open("logfile.log"):
print(line)
迭代字符串
for char in "hello world":
print(char)
為什么可以迭代許多不同的對象呢?因為Python中存在著一個迭代協議。我們再來看看這個例子:
>>> items = [1, 2, 3]
>>> it = iter(items)
>>> it.next() 1 >>> it.next() 2 >>> it.next() 3 >>> it.next()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
這一次我們沒有通過for來迭代列表items,而是先通過Python的內置函數iter將列表items轉換成一個迭代器,然后不斷調用迭代器的next()方法來得到序列中的值,當超出可迭代范圍時,就拋出了StopIteration異常。
因此對于如下的for循環
alert("Hellofor x in iterableObj: print(x)
Python解釋器是這樣為我們處理的:
_iter = iter(iterableObj) while True: try:
x = _iter.next() except StopIteration: break print(x)
所以,Python的迭代協議就是要求對象的iter()方法返回一個特殊的迭代器對象,并且該對象必須實現next()方法,并使用StopIteration異常來通知迭代的完成。滿足上述要求的對象,我們就認為它是迭代器對象。這樣,在for語句中迭代時,Python會自動為我們調用迭代器的next()方法。
也就是說,我們自定義的對象如果也想用在for語句中來迭代的話,只需要滿足迭代協議的要求,實現iter()和next()方法,并在next()中捕獲StopIteration異常就可以了。好啦,那我們就根據上述要求,自己實現一個迭代器對象吧。
自定義可迭代對象, 迭代器版
class LowerLetters(object): def __init__(self): self.current = 'a' def __next__(self): if self.current > 'z': raise StopIteration
result = self.current
self.current = chr(ord(result)+1) return result def __iter__(self): ''' 只需要返回self即可 ''' return self
letters = LowerLetters() for i in letters:
print(i) # 輸出a-z的小寫字母
剛剛說了迭代器,我們再來看看Python中的生成器。先看看怎么定義一個Python的生成器。
def countdown(n): print("Counting down from", n) while n > 0: yield n
n -= 1
看起來和普通的Python函數并無什么區別,都是用def來定義函數而已啊。只是while循環中的那個yield好像不太熟,而且函數里沒有出現過return。先不管那么多,我們調用一下上面這個函數看看。
>>> x = countdown(10) # 注意,并沒有打印出任何內容 >>> x
<generator object at 0x58490>
>>>
奇怪了,明明函數定義里有一個print打印啊,調用它居然沒有打印信息出來,這說明函數就沒有開始執行嘛。沒錯,查看x我們發現它是一個generator對象,也就是生成器對象。函數countdown調用之后,只是返回了一個生成器對象而已,函數中的語句并沒有立刻開始執行。那怎么樣才能讓函數開始執行呢?我們在迭代器中講過的next()又要登場了。
注意,Python2中生成器對象的next方法在Python3中更改為next
這里以Python3為示例,用Python2的同學需要用next()
x.__next__()
Counting down from 10 10 >>>
>>> x.__next__() 9 >>> x.__next__() 8 ... >>> x.__next__() 1 >>> x.__next__()
Traceback (most recent call last):
File "<stdin>", line 1, in ?
StopIteration
>>>
通過對生成器對象x調用next()方法,我們發現countdown函數體中的語句終于開始執行了,而且還伴隨著一個現象,那就是每次執行next(),函數就在yield語句處返回一個值,然后就停住不動了,直到下一次調用next()時才會又繼續執行下去,如此往復,直到函數返回。
剛剛我們看到拋出StopIteration異常了,此時函數countdown已經返回了。這里是不是有似曾相似的感覺?之前講的迭代器里,也有StopIteration異常,也出現了next。那到底迭代器和生成器有什么區別呢?
如果我們查看生成器x所包含的方法:
>>> dir(x)
['__class__', '__del__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__lt__', '__name__', '__ne__', '__new__', '__next__', '__qualname__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'close', 'gi_code', 'gi_frame', 'gi_running', 'gi_yieldfrom', 'send', 'throw'] >>> '__iter__' in dir(x) True >>> '__next__' in dir(x) True
我們發現生成器對象中也包含有iter()和next()方法! 那么根據Python中的迭代協議,可以得知生成器其實也是一種迭代器。既然它們本質上都是迭代器,那生成器的好處體現在哪里了呢?還記得我們在講迭代器時實現的那個自定義類LowerLetters嗎?為了滿足Python迭代協議,我們分別實現了iter()和next()方法,并在next()中處理StopIteration異常。可是你再看看我們的生成器函數countdown,我們搞了那么多東西嗎?沒有!因為這里Python自動幫我們搞定了迭代協議,簡單多了。讓我們用生成器重新定義一個LowerLetter。
自定義可迭代對象,生成器版
def lowerLetter(): lowerLetter.current = 'a' while lowerLetter.current <= 'z':
result = lowerLetter.current
lowerLetter.current = chr(ord(result)+1) yield result #用yield產生值 for i in lowerLetter(): ... print(i) # 輸出a-z
可以看到,相比迭代器版本,生成器版本的實現簡潔多了,一個magic方法都不涉及,你甚至都不需要寫一個類。生成器可以用來簡化實現迭代器對象。
迭代器和生成器都有一種特質,它們都可迭代(iterable),但是只可以迭代一輪,一旦迭代結束,所有產生的值都不會保存,如果你要再次迭代,那么需要重新調用迭代器/生成器一次。這和Python內置的列表等數據結構有很大不同,我們知道,一個list你是可以反復迭代多次的。
>>> x = countdown(10)
>>> for v in x: ... print(v) ... Counting down from 10 10 ... 1 >>> for v in x: ... print(v) ... >>> #再次迭代不會產生任何值了
看到了嗎,輪for結束后,再次迭代就不會產生任何值了,因為已經到StopIteration了。中間產生的值都是按需生成,每次由Python幫我們調用next()得到,不會保存起來。這么做的好處是體現了惰性求值,即,需要的時候再計算,而不是像list那樣不管你要不要,我一次性全扔內存里。當迭代的序列較大時,生成器/迭代器相比list會顯著減少內存的占用。而且,有時候如果無法預先知道要迭代的上限時,這時就只能用迭代器/生成器來解決了。
我們需要區分可迭代對象(iterable)和迭代器(iterator)。迭代器一定是可迭代對象,但可迭代對象不一定是迭代器,list就是好的例子。list可迭代,但它不包含next方法,因此根據Python迭代協議,它不是迭代器。可以通過下列方法來判斷對象是否是迭代器。
>>> from collections import Iterator >>> isinstance(lowerLetter(), Iterator) True >>> isinstance([], Iterator) False >>> isinstance({}, Iterator) False
好了,前面扯了那么多,終于要到正題了。那么什么是協程呢?從名字上來看,協表示協作,協程就是互相協作的例程,也正對應了其英文稱謂cooperative routine。 據Donald Knuth所說,其實協程的概念早在1958年就由Melvin Conway提出了,而本介紹協程的出版物則在1963年出現。它到底是什么?協程和我們前面提到的迭代器、生成器有什么關系嗎?
還記得前面講生成器時給出的例子吧,我們用到了yield這個關鍵字。函數中出現了yield,使得函數不再是普通的函數,而變成了生成器。yield不但可以產生值,還會使得生成器保存執行上下文后暫停執行,然后在下一次next()時從上次暫停的地方繼續接著執行。正是由于這一特點,使得Python中的生成器具有了協程的部分特征(可自我暫停,稍后再恢復執行)。而自從Python2.5開始,通過PEP 342 – Coroutines via Enhanced Generators的引入,Python終于可以通過生成器來實現協程了。
Python2.5中為生成器對象增加了send()和close()方法,并且支持了yield表達式。這有什么用處呢?我們還是先看一個簡單的例子。
簡單的字符串匹配, 若發送過來的字符串中包含pattern則打印出來
def grep(pattern): print("Looking for %s" % pattern) while True:
line = (yield) # yield表達式. 注意和之前例子中yield的寫法做比較 if pattern in line:
print(line)
OK,我們來分析一下上面的代碼。首先,因為出現了yield,grep不再是普通的函數了,調用它將產生一個生成器對象,這和我們之前講過的沒有區別。
>>> g = grep("python") >>> g
<generator object grep at 0x106620cd0> >>>
由于g是生成器對象,函數體不會立刻執行。需要先調用一次next(),Python2中則是調用next()。
>>> g.__next__()
Looking for python
現在函數體開始執行了,然后在yield處暫停,這也和我們之前討論的行為是一致的。
我們再來看看line = (yield),這個時候yield寫在了=的右邊,使其成為了一個yield表達式。可以理解為通過yield得到了某個值然后賦值給了line。那這個值是什么呢?又是如何通過yield得到這個值的呢?
這里就是send()方法開始顯現威力的時候了。我們可以對生成器對象g調用send方法,發送數據給它,而發送的數據就通過yield得到并賦值給了line。
>>> g.send('I like python')
I like python #匹配了pattern, 打印出來 >>> g.send('this is a test') >>> # 沒有匹配pattern, 無打印
OK,我們再對grep的整個執行流程來一次梳理。g = grep(“python”)產生了一個生成器對象,此時函數體沒有開始執行。我們先對g調用一次next()或者通過g.send(None)來啟動生成器,這時函數體開始執行,到line = (yield)這一句時暫停執行。接下來,通過g.send(“I like python”)發送數據給g,函數體又恢復了執行,此時發送過來的字符串”I like python”就通過yield賦值給了line,然后繼續執行后面的判斷邏輯,發現匹配到了pattern,打印出了字符串。然后繼續while循環,又遇到了yield,此時函數的執行再次暫停。如此反復通過send,函數體就不斷的yield出新值處理一下,然后再次在yield處暫停。
發現什么了嗎?這里的grep實際上就和我們的主程序通過send調用形成了一種協作式的處理流程。主程序通過調用g.send()喚起grep函數體的執行,grep函數體處理完后又在yield處自己暫停執行,等待主程序再次通過send喚起自己,有一種你方唱罷我登場的感覺。這正體現出了協程的特點:任務的執行流可自我掛起,其他程序又可以喚起任務,讓它繼續執行。
如果主程序調用了g.close()會如何呢?簡單,那樣的話grep協程就徹底退出了,不再是掛起。
我們再來看一個稍復雜一點的例子。在這個例子里我們會說明生成器和協程的區別。在給出具體的代碼前,我們先定義一個幫助函數。
def coroutine(func): def start(*args, **kwargs): cr = func(*args, **kwargs)
cr.send(None) # 自動幫我們啟動協程, 讓其在yield處掛起 return cr return start
有經驗的同學應該能意識到,這個函數是用來做裝飾器的。還記得之前我們定義好生成器之后,要讓生成器的函數體得到執行必須要先調用send(None)或者next()嗎?這是為了先啟動生成器讓它在yield處掛起。而這個啟動的步驟有可能會忘記做,那么我們就用裝飾器來幫我們自動處理這個啟動的步驟。
接下來定義兩個函數。
import time def follow(thefile, target): thefile.seek(0, 2) # go to the end of the file while True:
line = thefile.readline() # 數據源,這里得到文本行 if not line:
time.sleep(0.1) continue target.send(line) # 發送給協程target @coroutine def printer(): while True:
line = (yield)
print(line)
把上面兩個函數組合起來使用
f = open("somefile.txt") follow(f, printer())
用一個示意圖來表示上面代碼的流程。實際上我們將follow和printer組成了一個管道。follow是管道的源頭,它負責產生數據,然后通過send把數據發給了協程printer做處理。
還可以在管道中增加協程嗎?當然可以,我們就把前面的grep拿過來改改,再放在管道中。現在變成了這樣:
@coroutine def grep(pattern,target): while True:
line = (yield) # 從數據源得到數據 if pattern in line:
target.send(line) # 自己處理完再發給下一個協程繼續處理 f = open("access-log")
follow(f, grep('python', printer()))
這里follow仍當做管道的源頭,是數據源,驅動整個管道的運行。grep協程在這里起到了過濾的作用,查看follow發過來的數據中是否包含有字符串python,若匹配到了就把數據發給下一個協程printer打印出來,然后自己在yield處掛起等待下一次follow的send調用。這里的follow(),grep()和printer()一起協同工作,實現了查看log文件中是否包含有特定字符串的功能。當然你還可以繼續擴展這個程序完成更多的功能。
看到這里,熟悉Unix/Linux的同學應該倍感親切啊。這和Unix/Linux的哲學很相似:提供一組簡單的工具,每個工具只處理一種任務,但你可以通過各種不同的組合將它們連在一起處理更復雜的任務。其實這個簡單的例子也可以用更一般化的迭代來處理:
def do_process_in_one_func(thefile, pattern): thefile.seek(0, 2) # go to the end of the file while True:
line = thefile.readline() if not line:
time.sleep(0.1) continue else: if pattern in line:
print(line)
我們稍微修改了一下例子,把所有的處理邏輯都放在一個函數里,通過迭代文件的每一行來達到相同的目的。只是這樣一來就把讀取文件行、過濾、打印三種不同的任務都寫在了一起,這其實不利于任務的劃分,而且這個函數也沒法再和其他的工具一起協作了,喪失了靈活性。也就是說,協程其實可以幫助我們劃分程序模塊,使得每個任務變得簡單單一,同時也能夠和其他的程序協作,提高復用性。越是復雜的程序,這么做就越有利。
下面該說說生成器和協程的區別了。到這里估計很多同學把生成器和協程搞混了,這兩貨也確實很相似,因為都包含有yield。生成器我們可以看做是生產者,它負責產生數據,通常是用來做迭代使用的。而協程是消費者,通過yield接收其他程序send給它的數據然后處理。那協程有沒有可能也是生產者呢?有可能哦,上面例子里的grep就有雙重身份,它既處理(消費)管道上游發來的數據,而處理完之后又通過調用下游協程的send方法把處理過的數據發給下游(生產)。因此協程一定會消費,如果看不到消費而只有生產,那么是生成器,否則就是協程。好在Python 3.5引入的新關鍵字async/await,徹底解放了我們,這個我們稍后再談。
由于協程具有自我掛起(不能被其他協程搶占控制權,除非自己放棄)稍后再恢復執行的特質,我們可以利用協程來模擬任務,進而實現用戶態線程。為什么說是用戶態線程呢?因為協程的調度執行完全在用戶空間,操作系統內核不感知。也就是說,協程的調度需要由我們自己來控制。相比系統級的線程和進程,協程占用的資源極少,任務的切換也不需要內核來調度,省去了上下文切換的開銷。這樣的特性使得單機創建大量協程成為可能(百萬級)。下面的例子我們就來實現一個簡單的用戶態調度器,并用協程來模擬操作系統中的任務。
from queue import Queue
模擬任務
class Task(object): taskid = 0 def __init__(self, target): Task.taskid += 1 self.tid = Task.taskid
self.target = target
self.sendval = None def run(self): return self.target.send(self.sendval)
調度器類, mainloop方法實現了簡單的eventloop
class Scheduler(object): def __init__(self): self.ready = Queue()
self.taskmap = {} def new(self, target): newtask = Task(target)
self.taskmap[newtask.tid] = newtask
self.schedule(newtask) return newtask.tid def exit(self, task): print("Task %d terminated" % task.tid) del self.taskmap[task.tid] def schedule(self, task): self.ready.put(task) def mainloop(self): while self.taskmap:
task = self.ready.get() try:
result = task.run() if isinstance(result, SystemCall):
result.task = task
result.sched = self
result.handle() continue except StopIteration:
self.exit(task) continue self.schedult(task)
系統調用基類
class SystemCall(object): def handle(self): pass
獲取任務id的系統調用
class GetTid(SystemCall): def handle(self): self.task.sendval = self.task.tid
self.sched.schedule(self.task)
定義兩個協程, 將作為我們的任務由調度器調度執行
def foo(): mytid = yield GetTid()
print("I am foo", mytid) yield def bar(): mytid = yield GetTid()
print("I am bar", mytid) yield if __name__ == '__main__':
sched = Scheduler()
循環一百萬次, 共創建兩百萬個任務
for task in range(1000000):
sched.new(foo())
sched.new(bar())
sched.mainloop()
上面這個程序一共創建了兩百萬個任務,通過我們簡單實現的事件循環不斷交錯調度執行(完全沒有用到線程,當然除了程序自身的主線程之外)。程序大部分時間花在創建任務上了,真正執行的時候是非常快的。看到這里,有的同學會問了,如果要用協程,那豈不是還要我們自己寫調度程序,而實現一個功能完備的事件循環并不是人人都能輕松完成的啊?由操作系統調度系統原生的線程、進程不是更方便嗎?沒錯,的確是這樣,由于協程完全處于用戶態,要做到互相交錯執行,的確需要把OS的調度功能移到用戶態來完成,好在許多的庫(如gevent,以及Python3.4中加入標準庫的asynio)已經幫我們完成了這些任務,真正要用的時候并不需要我們自己完成,這里僅僅只是一個簡單的例子用以說明。學從難處學,用從易處用。
總結一下,目前業界用來處理高并發的方案一般有兩種:
1.以Node.js為代表的異步回調方案。Python的Twisted網絡庫也是同樣的思路。這種方案利用事件循環、非阻塞IO和異步回調機制。簡單來說就是,每當遇到IO或者其他耗時的操作時,注冊一個回調到事件循環中,這時程序接著干其他的事情,當IO完成后由事件循環回調我們之前注冊的callback。這種方式讓程序盡可能的執行,而不需要我們自己創建其他線程。
這種方式的缺點是容易遇到callback hell,回調套回調。因為所有的阻塞操作都必須是異步的,否則只要有一環阻塞,系統就卡死了。另外就是異步的方式有些違反人類的思維習慣,人類還是習慣用同步的方式來思考。當然,針對這些問題,現在也已經有了比較好的解決方法,這里就不多說了。
2.協程。協程似乎天生就適合于處理這類問題(IO密集型,程序大部分時間在等待IO變得可用,但實際處理的任務相對簡單,并不會占用大量CPU資源)。協程是運行在用戶態的輕量級線程,資源占用極少,因此單機創建大量的協程成為了可能。而且協程的任務切換完全在用戶空間解決,無需操作系統內核干預,大量減少了因為任務切換而產生的調度開銷。
本文僅介紹Python的協程,關于異步IO以及高并發編程等主題,這里就不適合深入討論了。
看到這里的同學應該已經對Python的協程有了較清晰的認識了,但可能還是有一件事覺得不爽。Python語言一向號稱簡潔優美,代碼可讀性高,可是剛剛講了這么多,Python的協程功能居然從2.5版本后才可以通過生成器變相實現出來,有時候必須讀代碼才能搞清楚到底是生成器還是協程(其實在語言層面上Python并不認識什么協程,我們只是用生成器實現了編程概念中的協程。可以說,在Python3.5之前,語言原生不支持協程,我們前面看到的只是利用生成器實現了協程的功能)。如果能像Go語言定義goroutine(可簡單理解為Go語言對協程的實現)那樣簡單就好了,不然怎么也對不起Python簡潔優美易讀的美譽啊。Python 3.5中新增的關鍵字async/await解決了這個“痛點”。
Python3.5中定義協程的方式
async def native_coroutine(): await awaitableObj
現在好了,凡是由async def定義的函數,在Python 3.5中將被認為是原生協程(雖然在Python解釋器中仍然是利用生成器來實現的)。
>>> async def native_coro(): ... pass ... >>> a = native_coro()
>>> a #a現在是一個coroutine對象了! <coroutine object native_coro at 0x109893678>
>>>
本站文章版權歸原作者及原出處所有 。內容為作者個人觀點, 并不代表本站贊同其觀點和對其真實性負責,本站只提供參考并不構成任何投資及應用建議。本站是一個個人學習交流的平臺,網站上部分文章為轉載,并不用于任何商業目的,我們已經盡可能的對作者和來源進行了通告,但是能力有限或疏忽,造成漏登,請及時聯系我們,我們將根據著作權人的要求,立即更正或者刪除有關內容。本站擁有對此聲明的最終解釋權。