asyncio之执行并发任务
Task是与事件循环交互的主要方式之一。 Task包装协程并跟踪它的完成。 Tasks是Future的子类,所以其他协程等待他们并且每一个协程都会获得task完成时返回的结果。
启动Task
使用create_task()创建Task实例启动任务。只要loop正在运行,并且协程没有返回,task将做为事件循环并发操作的一部分一直运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import asyncio async def task_func(): print('in task_func') return 'the result' async def main(loop): print('createing task') task = loop.create_task(task_func()) print('waiting for {!r}'.format(task)) return_value = await task print('task completed {!r}'.format(task)) print('return value: {!r}'.format(return_value)) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
|
这个例子中main()函数退出之前需要等待task返回结果。
1 2 3 4 5 6 7
| creating task waiting for <Task pending coro=<task_func() running at asyncio_create_task.py:12>> in task_func task completed <Task finished coro=<task_func() done, defined at asyncio_create_task.py:12> result='the result'> return value: 'the result'
|
取消Task
通过create_task()可以返回Task对象,这样就可以在task完成之前取消操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import asyncio async def task_func(): print('in task_func') return 'the result' async def main(loop): print('createing task') task = loop.create_task(task_func()) print('canceling task') task.cancel() print('canceled task {!r}'.format(task)) try: await task except asyncio.CancelledError: print('caught error from canceled task') else: print('task result: {!r}'.format(task.result())) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
|
这个例子中在事件循环开始之前创建task然后取消,会从run_until_complete()函数中抛出CancelledError异常。
1 2 3 4 5 6
| # python3 asyncio_cancel_task.py creating task canceling task canceled task <Task cancelling coro=<task_func() running at asyncio_cancel_task.py:12>> caught error from canceled task
|
如果task在等待其他并发操作时被取消,它会在它等待的地方抛出CancelledError异常来通知task被取消。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import asyncio async def task_func(): print('in task_func, sleeping') try: await asyncio.sleep(1) except asyncio.CancelledError: print('task_func was canceled') raise return 'the result' def task_canceller(t): print('in task_canceller') t.cancel() print('canceled the task') async def main(loop): print('createing task') task = loop.create_task(task_func()) loop.call_soon(task_canceller, task) try: await task except asyncio.CancelledError: print('main() also sees task as canceled') event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
|
如果需要,截获异常能够有机会来清理已经完成的任务。
1 2 3 4 5 6 7 8
| $ python3 asyncio_cancel_task2.py creating task in task_func, sleeping in task_canceller canceled the task task_func was canceled main() also sees task as canceled
|
由协程生成Task
ensure_future()函数返回的Task可以绑定到协程上。然后Task实例可以传递给其他代码,这些代码可以在不知道原始协程是如何构造或调用的情况下等待它。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import asyncio async def wrapped(): print('wrapped') return 'result' async def inner(task): print('inner: starting') print('inner: waiting for {!r}'.format(task)) result = await task print('inner: task returned {!r}'.format(result)) async def starter(): print('starter: creating task') task = asyncio.ensure_future(wrapped()) print('starter: waiting for inner') await inner(task) print('starter: inner returned') event_loop = asyncio.get_event_loop() try: print('entering event loop') result = event_loop.run_until_complete(starter()) finally: event_loop.close()
|
记住ensure_future()函数生成的协程不会执行,直到使用await允许它执行。
1 2 3 4 5 6 7 8 9 10
| $ python3 asyncio_ensure_future.py entering event loop starter: creating task starter: waiting for inner inner: starting inner: waiting for <Task pending coro=<wrapped() running at asyncio_ensure_future.py:12>> wrapped inner: task returned 'result' starter: inner returned
|
原文链接