前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >beanstalkc Tutorial 中文版

beanstalkc Tutorial 中文版

作者头像
皮皮熊
发布2018-06-13 17:22:05
2.6K2
发布2018-06-13 17:22:05
举报
文章被收录于专栏:大数据与实时计算

英文原版:https://github.com/earl/beanstalkc/blob/wip-doc-rtfd/doc/tutorial.rst

背景介绍:

Beanstalk,一个高性能、轻量级的分布式内存队列系统。而beanstalkc是Beanstalk的一个python客户端库。

开始:

启动服务端beanstalkd进程来监听14711端口,可以使用下列命令:

beanstalkd -l 127.0.0.1 -p 14711

除了安装beanstalkc外,一般你还需要装PyYAML。如果坚持不用PyYAML,你同样可以使用beanstalkc。 更多细节可见附录A部分。

我们需要import这个库并和服务端进行连接:

>>> import beanstalkc

>>> beanstalk = beanstalkc.Connection(host='localhost', port=14711)

如果我们不填host或者端口参数,会默认各自使用localhost和11300。同样有一个以s为单位的connect_timeout参数,用于决定socket将等待服务端多长时间来响应连接。如果值为None,那他将不会有timeout;如果不指定参数的话,默认是1s。

基本操作:

连接已经成功,我们往队列里面添加一个job:

>>> beanstalk.put('hey!')

1

或者我们reserve job

>>> job = beanstalk.reserve()

>>> job.body

'hey!'

一旦我们处理完一个job,我们就要把它标志为done。否则job一旦运行时间超过一个“time to run”周期(默认是120s)会重新进入队列。我们可以通过delete将任务标志为done:

>>> job.delete()

reserve后可能永远保持阻塞直到有job处于ready状态。如果job不是desired的,我们可以使用带timeout(以s为单位)的reserve操作,来决定我们将等待多长时间来接收这个job。如果这个reserve的timeout时间到了,它将返回None:

>>> beanstalk.reserve(timeout=0) is None

True

如果我们设置timeout为0,reserve将立即返回一个job或者None。

注意:beanstalkc需要job的body是strings,你需要将你的值转换为string。:

>>> beanstalk.put(42)

Traceback (most recent call last):

...

AssertionError: Job body must be a str instance

对于你放进body里的内容是没有限制,所以你可以使用任意二进制数据。如果你想放入一张图片,你只需要将图片转换成string。如果你想发Unicode的文本,你只需要使用unicode.encode来进行编码成一个string。

Tube的管理:

一个单独的beanstalkd server可以提供多个不同的队列,我们称之为 "tubes" in beanstalkd。通过这个命令查看所有可用的tubes:

>>> beanstalk.tubes()

['default']

一个beanstalkd客户端可以选择一个需要put job的tube,这是一个已经被客户端使用的tube,我们来查看这个客户端当前使用的tube:

>>> beanstalk.using()

'default'

除非特殊说明,默认是使用default这个tube。如果想使用一个不一样的tube:

>>> beanstalk.use('foo')

'foo'

>>> beanstalk.using()

'foo'

如果你想使用的tube不存在,beanstalkd会默认建一个这样的tube:

>>> beanstalk.tubes()

['default', 'foo']

当然你可以自由切换到默认的tube。当tube没有任何客户端using和watching的情况下会自动消失。

>>> beanstalk.use('default')

'default'

>>> beanstalk.using()

'default'

>>> beanstalk.tubes()

['default']

进一步,一个beanstalkd客户端会从多个tube里面reserve job,那么那些tube会被客户端watched。通过这个命令查看当前客户端watch了哪些tube:

>>> beanstalk.watching()

['default']

这样watch一个新的tube:

>>> beanstalk.watch('bar')

2

>>> beanstalk.watching()

['default', 'bar']

正如前言所说,一旦你开始watch这个原本不存在的tube,它就会自动创建:

>>> beanstalk.tubes()

['default', 'bar']

停止watch这个tube:

>>> beanstalk.ignore('bar')

1

>>> beanstalk.watching()

['default']

至少要watch一个tube,当你试图ignore最后一个tube时是不生效的:

