machineryについて
首先
这篇文章是Go3 Advent Calendar 2017的第12篇文章。
这是我第一次参加Advent Calendar。请多关照。
当你使用Go开发Web应用程序时,当接收到某个请求时,你希望返回一个200 OK的响应,并在后台进行处理。我之前进行了一些调查,发现了一个名为machinery的开源软件,我想向你介绍一下它。
机器是什么
详细内容已经在GitHub上进行了描述,以下是其中的一部分摘录。
Machinery是基于分布式消息传递的异步任务队列/作业队列。
根据所写的内容,将消息传递到后台是通过分散消息(broker)进行的。当前支持的方式如下所示。
-
- AMQP
- Redis
また、バックグラウンド処理のステータス並びに結果を格納するものとしては以下をサポートしているようです。
-
- Redis
-
- Memcache
-
- AMQP (書いてありますが、AMQPは非推奨のようです Keeping Results)
- MongoDB
尝试一下
这个教程我已经在自己的环境中测试过了。
下面有示例代码,我们将使用它来进行操作验证。
machinery.go
当然,作为预备工作,需要提前准备好消息代理和结果存储的位置。(本次使用AMQP和MongoDB)
关于这部分内容,本次省略了。
更改设定
因为machinery.go和同一目录下存在名为config.yml的配置文件,所以需要进行修正。
---
broker: 'amqp://guest:guest@localhost:5672/'
default_queue: machinery_tasks
result_backend: 'redis://127.0.0.1:6379'
results_expire_in: 3600000
amqp:
binding_key: machinery_task
exchange: machinery_exchange
exchange_type: direct
prefetch_count: 3
工人开始工作
まず、Workerを起動します。
以下のコマンドによりWorkerを起動することが出来ます。
go run example/machinery.go -c config.yml worker
一旦启动
$ go run machinery.go -c config.yml worker
INFO: 2017/12/05 17:30:51 file.go:30 Successfully loaded config from file for the first time
INFO: 2017/12/05 17:30:51 worker.go:31 Launching a worker with the following settings:
INFO: 2017/12/05 17:30:51 worker.go:32 - Broker: amqp://guest:guest@localhost:5672/
INFO: 2017/12/05 17:30:51 worker.go:33 - DefaultQueue: machinery_tasks
INFO: 2017/12/05 17:30:51 worker.go:34 - ResultBackend: mongodb://127.0.0.1:27017/taskresults
INFO: 2017/12/05 17:30:51 worker.go:36 - AMQP: machinery_exchange
INFO: 2017/12/05 17:30:51 worker.go:37 - Exchange: machinery_exchange
INFO: 2017/12/05 17:30:51 worker.go:38 - ExchangeType: direct
INFO: 2017/12/05 17:30:51 worker.go:39 - BindingKey: machinery_task
INFO: 2017/12/05 17:30:51 worker.go:40 - PrefetchCount: 3
INFO: 2017/12/05 17:30:52 amqp.go:72 [*] Waiting for messages. To exit press CTRL+C
你可以确认以这种方式启动了。
发送执行
接下来,我们将发送任务。
另外打开一个控制台,并执行以下命令。
$ go run machinery.go -c config.yml send
INFO: 2017/12/05 17:40:07 file.go:30 Successfully loaded config from file for the first time
INFO: 2017/12/05 17:40:07 machinery.go:183 Single task:
INFO: 2017/12/05 17:40:07 mongodb.go:315 state index already exist, skipping create step
INFO: 2017/12/05 17:40:08 mongodb.go:315 lock index already exist, skipping create step
INFO: 2017/12/05 17:40:08 machinery.go:194 1 + 1 = 2
INFO: 2017/12/05 17:40:08 machinery.go:202 Group of tasks (parallel execution):
INFO: 2017/12/05 17:40:08 machinery.go:215 1 + 1 = 2
INFO: 2017/12/05 17:40:08 machinery.go:215 2 + 2 = 4
INFO: 2017/12/05 17:40:08 machinery.go:215 5 + 6 = 11
INFO: 2017/12/05 17:40:08 machinery.go:225 Group of tasks with a callback (chord):
INFO: 2017/12/05 17:40:08 machinery.go:238 (1 + 1) * (2 + 2) * (5 + 6) = 88
INFO: 2017/12/05 17:40:08 machinery.go:242 Chain of tasks:
INFO: 2017/12/05 17:40:08 machinery.go:254 (((1 + 1) + (2 + 2)) + (5 + 6)) * 4 = 68
INFO: 2017/12/05 17:40:08 machinery.go:267 Task panicked and returned error = oops
这里显示了一些似乎在移动的日志。
确认结果
我要查看MongoDB中是否有执行结果。
> db.tasks.find()
{ "_id" : "task_6fc5c094-15e3-40f2-9041-66e515fd5a8b", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(2) } ] }
{ "_id" : "task_f6f2e9ca-160c-4eae-80b7-5a1b1ee49661", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(2) } ] }
{ "_id" : "task_5a039267-67f8-456d-ba70-671fd54283c9", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(4) } ] }
{ "_id" : "task_73f6979d-40a2-4502-b653-24848214b64d", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(11) } ] }
{ "_id" : "task_b16b18b7-7664-4243-8066-7c46af1a0cb8", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(2) } ] }
{ "_id" : "task_392b62b0-d6d4-4d57-8343-42b9f59b0e90", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(4) } ] }
{ "_id" : "task_568e0abf-ac15-4aa3-a324-0e7ffae9a347", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(11) } ] }
{ "_id" : "chord_c5908175-f86e-4201-8980-f425a9bd4229", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(88) } ] }
{ "_id" : "task_295b91c9-186a-4be9-89bd-9eed6186bb6b", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(2) } ] }
{ "_id" : "task_d82fa155-3f73-46c6-bd27-dab6b3c870cd", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(6) } ] }
{ "_id" : "task_26718081-43d4-43f2-ab42-2f3dae4f2174", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(17) } ] }
{ "_id" : "task_15c0b946-8027-41c8-9cc0-b76f00f46f0d", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(68) } ] }
{ "_id" : "task_e2e53f8a-ec85-463c-bb63-717cd14d84ec", "state" : "FAILURE", "error" : "oops" }
确实,处理结果已经被注册了呢!
(顺便说一下,最后一个任务明确地会出现错误。)
处理内容
我本打算简单看一下示例代码。
虽然我原计划这样做,但是由于时间不够,希望能够整理到另一篇文章中。
总结
我在这篇文章中介绍了一个名为machinery的开源软件,它是一个异步队列/执行平台。如果您有兴趣,请一定尝试使用。