I recently finished studying Python multithreading, and to reinforce what I learned I built a simple asynchronous queue, following the approach of the well-known article on implementing a "delayed message" feature in one minute.
An efficient delayed-message scheme involves two key data structures:

1. A ring queue, e.g. a ring of 3600 slots (essentially an array).
2. Task sets: each slot on the ring holds a Set of tasks.

Alongside these, a timer advances one slot around the ring every second, and a Current Index pointer marks the slot currently being scanned. Each Task carries two important attributes:

(1) Cycle-Num: the task fires when Current Index reaches its slot on this particular lap around the ring.
(2) Task-Function: a pointer to the work to execute.
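To make the structure concrete before the full implementation, here is a minimal sketch of the wheel itself (the names ring, add_task, and tick are my own illustration, not part of the code below):

SLOTS = 3600
ring = [set() for _ in range(SLOTS)]        # the ring: an array of 3600 task Sets

def add_task(now, delay_seconds, task_func):
    exec_time = now + delay_seconds
    index = exec_time % SLOTS               # which slot the task lands in
    cycle_num = exec_time // SLOTS + 1      # on which lap of the ring it fires
    ring[index].add((cycle_num, task_func))

def tick(now):
    # Called once per second; now % SLOTS plays the role of Current Index.
    slot = ring[now % SLOTS]
    due = set(t for t in slot if t[0] == now // SLOTS + 1)
    slot -= due
    for _, task_func in due:
        task_func()                         # Task-Function: the work to run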
The code follows (it runs past 100 lines, but stays under 200, so let's call it 100).
#! -*- coding: utf-8 -*-
try:
    import cPickle as pickle
except ImportError:
    import pickle
try:
    import simplejson as json
except ImportError:
    import json
import os
import errno
import Queue
import random
import logging
from functools import wraps
from threading import Timer, RLock, Thread
from time import sleep, time
from base64 import b64encode, b64decode

# Layout of the JSON data:
# tasks = {
#     index: {
#         cycle_num: [(func, bargs)]
#     }
# }

logging.basicConfig(level=logging.DEBUG,
                    format='(%(asctime)-15s) %(message)s',)

tasks_file = 'tasks.json'
flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY

# To keep a flood of tasks from spawning too many threads,
# a bounded Queue throttles how much work is in flight.
WORKER_NUMS = 2
q = Queue.Queue(WORKER_NUMS)
lock = RLock()


def check_file():
    try:
        file_handle = os.open(tasks_file, flags)
    except OSError as e:
        if e.errno == errno.EEXIST:
            # Failed as the file already exists.
            pass
        else:
            raise
    else:
        with os.fdopen(file_handle, 'w') as file_obj:
            file_obj.write("{}")


def set_delay_task(func_name, *args, **kwargs):
    # The lock ensures only one thread writes the file at a time,
    # so concurrent writes cannot corrupt the data.
    with lock:
        with open(tasks_file, 'r+') as json_file:
            count_down = kwargs.pop('count_down', 0)
            tasks = json.load(json_file)
            # execution time
            exec_time = int(time()) + count_down
            # slot index on the ring
            index = str(exec_time % 3600)
            # lap number
            cycle_num = str(exec_time // 3600 + 1)
            dargs = pickle.dumps((args, kwargs))
            bargs = b64encode(dargs)
            index_data = tasks.get(index, {})
            index_data.setdefault(cycle_num, []).append((func_name, bargs))
            tasks[index] = index_data
            json_file.seek(0)
            json.dump(tasks, json_file)
            json_file.truncate()  # drop any stale bytes past the new JSON
            logging.debug('Received task: %s' % func_name)


def get_delay_tasks():
    with open(tasks_file, 'r') as json_file:
        tasks = json.load(json_file)
        # current time
        current_time = int(time())
        # slot index on the ring
        index = str(current_time % 3600)
        # lap number
        cycle_num = str(current_time // 3600 + 1)
        current_tasks = tasks.get(index, {}).get(cycle_num, [])
        tasks = []
        for func, bargs in current_tasks:
            dargs = b64decode(bargs)
            args, kwargs = pickle.loads(dargs)
            tasks.append((func, (args, kwargs)))
        return tasks


def get_method_by_name(method_name):
    possibles = globals().copy()
    possibles.update(locals())
    method = possibles.get(method_name)
    return method


def create_task(task_class, func, task_name=None, **kwargs):
    def execute(self):
        args, kwargs = self.data or ((), {})
        return func(*args, **kwargs)
    attrs = {
        'execute': execute,
        'func_name': func.__name__,
        '__module__': func.__module__,
        '__doc__': func.__doc__
    }
    attrs.update(kwargs)
    klass = type(
        task_name or func.__name__,
        (task_class,),
        attrs
    )
    return klass


class Hu(object):
    def __init__(self, func_name=None):
        self.func_name = func_name
        check_file()

    def task(self):
        def deco(func):
            self.func_name = func.__name__
            klass = create_task(Hu, func, self.func_name)
            func.delay = klass(func_name=klass.func_name).delay

            @wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)
            return wrapper
        return deco

    def delay(self, *args, **kwargs):
        _args = [self.func_name]
        _args.extend(args)
        Timer(0, set_delay_task, _args, kwargs).start()
        return True


def boss():
    while True:
        current_tasks = get_delay_tasks()
        for func, params in current_tasks:
            # Task accepted: auth.tasks.send_msg
            logging.debug('Task accepted: %s' % func)
            q.put((func, params))
        sleep(1)


def worker():
    while True:
        func, params = q.get()
        print 'get task: %s\n' % func
        method = get_method_by_name(func)
        args, kwargs = params
        # Task auth.tasks.send_msg succeeded in ...
        start_time = time()
        method(*args, **kwargs)
        end_time = time()
        logging.debug('Task %s succeeded in %s' % (str(func), end_time - start_time))
        q.task_done()


def main():
    check_file()
    print 'starting at: %s' % time()
    for target in (boss, worker):
        t = Thread(target=target)
        t.start()
    print 'all DONE at: %s' % time()


hu = Hu()


# Usage looks like this:
@hu.task()
def test(num):
    sleep(2)
    print 'test: %s' % num


if __name__ == '__main__':
    for i in range(10):
        test.delay(i, count_down=random.randint(1, 10))
    main()

# output
(2017-03-21 15:59:20,394) Received task: test
(2017-03-21 15:59:20,396) Received task: test
(2017-03-21 15:59:20,397) Received task: test
(2017-03-21 15:59:20,398) Received task: test
(2017-03-21 15:59:20,400) Received task: test
(2017-03-21 15:59:20,401) Received task: test
(2017-03-21 15:59:20,403) Received task: test
(2017-03-21 15:59:20,404) Received task: test
(2017-03-21 15:59:20,406) Received task: test
(2017-03-21 15:59:20,408) Received task: test
get task: test
(2017-03-21 15:59:21,395) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
test: 2
get task: test
(2017-03-21 15:59:23,399) Task test succeeded in 2.0037419796
(2017-03-21 15:59:24,404) Task accepted: test
test: 1
get task: test
The design follows the idea of implementing "delayed messages" in one minute. The queue's data structure is:
{
index: {
cycle_num: [(func, bargs)]
}
}
index is exec_time % 3600, so it takes values 0-3599; the ring completes one full cycle per hour. cycle_num is computed as (timestamp / 3600 + 1) with integer division; it is the lap number, i.e. on which pass around the ring the task fires.
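For example, for a (hypothetical) execution timestamp of 7201 seconds:

>>> exec_time = 7201
>>> exec_time % 3600          # index: the slot within the hour
1
>>> exec_time // 3600 + 1     # cycle_num: the lap on which it fires
3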
Whenever a task is added, we compute its index and cycle_num and write the method name and arguments into the JSON file. When reading tasks, we compute the current index and cycle_num, take out the tasks that are due, and run them on threads.
To keep a flood of tasks from forcing us to spawn too many threads, work is handed to worker threads through a Queue; its maxsize also bounds the backlog, so boss() blocks on q.put() instead of piling up unbounded work.
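main() above starts only a single worker thread; if more parallelism were wanted, a sketch like the following (my own variation, reusing the worker and q defined above) would start several:

def start_workers(num_workers=WORKER_NUMS):
    # Several workers drain the same bounded queue; the queue's maxsize
    # provides backpressure when the workers fall behind.
    for _ in range(num_workers):
        t = Thread(target=worker)
        t.setDaemon(True)   # don't keep the process alive for idle workers
        t.start()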
The main purpose of the lock is to stop multiple threads from reading and writing the file at the same time, which would break data consistency.
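Without the lock, two Timer threads could interleave their read-modify-write on the JSON file and silently lose a task; schematically:

# Thread A: tasks = json.load(f)   -> sees {}
# Thread B: tasks = json.load(f)   -> also sees {}
# Thread A: adds its task, dumps   -> file now holds task A
# Thread B: adds its task, dumps   -> overwrites the file; task A is lost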
Of course, you could also store the queue in Redis: the Redis server executes commands on a single thread, so each command is atomic and concurrent clients cannot corrupt the data mid-operation. That part is left for you to implement if you need it.
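For reference, a minimal sketch of that variant using a sorted set scored by execution time (my own sketch, assuming redis-py 3.x where zadd takes a mapping; not part of the code above):

import time
import pickle
import redis

r = redis.StrictRedis()

def set_delay_task_redis(func_name, args, kwargs, count_down=0):
    # score = absolute execution time, member = pickled call description
    payload = pickle.dumps((func_name, args, kwargs))
    r.zadd('delayed_tasks', {payload: time.time() + count_down})

def get_due_tasks_redis():
    # Fetch everything whose execution time has passed, then remove it.
    # Each Redis command is atomic; with a single consumer thread this
    # read-then-delete pair is safe.
    due = r.zrangebyscore('delayed_tasks', 0, time.time())
    if due:
        r.zrem('delayed_tasks', *due)
    return [pickle.loads(p) for p in due]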