主要介紹使用 Python 的 Asynchronous 可能遇到的雷跟坑。
故事的起因都是來自於 ContextVar :這是一個用來在 async 程式中保存上下文變數 (Context) 的模組。 他跟 threading.local 類似,但是是設計給非同步程式使用的。因為 coroutine 之間可能會共享同一個 Thread,因此使用 threading.local 不能保證資料的隔離性 (Isolation)。
下面的表格比較了 threading.local 跟 ContextVar 的差異:
| Feature | threading.local | ContextVar |
|---|---|---|
| Usage Context | Multithreading | Asynchronous Programming |
| Variable Scope | Per Thread | Per Async Task |
| Isolation | Each thread has its own copy | Each async task has its own copy |
| Use Case | Thread-specific data storage | Context-specific data storage |
因此在 async 這個情況下使用 ContextVar 可以確定資料的隔離性,避免不同 coroutine 之間互相影響: 在下面的例子中,我們建立了一個簡單的 HTTP 伺服器,並使用 ContextVar 來保存每個連線的客戶端位址。 不同的 coroutine 會有不同的 ContextVar 值,確保每個連線的資料不會互相干擾。
import asyncio
from contextvars import ContextVar
client = ContextVar("client", default="-")
def show_client_info() -> bytes:
client_info = client.get()
return f"Client address: {client_info}\n".encode()
async def handler(reader, writer):
addr = writer.transport.get_extra_info("socket").getpeername()
client.set(addr)
while True:
line = await reader.readline()
if not line.strip():
break
writer.write(b"HTTP/1.1 200 OK\r\n")
writer.write(b"\r\n")
writer.write(show_client_info())
await writer.drain()
writer.close()
async def main():
srv = await asyncio.start_server(handler, "127.0.0.1", 8080)
async with srv:
await srv.serve_forever()
if __name__ == "__main__":
asyncio.run(main())
POC
定義一個 ContextVar,然後在不同的 coroutine 裡面設定,每個 ContextVar 都會有自己的值且互不影響。
透過 ctx.set() 設定值並使用 ctx.get() 取得值。在 coroutine 結束前使用 ctx.reset(token)
恢復到之前的狀態。這邊可以使用一個 asynccontextmanager 來包裝避免忘記重設或有異常發生時沒有重設。
但是配合 async generator 使用時,會遇到一個問題: 當在外層 coroutine 裡面中斷 (break) async generator 時,會導致內層的 async generator 無法正確清理 context,進而引發 ValueError 與 asyncio.CancelledError。
發生的原因在於使用 async generator 時,當脫離當下 scope 時,是透過建立新的 asyncio Task 來執行 清理工作 (cleanup),而不是在原本的 Task 裡面執行。在配合 ContextVar 時會導致 context 丟失進而引發錯誤。
import asyncio
from contextlib import asynccontextmanager
from contextvars import ContextVar
ctx = ContextVar("context_var", default="-")
@asynccontextmanager
async def runner_with_context(name: str):
token = ctx.set(name)
try:
print(f"before yeild: {asyncio.current_task().get_name()=}")
yield
finally:
print(f"after yeild: {asyncio.current_task().get_name()=}")
ctx.reset(token)
async def iterator():
async with runner_with_context("demo_context"):
for i in range(3):
yield f"Iterator {i=} in {ctx.get()}"
async def main():
async with runner_with_context("main_context"):
async for item in iterator():
print(item)
# Without the break, the runner_with_context are in the same asyncio Task
# and it works fine.
#
# With the break, the async generator is cancelled and switch to another Task,
# causing the context to be lost and raising errors.
#
# This break causes the inner async generator to not clean up properly,
# triggering ValueError and asyncio.CancelledError. It is a known issue with
# async generators and contextvars when the generator is not fully exhausted,
# and also when the generator is cancelled or interrupted.
break # <-- problematic line that raises ValueError and asyncio.CancelledError
if __name__ == "__main__":
asyncio.run(main())
Official Solution
這個問題在 Python 官方 Github 中已經被 提及 並且有官方的解決方案:contextlib.aclosing。
透過 aclosing 來包裝 async generator 可以確保離開 async generator 時會使用相同 Task 來執行清理工作。 這在官方文件中清楚描述,透過 aclosing 可以保證在離開 async generator 時會使用相同的 Task 來執行清理工作
This pattern ensures that the generator’s async exit code is executed in the same context as its
iterations (so that exceptions and context variables work as expected, and the exit code isn’t run
after the lifetime of some task it depends on).
async def main():
async with aclosing(iterator()) as it:
async for _ in it:
print(item)
break # <-- now works fine with the same Task for cleanup