>>> beanstalk.ignore('default')

1

>>> beanstalk.watching()

['default']

总结:每个beanstalkd管理着两个独立的concerns,一个用来put新产生的job,一个用来reserve。因此,它们使用各自独立的函数:

use和using影响往哪里put job

watch和watching控制从哪里reserve job

注意:这两个concerns是完全不相关的,当你use一个tube,是不会自动watch它的。同理当你watching一个tube时,你是不会using这个tube。

统计:

beanstalkd提供了很多server级、tube级和job级的统计方法。job的统计细节只能在job的生命周期内可以取到。创造一个新的job:

>>> beanstalk.put('ho?')

2

>>> job = beanstalk.reserve()

我们获取job级的统计:

>>> from pprint import pprint

>>> pprint(job.stats())

{'age': 0,

...

'id': 2,

...

'state': 'reserved',

...

'tube': 'default'}

如果你试图获取一个已经删掉job的状态,你会收到一个CommandFailed的exception:

>>> job.delete()

>>> job.stats()

Traceback (most recent call last):

...

CommandFailed: ('stats-job', 'NOT_FOUND', [])

我们可以查到default这个tube的一些数值指标:

>>> pprint(beanstalk.stats_tube('default'))

{...

'current-jobs-ready': 0,

'current-jobs-reserved': 0,

'current-jobs-urgent': 0,

...

'name': 'default',

...}

最终,我们可以通过connection的stat来获取大量的server级的统计:

>>> pprint(beanstalk.stats())

{...

'current-connections': 1,

'current-jobs-buried': 0,

'current-jobs-delayed': 0,

'current-jobs-ready': 0,

'current-jobs-reserved': 0,

'current-jobs-urgent': 0,

...}

高级操作:

在上面的基础操作中,我们讨论了一个任务典型的生命周期:

put reserve delete

-----> [READY] ---------> [RESERVED] --------> *poof*

(这个插图来自beanstalkd的协议文档。它作为beanstalkd发布版本的一部分,包含在protocol.txt里)

但是除了ready和reserved,一个job同样包含delayed和buried状态。job的状态变化如下:

(这个插图来自beanstalkd的协议文档。它作为beanstalkd发布版本的一部分,包含在protocol.txt里)

现在我们使用一些新的、实际的用法。首先我们创建一个包含delay的job。这样这个job只能在delay过了后才能被reserve到。

>>> beanstalk.put('yes!', delay=1)

3

>>> beanstalk.reserve(timeout=0) is None

True

>>> job = beanstalk.reserve(timeout=1)

>>> job.body

'yes!'

如果我们对一个job不感兴趣(比如我们处理失败后),我们可以简单地将这个job释放回它原来的tube。

>>> job.release()

>>> job.stats()['state']

'ready'

如果我们想忽略一个job,我们可以将它“bury”掉。一个被“bury”掉的job会被搁置,不会被reserve到:

>>> job = beanstalk.reserve()

>>> job.bury()

>>> job.stats()['state']

'buried'

>>> beanstalk.reserve(timeout=0) is None

True

被“bury”掉的任务被维持在一个特殊的FIFO队列,被独立在正常job的处理生命周期,直到它们再次被“kick”激活。

>>> beanstalk.kick()

1

如果你需要一次性kick活多个job,kick的返回值会告诉你最终实际kick活多少个job。

>>> beanstalk.kick(42)

0

检查job

除了reserve job,客户端还支持对job进行peek操作。peek允许你查看job的信息而不用变更它们的状态。如果你知道你感兴趣的job的id,你可以直接peek这个任务。我们仍然拿之前的3号job的例子来展示:

>>> job = beanstalk.peek(3)

>>> job.body

'yes!'

注意peek操作并没有reserve这个job:

>>> job.stats()['state']

'ready'

如果你对一个不存在的job进行peek操作,你会简单地看不到东西:

>>> beanstalk.peek(42) is None

True

如果你对一个具体的job不感兴趣,只是想通过状态来看job状况,beanstalkd同样提供了一些这样的命令。通过peek_ready我们可以获取到同样可以被reserve的job(就是下一个ready的job):

