并发是指一次处理多件事。
并行是指一次做多件事。
二者不同,但是有联系。
一个关于结构,一个关于执行。
并发用于制定方案,用来解决可能(但未必)并行的问题。
——Rob Pike
Go 语言的创造者之一
异步版 hello-world 1 2 3 4 5 6 7 8 9 10 11 12 13 import asyncioasync def main (): print('hello' ) await asyncio.sleep(.1 ) print('world' ) asyncio.run(main())
运行协程的三种方式
asyncio.run()
使用 await 关键字
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import asyncioimport timeasync def say_after (delay, what ): await asyncio.sleep(delay) print(what) async def main (): print(f"started at {time.strftime('%X' )} " ) await say_after(1 , 'hello' ) await say_after(2 , 'world' ) print(f"finished at {time.strftime('%X' )} " ) asyncio.run(main())
使用 asyncio.create_task() 创建一个 Task 对象
1 2 3 4 5 6 7 8 9 10 11 async def main (): t1 = asyncio.create_task(say_after(1 , 'hello' )) t2 = asyncio.create_task(say_after(2 , 'world' )) print(f"started at {time.strftime('%X' )} " ) await t1 await t2 print(f"finished at {time.strftime('%X' )} " )
Awaitable 对象 awaitable 对象是指可以在 await 表达式中使用的对象。
coroutines, Tasks 和 Futures 是 awaitable 对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import asyncioasync def nested (): return 42 async def main (): nested() print(await nested()) asyncio.run(main())
1 2 3 4 5 6 7 8 9 10 11 12 13 import asyncioasync def nested (): return 42 async def main (): t = asyncio.create_task(nested()) await t asyncio.run(main())
并发执行 Tasks 使用 asyncio.gather 并发执行 Tasks
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import asyncioasync def factorial (name, number ): f = 1 for i in range (2 , number + 1 ): print(f"Task {name} : Compute factorial({i} )..." ) await asyncio.sleep(1 ) f *= i print(f"Task {name} : factorial({number} ) = {f} " ) async def main (): await asyncio.gather( factorial('A' , 2 ), factorial('B' , 3 ), factorial('C' , 4 ), ) asyncio.run(main())
线程和协程的对比 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 import threadingimport itertoolsimport timeimport sysclass Signal : go = True def spin (msg, signal ): write, flush = sys.stdout.write, sys.stdout.flush for char in itertools.cycle('|/-\\' ): status = char + ' ' + msg write(status) flush() write('\x08' * len (status)) time.sleep(.1 ) if not signal.go: break write(' ' * len (status) + '\x08' * len (status)) def slow_funtion (): time.sleep(3 ) return 42 def supervisor (): signal = Signal() spinner = threading.Thread( target=spin, args=('thinking!' , signal)) print('spinner object: ' , spinner) spinner.start() result = slow_funtion() signal.go = False spinner.join() return result def main (): result = supervisor() print('Answer: ' , result) if __name__ == '__main__' : main()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import asyncioimport itertoolsimport sysasync def spin (msg ): write, flush = sys.stdout.write, sys.stdout.flush for char in itertools.cycle('|/-\\' ): status = char + ' ' + msg write(status) flush() write('\x08' * len (status)) try : await asyncio.sleep(.1 ) except asyncio.CancelledError: break write(' ' * len (status) + '\x08' * len (status)) async def slow_funtion (): await asyncio.sleep(3 ) return 42 async def supervisor (): spinner = asyncio.create_task(spin('thinking!' )) print('spinner object: ' , spinner) result = await slow_funtion() spinner.cancel() return result loop = asyncio.get_event_loop() result = loop.run_until_complete(supervisor()) loop.close() print('Answer: ' , result) asyncio.run(supervisor())
Task 对象像是实现协作式多任务的库(如 gevent)中的绿色线程(green thread)。
Task 对象用于驱动协程,Thread 对象用于调用可调用对象。
Task 对象不由自己手动实例化,而是由 asyncio.create_task 方法获取。
获取的 Task 对象已经排定了运行时间,而 Thread 实例需要调用 start 方法运行。
异步版 slow_funtion 是协程,由 await (就是 yield from)驱动。
终止线程需要借助外部变量 go,终止 Task 可以调用 Task.cancel() 实例方法,在协程内部抛出 CancelledError 异常,协程内部也可以捕获这个异常,处理终止请求。