Metallurgy in Computer Science

資訊冶金,不僅僅是技術上的紀錄,還有一些生活經驗啥的

0%

Peek through python twisted -- an asynchronize event handling

最近在 trace Matrix 的原始碼,它(目前比較完整的是 Synapse)的底層用到了 python 的事件驅動框架 – Twisted,雖然說不懂底層大概還是可以模糊的理解整體流程,但是這樣對整個非同步的事件處理流程理解上可以說是完全沒有辦法,舉例來說:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# We process PDUs and EDUs in parallel. This is important as we don't
# want to block things like to device messages from reaching clients
# behind the potentially expensive handling of PDUs.
pdu_results, _ = await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
self._handle_pdus_in_txn, origin, transaction, request_time
),
run_in_background(self._handle_edus_in_txn, origin, transaction),
],
consumeErrors=True,
).addErrback(unwrapFirstError)
)

因此在這一篇文章我主要會著重在理解 python twisted 非同步函式呼叫的模型,還有討論一下 python async/await 這個語法糖

前言

網路上其實不少教學討論 python twisted (像是 Python Twisted介紹使用inlineCallbacks实现类同步语法用inlineCallbacks來管理callbacks)或是官方文件(像是Introduction to DeferredsDeferred ReferenceGenerating Deferreds

不過儘管看完官方文件之後,我其實也還不理解 python twisted 的運作(最主要是 blocking 還是 non-blocking 的行為),其他教學要不是有點機器翻譯,不然就是比較教科書(喜歡用一些魔法來直接得到一些結論)

總而言之,強烈建議讀者如果想了解 python twisted 執行流程,還是需要乖乖把環境裝好跑看看一些實驗(我下面也會放一些可以動的 code,官網範例儘管有範例,但是是 pseudo code,而且函式調用的接口並沒有明確定義)

什麼是 python twisted

Python Twisted 是主要處理異步回傳鍊(Callback Chain),用 reactor 模型(軟體工程裡面那個)來決定處理的順序,需要注意的是「並行(Concurrent)」與「異步(Asynchronized)」有著本質上的區別(至少在 python 裡面是這樣)

python 裡的異步主要都是指「單執行緒」,也就是上圖第三個 single-threaded 的版本
因此所有處理之間都是 blocking 的

根據 stackoverflow 上這篇 multiprocessing vs multithreading vs asyncio in Python 3 中提到,async 異步主要拿來處理「多個且同時 IO-bound 來源」的處理,我的理解是同時處理這些不同來源需要大量且混雜的回傳處理,因此建立一個良好的回傳處理 / 控制權交接關係非常重要

我一開始讀的時候以為是「單一 IO-bound 來源」的處理,所以我就很好奇為啥不直接開一個執行緒,專門把這些大資料放到一個 buffer 去,讓其他任務能 non-blocking 的執行下去

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from twisted.internet.defer import inlineCallbacks, Deferred, returnValue
from twisted.python.failure import Failure

from twisted.internet import threads, reactor, defer

def loadRemoteData():
import time
time.sleep(1)
print("Load Remote Data Done !")

# remove self here will cause an error
# builtins.TypeError: ProcessRemoteData() takes 0 positional arguments but 1 was given
def ProcessRemoteData(self):
import time
time.sleep(2)
print("Process Remote Data Done !")

def server():
d = threads.deferToThread(loadRemoteData)
d.addCallback(ProcessRemoteData)
return d

a = server()
print("Init server Done")

reactor.callLater(4, reactor.stop)
reactor.run()

在上面這個 pseudo server 範例中,server()在 threads.deferToThread(loadRemoteData) 會開啟一個獨立的獨立的執行緒,這個執行緒會負責先去取得資料後處理

這個 deferToThread會馬上返回一個 Deferred ,這個名詞在之後會非常頻繁出現,這有點像是 JS 的 promise,或者說是 python 的 future

簡單來說,這是一個預留的值(place holder),我們保證在某個時間點這個變數會返回值(也就是當這個異步執行結束後),而後面的 addCallback(ProcessRemoteData) 則是一個 callback 函式,表達當這個 Deferred 被正確執行後的處理行為

最後的 reactor.run() 則是正式觸發這個異步執行(你可以試試看把這行註解掉,看看結果會變怎樣),Deferred 事件的執行依賴於 reactor 這個模型,這同時也是軟體工程中提到的 reactor 模式,透過一個 reactor 來分配到不同的 callback handler;而 reactor.callLater(4, reactor.stop)則是告訴這個 reactor 在四秒之後停止

我們實際跑過後會得到以下行為

Init server Done
(… wait one second)
Load Remote Data Done !
(… wait two second)
Process Remote Data Done !

如果我們把上面的範例改一下順序

1
2
3
4
5
6
7
8
...

a = server()

reactor.callLater(4, reactor.stop)
reactor.run()

print("Init server Done")

透過把服務器初始化的順序與資料處理調換,我們來觀察 Deferred 本身是不是 non-blocking 的

(… wait one second)
Load Remote Data Done !
(… wait two second)
Process Remote Data Done !
Init server Done

實際執行之後會發現,Deferred 本身是 blocking 的,也就是指 Deferred 的實做事實上並不涉及並行執行,它 並沒有讓你的程式神奇般的變成 multi-threaded,正如前面提到的,Deferred 唯一幫你作的事是梳理函式呼叫的順序,「它並不在乎被調用的函式是同步還是異步」

為了讓你的程式多出其他的功能(像是異步執行),你需要其他的關鍵字

Await / Async

因為 twisted 本身主要還是負責 Deferred 這塊的設計,也就是 callback 的順序,但是為了達到異步執行的目的,我們往往需要搭配其他的關鍵字像是 await / async(我們也可以在最一開始的範例中看到 Synapse 大量使用 await / async 來搭配 twisted 架構)

Twisted Deferred 架構事實上與同步/異步執行是完全解耦合的,它唯一在意的是事件的執行順序,透過事件的觸發決定下一個執行的函式是誰,它並不在乎這個函式是同步還是異步的

雖然現在才開始講 yield 有點晚,但是我假設大家可能知道 yield 是什麼,但是跟我一樣不知道它到底可以做什麼

感謝 Python協程:從yield/send到async/await(原文連結失效所以放這篇) Python3: 淺談 Python 3.3 的 Yield From 表達式(主要是因為前面那篇文章講 yield from 的時候突然開始飆車,一時看不懂所以跟著後面那篇文章做了點實驗)

根據 Python協程:從yield/send到async/await 這篇文章的說法,python 曾經對 mutli-thread programming 相當不友好,因此相較於其他語言動不動就開多線程然後 mutex lock 來 lock 去的,python 更喜歡把單線程發揮到極致(不過後來 multiprocess 這類函式庫就出來了)

因此要搞懂 await/async 在幹什麼,我覺得應該還是要從始祖 yield 開始聊起

Yield / Yield from

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def callee():
for i in range(3):
yield i
print('callee: ', i)
return 'end'

def caller():
x = yield from func()
print('caller: ', x)

# iterate through for
for i in caller():
print("i: ", i)

# iterate through next
a = caller()
next(a)
next(a)
next(a)
next(a)

單就 callee 來看,我們可以知道這是一個 generator function,如果單純呼叫 b = callee() 我們會獲得一個 generator instance,在每次迭代它的時候繼續執行 callee

caller 的加入讓這一切變得有點不一樣,這個小東西的加入目的就在於讓我們能完成不同任務間控制權的交割(用 linux task 比喻的話,就是決定 foreground task 跟 background task 要怎麼切換)

yield from 這個東西,就我的理解它就是一個 wrapper for generator,正如 Python3: 淺談 Python 3.3 的 Yield From 表達式 所說,「yield from讓我們可以把「另一個生成器生成的值」當成「自己要生成的值」」

如果你執行上面的程式會發現,caller 的 print 會最晚執行,所以我們可以得知,一旦使用 yield from 則在 main 裡無論對 generator instance 怎麼迭代,實際上迭代的都是 callee,直到 callee 無法迭代才會輪到 caller,這就表示 yield from是一個跨迭代的 blocking 行為

好了那為什麼我們要拖褲子放屁 – 多這個 caller 去迭代 callee 呢?直接迭代 callee 不行嗎?

控制權交割

透過 「yield from 實際上就是把控制權讓給別的任務」這個認知,我們可以透過 yield from 建立起事件的回傳鍊

詳細的例子在 Python協程:從yield/send到async/await 裡 「asyncio.coroutine 和 yield from 」有範例,我這邊就簡單用 pseudo code 講(原本例子可能考慮是拿現實生活的例子,搞的有點複雜)

1
2
3
4
5
6
7
8
9
10
def task_a():
# some processing work on task a
yield from task_b()

def task_b():
# some processing work on task b
yield something

def main():
a = task_a()

簡單來說,我們可以不斷透過 yield from轉移控制權到其他函式上,直到該函式執行到 yield 交還控制權

那有沒有可能形成一個 yield from loop,以上面的例子來說就是 yield from main() 裡面的其他進入點?

Python Future

個人感覺 Python協程:從yield/send到async/awaitFuture 加進去混著講解把事情搞的有點複雜

1
2
3
4
5
6
7
8
class Future:
#blabla...
def __iter__(self):
if not self.done():
self._blocking = True
yield self # This tells Task to wait for completion.
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too.

python future 的實做非常有趣,它本質上也是需要呼叫者(caller)用 yield from 轉換控制權等待的事件

1
2
3
4
5
6
7
def caller():
# do caller stuff
future = futures.Future(loop=loop)
try:
return (yield from future)
finally:
h.cancel()

透過上面的 caller 可以看到,我們會先等到 future yield 後才結束,但是這邊我們需要保證 future 裡的事件執行結束後才轉換控制權到 caller

也就是說,我們要怎麼在 future 還沒成功的時候,就算 caller 想要讓我們回傳控制權(可能是透過主函式迭代),我們也可以繼續保有自己的控制權?

這邊就是相當有趣的地方了,我們可以看到 Future 實做中,當事件尚未完成(if not self.done())每次迭代都只會 yield self 也就是回給它根本一模一樣的 generator,只要 future 還沒完成,其他人迭代這個 generator 想要獲得控制權的結果,就是重新取得一個 generator 叫 caller 繼續等

混用異步與同步函式

Deferred 的操作大都需要運作在全部都是Deferred 的環境下,因此它也提供讓不同物件轉換成 Deferred 的 API,像是 ensureDeferred 可以把 python coroutine 轉換成 Deferred

1
2
3
4
5
def authenticateUser(isValidUser, user):
if isValidUser(user):
print "User is authenticated"
else:
print "User is not authenticated"

今天假設有如上這個驗證用戶身份的程式,其中 isValidUser() 這個函式呼叫有可能是同步的,也可能是是異步的,對於這個範例,Twisted 有一個中心準則:「Everything is (can be) a Deferred」

1
2
3
4
5
6
7
8
9
10
11
from twisted.internet import defer

def printResult(result):
if result:
print "User is authenticated"
else:
print "User is not authenticated"

def authenticateUser(isValidUser, user):
d = defer.maybeDeferred(isValidUser, user)
d.addCallback(printResult)

我們透過 maybeDeferred() 這個函式強制把回傳的值變成一個 Deferred,然後在後面加上處理(像這邊就是把它印出來)


後序

每次都會發現邊寫文章時,原始寫的動機會開始被扭曲,然後開始歪樓討論別的東西,像這個我原本就是想知道為什麼 twisted 不是平行執行的(我一開始以為它會幫你開一個 thread),但是後來就發現自己錯的離譜,最後開始探討 python 語法的問題

這可能還是因為我底子不足,不過 python 真的是一們易學難精的語言,表層各式各樣的行為對應到底下實做,不同層級的語言注重的點又差異很大(前一陣子白天去實驗室看 C RDMA,晚上看 python await,這兩個可以說是非常兩極化了,搞的我每次開始讀另外一邊的東西都要先適應一下),如果不能夠很好的掌握這些行為的話,我也不太敢用 python 寫一些比較大的東西,下一步我可能會看 twisted 有沒有用在 multi-thread 的(主要先還是繼續看 Synapse,畢竟目標是看懂 Synapse 然後開始去弄 golang 版本的)

Welcome to my other publishing channels