>>> job = beanstalk.peek_ready()

>>> job.body

'yes!'

注意:你是不能release、bury那些不是由你reserve的job。这些操作会被忽略:

>>> job.release()

>>> job.bury()

>>> job.stats()['state']

'ready'

但是你可以delete那些不是由你reserve的job:

>>> job.delete()

>>> job.stats()

Traceback (most recent call last):

...

CommandFailed: ('stats-job', 'NOT_FOUND', [])

最终,你同样可以peek到那些处于特殊队列的job,如处于delayed状态的job:

>>> beanstalk.put('o tempores', delay=120)

4

>>> job = beanstalk.peek_delayed()

>>> job.stats()['state']

'delayed'

或者是buried状态下的:

>>> beanstalk.put('o mores!')

5

>>> job = beanstalk.reserve()

>>> job.bury()

>>> job = beanstalk.peek_buried()

>>> job.stats()['state']

'buried'

job的优先级:

如果没有job优先级,beanstalkd就像一个FIFO的队列:

>>> _ = beanstalk.put('1')

>>> _ = beanstalk.put('2')

>>> job = beanstalk.reserve() ; print job.body ; job.delete()

1

>>> job = beanstalk.reserve() ; print job.body ; job.delete()

2

在某些需求场景下,我们需要赋予不同的job不同的优先级。关于job优先级,这里有三个需要注意的事项:

1.优先级数字上越低的job拥有越高的优先级

2.beanstalkd的优先级是32位的unsigned integers(范围是0到2**32 -1)

3.beanstalkc默认优先级是2**31(beanstalkc.DEFAULT_PRIORITY)

创建一个带优先级的job,需要使用priority这个关键词参数:

>>> _ = beanstalk.put('foo', priority=42)

>>> _ = beanstalk.put('bar', priority=21)

>>> _ = beanstalk.put('qux', priority=21)

>>> job = beanstalk.reserve() ; print job.body ; job.delete()

bar

>>> job = beanstalk.reserve() ; print job.body ; job.delete()

qux

>>> job = beanstalk.reserve() ; print job.body ; job.delete()

foo

注意:为什么bar和qux在foo之前离开队列,尽管它们是在foo之后进入队列的。

注意:相同优先级的job任然按照FIFO来处理。

最后

>>> beanstalk.close()

附录A:beanstalkc 和YAML

因为beanstalkd使用YAML来描述状态信息(像stats()和tubes()的结果),你一般是需要PyYAML这个库。取决于你的需要,你也可以使用libyaml的c拓展。

如果你实在不想使用PyYAML,你仍然可以使用beanstalkc和不经解析的YAML响应。你需要在创建连接时定义parse_yaml=False。

>>> beanstalk = beanstalkc.Connection(host='localhost',

... port=14711,

... parse_yaml=False)

>>> beanstalk.tubes()

'---\n- default\n'

>>> beanstalk.stats_tube('default')

'---\nname: default\ncurrent-jobs-urgent: 0\n...'

>>> beanstalk.close()

当你不使用benstalkd的自省能力(Connection#tubes, Connection#watching, Connection#stats, Connection#stats_tube, and Job#stats)时,这就十分有用。

或者,你可以在函数里面使用YAML进行解析:

>>> beanstalk = beanstalkc.Connection(host='localhost',

... port=14711,

... parse_yaml=lambda x: x.split('\n'))

>>> beanstalk.tubes()

['---', '- default', '']

>>> beanstalk.stats_tube('default')

['---', 'name: default', 'current-jobs-urgent: 0', ...]

>>> beanstalk.close()

当简单的PyYAML不能满足你的需求时,你可以选择这样处理。

附《Beanstalkd中文协议》

https://github.com/kr/beanstalkd/blob/master/doc/protocol.zh-CN.md

本文系外文翻译,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系外文翻译前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 开始:
  • 基本操作:
  • Tube的管理:
  • 统计:
  • 高级操作:
  • 检查job
  • job的优先级:
  • 最后
  • 附录A:beanstalkc 和YAML
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档