我用芹菜和姜果芹菜。我已经定义了一个我想要测试的周期性任务。是否可以从shell手动运行定期任务,以便查看控制台输出?
发布于 2012-10-16 00:40:29
您是否尝试过仅从Django shell运行任务?您可以使用任务的.apply
方法来确保该任务在本地快速运行。
假设任务在Django应用程序myapp
中的tasks
子模块中称为my_task
:
$ python manage.py shell
>>> from myapp.tasks import my_task
>>> eager_result = my_task.apply()
结果实例与通常的AsyncResult
类型具有相同的接口,只是结果总是在本地急切地求值,并且.apply()
方法将阻塞,直到任务运行完成。
发布于 2018-10-20 12:49:15
如果您的意思是在条件不满足时触发任务,例如周期时间不满足。你可以通过两个步骤来完成。
1.获取您的任务id。
您可以通过键入命令来完成此操作。
celery inspect registered
您将看到类似于app.tasks.update_something
的内容。如果没有,很可能是celery
没有启动。只需运行它。
使用 celery call
运行任务2.
celery call app.tasks.update_something
要了解更多详细信息,只需键入
celery --help
celery inspect --help
celery call --help
发布于 2012-10-16 00:42:42
我认为您需要打开两个shell :一个用于执行Python/Django shell中的任务,另一个用于运行celery worker
(python manage.py celery worker
)。正如前面的回答所说,您可以使用apply()
或apply_async()
运行任务
我已经对答案进行了编辑,所以您不会使用弃用的命令。
https://stackoverflow.com/questions/12900023
复制相似问题