我已经试验了一段时间 asyncio
并阅读了 PEPs ;一些教程;甚至是 O'Reilly book .
我想我已经掌握了窍门,但我仍然对 loop.close()
的行为感到困惑,我不太清楚何时可以“安全”调用它.
归根结底,我的用例是一堆阻塞的“老派”调用,我将其包装在 run_in_executor()
和一个外部协程中;如果这些调用中的任何一个出错,我想停止进程,取消那些仍然未完成的调用,打印一个合理的日志,然后(希望是干净利落地)让开。
比如说,像这样:
import asyncio
import time
def blocking(num):
time.sleep(num)
if num == 2:
raise ValueError("don't like 2")
return num
async def my_coro(loop, num):
thử:
result = await loop.run_in_executor(None, blocking, num)
print(f"Coro {num} done")
return result
except asyncio.CancelledError:
# Do some cleanup here.
print(f"man, I was canceled: {num}")
def main():
loop = asyncio.get_event_loop()
tasks = []
for num in range(5):
tasks.append(loop.create_task(my_coro(loop, num)))
thử:
# No point in waiting; if any of the tasks go wrong, I
# just want to abandon everything. The ALL_DONE is not
# a good solution here.
future = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
done, pending = loop.run_until_complete(future)
if pending:
print(f"Still {len(pending)} tasks pending")
# I tried putting a stop() - with/without a run_forever()
# after the for - same exception raised.
# loop.stop()
for future in pending:
future.cancel()
for task in done:
res = task.result()
print("Task returned", res)
except ValueError as error:
print("Outer except --", error)
finally:
# I also tried placing the run_forever() here,
# before the stop() - no dice.
loop.stop()
if pending:
print("Waiting for pending futures to finish...")
loop.run_forever()
loop.close()
我尝试了 stop()
Và run_forever()
调用的几种变体,“先运行永远,然后停止”似乎是根据 to the pydoc 和,不调用 close()
会产生令人满意的结果:
Coro 0 done
Coro 1 done
Still 2 tasks pending
Task returned 1
Task returned 0
Outer except -- don't like 2
Waiting for pending futures to finish...
man, I was canceled: 4
man, I was canceled: 3
Process finished with exit code 0
但是,当添加对 close()
的调用时(如上所示),我得到两个异常:
exception calling callback for
Traceback (cuộc gọi gần đây nhất):
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 324, in _invoke_callbacks
callback(self)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py", line 414, in _call_set_state
dest_loop.call_soon_threadsafe(_set_state, destination, source)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 620, in call_soon_threadsafe
self._check_closed()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
这充其量是烦人的,但对我来说,完全令人费解:而且,更糟糕的是,我一直无法弄清楚处理这种情况的正确方法是什么。
因此,两个问题:
为了我个人的满意,还有:
- 它为什么会上涨?循环还想从 coros/tasks 得到什么:它们要么退出;要么上调;或被取消:这还不足以让它开心吗?
非常感谢您提出任何建议!
Distilled to its simplest, my use case is a bunch of blocking "old school" calls, which I wrap in the run_in_executor()
and an outer coroutine; if any of those calls goes wrong, I want to stop progress, cancel the ones still outstanding
这不能像预期的那样工作,因为 run_in_executor
将函数提交给线程池,并且操作系统线程不能在 Python(或公开它们的其他语言)中取消。取消 run_in_executor
返回的 future 将尝试取消基础 concurrent.futures.Future
,但这只会在阻塞函数尚未运行时生效,例如因为线程池很忙。一旦开始执行,就无法安全取消。与线程相比,支持安全可靠的取消是使用 asyncio
的好处之一。
如果您正在处理同步代码,无论是遗留的阻塞调用还是运行时间较长的 CPU 绑定(bind)代码,您应该使用 run_in_executor
运行它并合并一种中断它的方法。例如,代码可能偶尔会检查 stop_requested
标志,如果为真则退出,可能会引发异常。然后您可以通过设置适当的标志或标志来“取消”这些任务。
how should I modify the code above in a way that with the call to close() included does not raise?
据我所知,如果不修改 blocking
和顶级代码,目前无法做到这一点。 run_in_executor
会坚持将结果通知给事件循环,当事件循环关闭时这会失败。 asyncio future 被取消也无济于事,因为取消检查是在事件循环线程中执行的,错误发生在这之前,当 call_soon_threadsafe
被工作线程调用时。 (或许可以将检查移至工作线程,但应仔分割析它是否会导致调用 cancel()
和实际检查之间的竞争条件。)
why does it raise at all? what more does the loop want from the coros/tasks: they either exited; raised; or were canceled: isn't this enough to keep it happy?
它希望传递给 run_in_executor
的阻塞函数(问题中的字面意思是 blocking
)在事件循环关闭之前已经开始完成运行。您取消了 asyncio future,但潜在的并发 future 仍然想“打电话回家”,发现循环已关闭。
目前尚不清楚这是 asyncio 中的错误,还是您根本不应该关闭事件循环,直到您以某种方式确保提交给 run_in_executor
的所有工作都已完成。这样做需要进行以下更改:
- 不要尝试取消未决 future 。从表面上看,取消它们看起来是正确的,但它会阻止您为这些 future
wait()
,因为 asyncio 会认为它们已完成。
- 相反,向您的后台任务发送特定于应用程序的事件,通知它们需要中止。
- hiện hữu
loop.close()
Đã gọi trước loop.run_until_complete(asyncio.wait(pending))
.
通过这些修改(除了特定于应用程序的事件 - 我只是让 sleep()
完成它们的过程),没有出现异常。
what actually happens if I don't call close()
- in this trivial case, I presume it's largely redundant; but what might the consequences be in a "real" production code?
由于典型的事件循环与应用程序运行一样长,因此在程序的最后不调用 close()
应该没有问题。无论如何,操作系统都会在程序退出时清理资源。
gọi loop.close()
对于具有明确生命周期的事件循环很重要。例如,一个库可能会为特定任务创建一个新的事件循环,在专用线程中运行它,然后处理掉它。未能关闭此类循环可能会泄漏其内部资源(例如它用于线程间唤醒的管道)并导致程序失败。另一个例子是测试套件,它通常为每个单元测试启动一个新的事件循环,以确保测试环境的分离。
biên tập:TÔI
filed a bug对于这个问题。
Chỉnh sửa 2: 错误是
fixed由开发者提供。
Tôi là một lập trình viên xuất sắc, rất giỏi!