英文原版:https://github.com/earl/beanstalkc/blob/wip-doc-rtfd/doc/tutorial.rst
背景介绍:
Beanstalk,一个高性能、轻量级的分布式内存队列系统。而beanstalkc是Beanstalk的一个python客户端库。
启动服务端beanstalkd进程来监听14711端口,可以使用下列命令:
beanstalkd -l 127.0.0.1 -p 14711
除了安装beanstalkc外,一般你还需要装PyYAML。如果坚持不用PyYAML,你同样可以使用beanstalkc。 更多细节可见附录A部分。
我们需要import这个库并和服务端进行连接:
>>> import beanstalkc
>>> beanstalk = beanstalkc.Connection(host='localhost', port=14711)
如果我们不填host或者端口参数,会默认各自使用localhost和11300。同样有一个以s为单位的connect_timeout参数,用于决定socket将等待服务端多长时间来响应连接。如果值为None,那他将不会有timeout;如果不指定参数的话,默认是1s。
连接已经成功,我们往队列里面添加一个job:
>>> beanstalk.put('hey!')
1
或者我们reserve job
>>> job = beanstalk.reserve()
>>> job.body
'hey!'
一旦我们处理完一个job,我们就要把它标志为done。否则job一旦运行时间超过一个“time to run”周期(默认是120s)会重新进入队列。我们可以通过delete将任务标志为done:
>>> job.delete()
reserve后可能永远保持阻塞直到有job处于ready状态。如果job不是desired的,我们可以使用带timeout(以s为单位)的reserve操作,来决定我们将等待多长时间来接收这个job。如果这个reserve的timeout时间到了,它将返回None:
>>> beanstalk.reserve(timeout=0) is None
True
如果我们设置timeout为0,reserve将立即返回一个job或者None。
注意:beanstalkc需要job的body是strings,你需要将你的值转换为string。:
>>> beanstalk.put(42)
Traceback (most recent call last):
...
AssertionError: Job body must be a str instance
对于你放进body里的内容是没有限制,所以你可以使用任意二进制数据。如果你想放入一张图片,你只需要将图片转换成string。如果你想发Unicode的文本,你只需要使用unicode.encode来进行编码成一个string。
一个单独的beanstalkd server可以提供多个不同的队列,我们称之为 "tubes" in beanstalkd。通过这个命令查看所有可用的tubes:
>>> beanstalk.tubes()
['default']
一个beanstalkd客户端可以选择一个需要put job的tube,这是一个已经被客户端使用的tube,我们来查看这个客户端当前使用的tube:
>>> beanstalk.using()
'default'
除非特殊说明,默认是使用default这个tube。如果想使用一个不一样的tube:
>>> beanstalk.use('foo')
'foo'
>>> beanstalk.using()
'foo'
如果你想使用的tube不存在,beanstalkd会默认建一个这样的tube:
>>> beanstalk.tubes()
['default', 'foo']
当然你可以自由切换到默认的tube。当tube没有任何客户端using和watching的情况下会自动消失。
>>> beanstalk.use('default')
'default'
>>> beanstalk.using()
'default'
>>> beanstalk.tubes()
['default']
进一步,一个beanstalkd客户端会从多个tube里面reserve job,那么那些tube会被客户端watched。通过这个命令查看当前客户端watch了哪些tube:
>>> beanstalk.watching()
['default']
这样watch一个新的tube:
>>> beanstalk.watch('bar')
2
>>> beanstalk.watching()
['default', 'bar']
正如前言所说,一旦你开始watch这个原本不存在的tube,它就会自动创建:
>>> beanstalk.tubes()
['default', 'bar']
停止watch这个tube:
>>> beanstalk.ignore('bar')
1
>>> beanstalk.watching()
['default']
至少要watch一个tube,当你试图ignore最后一个tube时是不生效的:
>>> beanstalk.ignore('default')
1
>>> beanstalk.watching()
['default']
总结:每个beanstalkd管理着两个独立的concerns,一个用来put新产生的job,一个用来reserve。因此,它们使用各自独立的函数:
use和using影响往哪里put job
watch和watching控制从哪里reserve job
注意:这两个concerns是完全不相关的,当你use一个tube,是不会自动watch它的。同理当你watching一个tube时,你是不会using这个tube。
beanstalkd提供了很多server级、tube级和job级的统计方法。job的统计细节只能在job的生命周期内可以取到。创造一个新的job:
>>> beanstalk.put('ho?')
2
>>> job = beanstalk.reserve()
我们获取job级的统计:
>>> from pprint import pprint
>>> pprint(job.stats())
{'age': 0,
...
'id': 2,
...
'state': 'reserved',
...
'tube': 'default'}
如果你试图获取一个已经删掉job的状态,你会收到一个CommandFailed的exception:
>>> job.delete()
>>> job.stats()
Traceback (most recent call last):
...
CommandFailed: ('stats-job', 'NOT_FOUND', [])
我们可以查到default这个tube的一些数值指标:
>>> pprint(beanstalk.stats_tube('default'))
{...
'current-jobs-ready': 0,
'current-jobs-reserved': 0,
'current-jobs-urgent': 0,
...
'name': 'default',
...}
最终,我们可以通过connection的stat来获取大量的server级的统计:
>>> pprint(beanstalk.stats())
{...
'current-connections': 1,
'current-jobs-buried': 0,
'current-jobs-delayed': 0,
'current-jobs-ready': 0,
'current-jobs-reserved': 0,
'current-jobs-urgent': 0,
...}
在上面的基础操作中,我们讨论了一个任务典型的生命周期:
put reserve delete
-----> [READY] ---------> [RESERVED] --------> *poof*
(这个插图来自beanstalkd的协议文档。它作为beanstalkd发布版本的一部分,包含在protocol.txt里)
但是除了ready和reserved,一个job同样包含delayed和buried状态。job的状态变化如下:
(这个插图来自beanstalkd的协议文档。它作为beanstalkd发布版本的一部分,包含在protocol.txt里)
现在我们使用一些新的、实际的用法。首先我们创建一个包含delay的job。这样这个job只能在delay过了后才能被reserve到。
>>> beanstalk.put('yes!', delay=1)
3
>>> beanstalk.reserve(timeout=0) is None
True
>>> job = beanstalk.reserve(timeout=1)
>>> job.body
'yes!'
如果我们对一个job不感兴趣(比如我们处理失败后),我们可以简单地将这个job释放回它原来的tube。
>>> job.release()
>>> job.stats()['state']
'ready'
如果我们想忽略一个job,我们可以将它“bury”掉。一个被“bury”掉的job会被搁置,不会被reserve到:
>>> job = beanstalk.reserve()
>>> job.bury()
>>> job.stats()['state']
'buried'
>>> beanstalk.reserve(timeout=0) is None
True
被“bury”掉的任务被维持在一个特殊的FIFO队列,被独立在正常job的处理生命周期,直到它们再次被“kick”激活。
>>> beanstalk.kick()
1
如果你需要一次性kick活多个job,kick的返回值会告诉你最终实际kick活多少个job。
>>> beanstalk.kick(42)
0
除了reserve job,客户端还支持对job进行peek操作。peek允许你查看job的信息而不用变更它们的状态。如果你知道你感兴趣的job的id,你可以直接peek这个任务。我们仍然拿之前的3号job的例子来展示:
>>> job = beanstalk.peek(3)
>>> job.body
'yes!'
注意peek操作并没有reserve这个job:
>>> job.stats()['state']
'ready'
如果你对一个不存在的job进行peek操作,你会简单地看不到东西:
>>> beanstalk.peek(42) is None
True
如果你对一个具体的job不感兴趣,只是想通过状态来看job状况,beanstalkd同样提供了一些这样的命令。通过peek_ready我们可以获取到同样可以被reserve的job(就是下一个ready的job):
>>> job = beanstalk.peek_ready()
>>> job.body
'yes!'
注意:你是不能release、bury那些不是由你reserve的job。这些操作会被忽略:
>>> job.release()
>>> job.bury()
>>> job.stats()['state']
'ready'
但是你可以delete那些不是由你reserve的job:
>>> job.delete()
>>> job.stats()
Traceback (most recent call last):
...
CommandFailed: ('stats-job', 'NOT_FOUND', [])
最终,你同样可以peek到那些处于特殊队列的job,如处于delayed状态的job:
>>> beanstalk.put('o tempores', delay=120)
4
>>> job = beanstalk.peek_delayed()
>>> job.stats()['state']
'delayed'
或者是buried状态下的:
>>> beanstalk.put('o mores!')
5
>>> job = beanstalk.reserve()
>>> job.bury()
>>> job = beanstalk.peek_buried()
>>> job.stats()['state']
'buried'
如果没有job优先级,beanstalkd就像一个FIFO的队列:
>>> _ = beanstalk.put('1')
>>> _ = beanstalk.put('2')
>>> job = beanstalk.reserve() ; print job.body ; job.delete()
1
>>> job = beanstalk.reserve() ; print job.body ; job.delete()
2
在某些需求场景下,我们需要赋予不同的job不同的优先级。关于job优先级,这里有三个需要注意的事项:
1.优先级数字上越低的job拥有越高的优先级
2.beanstalkd的优先级是32位的unsigned integers(范围是0到2**32 -1)
3.beanstalkc默认优先级是2**31(beanstalkc.DEFAULT_PRIORITY)
创建一个带优先级的job,需要使用priority这个关键词参数:
>>> _ = beanstalk.put('foo', priority=42)
>>> _ = beanstalk.put('bar', priority=21)
>>> _ = beanstalk.put('qux', priority=21)
>>> job = beanstalk.reserve() ; print job.body ; job.delete()
bar
>>> job = beanstalk.reserve() ; print job.body ; job.delete()
qux
>>> job = beanstalk.reserve() ; print job.body ; job.delete()
foo
注意:为什么bar和qux在foo之前离开队列,尽管它们是在foo之后进入队列的。
注意:相同优先级的job任然按照FIFO来处理。
>>> beanstalk.close()
因为beanstalkd使用YAML来描述状态信息(像stats()和tubes()的结果),你一般是需要PyYAML这个库。取决于你的需要,你也可以使用libyaml的c拓展。
如果你实在不想使用PyYAML,你仍然可以使用beanstalkc和不经解析的YAML响应。你需要在创建连接时定义parse_yaml=False。
>>> beanstalk = beanstalkc.Connection(host='localhost',
... port=14711,
... parse_yaml=False)
>>> beanstalk.tubes()
'---\n- default\n'
>>> beanstalk.stats_tube('default')
'---\nname: default\ncurrent-jobs-urgent: 0\n...'
>>> beanstalk.close()
当你不使用benstalkd的自省能力(Connection#tubes, Connection#watching, Connection#stats, Connection#stats_tube, and Job#stats)时,这就十分有用。
或者,你可以在函数里面使用YAML进行解析:
>>> beanstalk = beanstalkc.Connection(host='localhost',
... port=14711,
... parse_yaml=lambda x: x.split('\n'))
>>> beanstalk.tubes()
['---', '- default', '']
>>> beanstalk.stats_tube('default')
['---', 'name: default', 'current-jobs-urgent: 0', ...]
>>> beanstalk.close()
当简单的PyYAML不能满足你的需求时,你可以选择这样处理。
附《Beanstalkd中文协议》
https://github.com/kr/beanstalkd/blob/master/doc/protocol.zh-CN.md
本文系外文翻译,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文系外文翻译,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。