
Python event-driven programming with asyncio

Author: 用户5760343
Published: 2022-01-09 09:14:01
Part of the column: sktjsktj

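1. Asyncio

loop = asyncio.get_event_loop(): returns the event loop for the current context.
loop.call_later(time_delay, callback, argument): schedules callback to run after time_delay seconds.
loop.call_soon(callback, argument): schedules callback to run as soon as possible. The callback is invoked once call_soon() has returned and control is back in the event loop.
loop.time(): returns the event loop's internal clock time as a float.
asyncio.set_event_loop(loop): sets the event loop for the current context.
asyncio.new_event_loop(): creates and returns a new event loop according to the current policy.
loop.run_forever(): runs the loop until stop() is called.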
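The ordering of these scheduling calls can be illustrated with a minimal sketch. The helper names `demo_scheduling`, `record`, and `finish` are hypothetical and not part of the original article, and the 0.05 second delay is an arbitrary choice to keep the run short.

```python
import asyncio


def demo_scheduling():
    """Record the order in which call_soon and call_later callbacks run."""
    loop = asyncio.new_event_loop()
    calls = []

    def record(label):
        # Store the label together with the loop's internal clock reading.
        calls.append((label, loop.time()))

    def finish():
        record("later")
        loop.stop()  # makes run_forever() return

    start = loop.time()
    loop.call_later(0.05, finish)    # runs roughly 0.05 s from now
    loop.call_soon(record, "soon")   # runs on the loop's next iteration
    record("scheduled")              # runs immediately, before the loop starts

    loop.run_forever()
    loop.close()
    return start, calls


if __name__ == "__main__":
    start, calls = demo_scheduling()
    for label, when in calls:
        print("{:<10} +{:.3f}s".format(label, when - start))
```

Nothing scheduled with call_soon() or call_later() runs until the loop itself is running, which is why the "scheduled" entry is always recorded first.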

2. Example: the loop runs for about 9 seconds in total

    import asyncio

    def function_1(end_time, loop):
        print("function_1 called {}".format(loop.time()))
        # Chain to the next function in 1 second, or stop near the deadline.
        if (loop.time() + 1.0) < end_time:
            loop.call_later(1, function_2, end_time, loop)
        else:
            loop.stop()

    def function_2(end_time, loop):
        print("function_2 called {}".format(loop.time()))
        if (loop.time() + 1.0) < end_time:
            loop.call_later(1, function_3, end_time, loop)
        else:
            loop.stop()

    def function_3(end_time, loop):
        print("function_3 called {}".format(loop.time()))
        if (loop.time() + 1.0) < end_time:
            loop.call_later(1, function_1, end_time, loop)
        else:
            loop.stop()

    def function_4(end_time, loop):
        print("function_4 called")
        # Reschedules itself every second until the deadline.
        if (loop.time() + 1.0) < end_time:
            loop.call_later(1, function_4, end_time, loop)
        else:
            loop.stop()

    # Create the loop explicitly: calling get_event_loop() with no current
    # loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    end_loop = loop.time() + 9.0
    print(end_loop)
    # Two independent callback chains: 1 -> 2 -> 3 -> 1 ... and 4 -> 4 ...
    loop.call_soon(function_1, end_loop, loop)
    loop.call_soon(function_4, end_loop, loop)

    loop.run_forever()
    loop.close()

3. Coroutines with @asyncio.coroutine and yield from

Asyncio Finite State Machine

    import asyncio
    from random import randint

    # @asyncio.coroutine was deprecated in Python 3.8 and removed in 3.11,
    # so this listing runs only on Python 3.10 or earlier.

    @asyncio.coroutine
    def StartState():
        print("Start State called \n")
        input_value = randint(0, 1)
        # asyncio.sleep() pauses without blocking the loop (time.sleep() would block it).
        yield from asyncio.sleep(1)
        if input_value == 0:
            result = yield from State2(input_value)
        else:
            result = yield from State1(input_value)
        print("Resume of the Transition : \nStart State calling " + result)

    @asyncio.coroutine
    def State1(transition_value):
        outputValue = "State 1 with transition value = %s \n" % transition_value
        input_value = randint(0, 1)
        yield from asyncio.sleep(1)
        print("...Evaluating...")
        if input_value == 0:
            result = yield from State3(input_value)
        else:
            result = yield from State2(input_value)
        result = "State 1 calling " + result
        return outputValue + str(result)

    @asyncio.coroutine
    def State2(transition_value):
        outputValue = "State 2 with transition value = %s \n" % transition_value
        input_value = randint(0, 1)
        yield from asyncio.sleep(1)
        print("...Evaluating...")
        if input_value == 0:
            result = yield from State1(input_value)
        else:
            result = yield from State3(input_value)
        result = "State 2 calling " + result
        return outputValue + str(result)

    @asyncio.coroutine
    def State3(transition_value):
        outputValue = "State 3 with transition value = %s \n" % transition_value
        input_value = randint(0, 1)
        yield from asyncio.sleep(1)
        print("...Evaluating...")
        if input_value == 0:
            result = yield from State1(input_value)
        else:
            result = yield from EndState(input_value)
        result = "State 3 calling " + result
        return outputValue + str(result)

    @asyncio.coroutine
    def EndState(transition_value):
        outputValue = "End State with transition value = %s \n" % transition_value
        print("...Stop Computation...")
        return outputValue

    if __name__ == "__main__":
        print("Finite State Machine simulation with Asyncio Coroutine")
        loop = asyncio.get_event_loop()
        loop.run_until_complete(StartState())
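On current Python versions the same state machine could be written with `async def` and `await`, which replace `@asyncio.coroutine` and `yield from`. The following is a minimal sketch rather than the author's code: the `TRANSITIONS` table, the `run_state` and `simulate` helpers, and the `seed` and `delay` parameters are assumptions introduced here to keep the example short and repeatable.

```python
import asyncio
import random

# Transition table mirroring the listing above: each state maps the random
# input 0 or 1 (used as a tuple index) to the next state.
TRANSITIONS = {
    "Start": ("State2", "State1"),
    "State1": ("State3", "State2"),
    "State2": ("State1", "State3"),
    "State3": ("State1", "End"),
}


async def run_state(name, rng, trace, delay=0.0):
    """Enter state `name`, then await the coroutine for the next state."""
    trace.append(name)
    if name == "End":
        return trace
    await asyncio.sleep(delay)  # non-blocking pause between transitions
    next_name = TRANSITIONS[name][rng.randint(0, 1)]
    # `await` plays the role that `yield from` plays in the older style.
    return await run_state(next_name, rng, trace, delay)


def simulate(seed=None, delay=0.0):
    """Run one simulation and return the list of visited states."""
    rng = random.Random(seed)
    return asyncio.run(run_state("Start", rng, [], delay))


if __name__ == "__main__":
    print(" -> ".join(simulate(delay=0.1)))
```

Calling `simulate(seed=1)` returns the visited states as a list that starts with "Start" and ends with "End"; passing a seed only makes a run repeatable and does not change the transition rules.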

5. asyncio.Task

To run these three tasks concurrently, we put them in a list of tasks:

    """
    Asyncio using asyncio.Task to execute three math functions concurrently
    """
    import asyncio

    # @asyncio.coroutine was removed in Python 3.11, so this listing
    # runs only on Python 3.10 or earlier.

    @asyncio.coroutine
    def factorial(number):
        f = 1
        for i in range(2, number + 1):
            print("Asyncio.Task: Compute factorial(%s)" % (i))
            yield from asyncio.sleep(1)
            f *= i
        print("Asyncio.Task - factorial(%s) = %s" % (number, f))

    @asyncio.coroutine
    def fibonacci(number):
        a, b = 0, 1
        for i in range(number):
            print("Asyncio.Task: Compute fibonacci (%s)" % (i))
            yield from asyncio.sleep(1)
            a, b = b, a + b
        print("Asyncio.Task - fibonacci(%s) = %s" % (number, a))

    @asyncio.coroutine
    def binomialCoeff(n, k):
        result = 1
        for i in range(1, k + 1):
            result = result * (n - i + 1) / i
            print("Asyncio.Task: Compute binomialCoeff (%s)" % (i))
            yield from asyncio.sleep(1)
        print("Asyncio.Task - binomialCoeff(%s , %s) = %s" % (n, k, result))

    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        # Bind each Task to the loop explicitly, since no loop is running yet.
        tasks = [asyncio.Task(factorial(10), loop=loop),
                 asyncio.Task(fibonacci(10), loop=loop),
                 asyncio.Task(binomialCoeff(20, 10), loop=loop)]
        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()

    python3 task.py
    Asyncio.Task: Compute factorial(2)
    Asyncio.Task: Compute fibonacci (0)
    Asyncio.Task: Compute binomialCoeff (1)
    Asyncio.Task: Compute factorial(3)
    Asyncio.Task: Compute fibonacci (1)
    Asyncio.Task: Compute binomialCoeff (2)
    Asyncio.Task: Compute factorial(4)
    Asyncio.Task: Compute fibonacci (2)
    Asyncio.Task: Compute binomialCoeff (3)
    Asyncio.Task: Compute factorial(5)
    Asyncio.Task: Compute fibonacci (3)
    Asyncio.Task: Compute binomialCoeff (4)
    Asyncio.Task: Compute factorial(6)
    Asyncio.Task: Compute fibonacci (4)
    Asyncio.Task: Compute binomialCoeff (5)
    Asyncio.Task: Compute factorial(7)
    Asyncio.Task: Compute fibonacci (5)
    Asyncio.Task: Compute binomialCoeff (6)
    Asyncio.Task: Compute factorial(8)
    Asyncio.Task: Compute fibonacci (6)
    Asyncio.Task: Compute binomialCoeff (7)
    Asyncio.Task: Compute factorial(9)
    Asyncio.Task: Compute fibonacci (7)
    Asyncio.Task: Compute binomialCoeff (8)
    Asyncio.Task: Compute factorial(10)
    Asyncio.Task: Compute fibonacci (8)
    Asyncio.Task: Compute binomialCoeff (9)
    Asyncio.Task - factorial(10) = 3628800
    Asyncio.Task: Compute fibonacci (9)
    Asyncio.Task: Compute binomialCoeff (10)
    Asyncio.Task - fibonacci(10) = 55
    Asyncio.Task - binomialCoeff(20 , 10) = 184756.0
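On Python 3.11 and later, where `@asyncio.coroutine` is no longer available, the same three computations could be scheduled with `asyncio.create_task()` and collected with `asyncio.gather()`. This is a sketch under a few assumptions that are not in the original: the coroutines return their results instead of printing them, a shared `log` list stands in for the print statements, `delay` defaults to 0 so the run finishes quickly, and `binomial_coeff` uses integer division so it yields 184756 rather than 184756.0.

```python
import asyncio


async def factorial(number, log, delay=0.0):
    f = 1
    for i in range(2, number + 1):
        log.append("factorial step %d" % i)
        await asyncio.sleep(delay)  # hand control back to the event loop
        f *= i
    return f


async def fibonacci(number, log, delay=0.0):
    a, b = 0, 1
    for i in range(number):
        log.append("fibonacci step %d" % i)
        await asyncio.sleep(delay)
        a, b = b, a + b
    return a


async def binomial_coeff(n, k, log, delay=0.0):
    result = 1
    for i in range(1, k + 1):
        # The product is divisible by i at every step, so // stays exact.
        result = result * (n - i + 1) // i
        log.append("binomial step %d" % i)
        await asyncio.sleep(delay)
    return result


async def main(delay=0.0):
    log = []
    tasks = [
        asyncio.create_task(factorial(10, log, delay)),
        asyncio.create_task(fibonacci(10, log, delay)),
        asyncio.create_task(binomial_coeff(20, 10, log, delay)),
    ]
    # gather() returns the results in the same order as the tasks passed in.
    results = await asyncio.gather(*tasks)
    return results, log


if __name__ == "__main__":
    results, log = asyncio.run(main(delay=0.1))
    print(results)
```

The tasks take turns on a single thread: each one runs until it reaches an `await`, then the loop resumes the next one. That interleaving is what the output listing above shows, and it is concurrency rather than true parallel execution.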

This article was shared from the author's personal site/blog. Originally published: 2019.05.04.
