使用Bull管理的NodeJS分散作业
bull 一词
这是一个用于处理分布式作业和消息的队列包,可以在NodeJS上运行,基于redis。这是kue的继任者库。
确认环境
-
- node: v10.17.0
redis ( for windows ): 3.0.504
使用express-generator创建项目。
在这里,我们将在一个名为play_node_bull的项目中创建。
npx express-generator play_node_bull
cd play_code_bull
npm install
安装Bull
npm install bull
修改app.js
将以下行添加至app.js文件中。
var createError = require('http-errors');
var express = require('express');
var path = require('path');
var cookieParser = require('cookie-parser');
var logger = require('morgan');
var indexRouter = require('./routes/index');
var usersRouter = require('./routes/users');
+ var jobsRouter = require('./routes/jobs');
var app = express();
// view engine setup
app.set('views', path.join(__dirname, 'views'));
app.set('view engine', 'jade');
app.use(logger('dev'));
app.use(express.json());
app.use(express.urlencoded({ extended: false }));
app.use(cookieParser());
app.use(express.static(path.join(__dirname, 'public')));
app.use('/', indexRouter);
app.use('/users', usersRouter);
+ app.use('/jobs', jobsRouter);
// catch 404 and forward to error handler
app.use(function(req, res, next) {
next(createError(404));
});
// error handler
app.use(function(err, req, res, next) {
// set locals, only providing error in development
res.locals.message = err.message;
res.locals.error = req.app.get('env') === 'development' ? err : {};
// render the error page
res.status(err.status || 500);
res.render('error');
});
module.exports = app;
增加jobs.js
第一块区域
在routes文件夹中添加jobs.js。
准备路由器和Bull。
const express = require('express');
const router = express.Router();
const Bull = require("bull")
第二个街区
我会准备一个重负处理的桩件。
// 重い処理のスタブ
const heavyJob = async () => {
return new Promise((resolve)=>{
setInterval(() => {
resolve();
}, 10000);
})
}
第三个街区
定义bull对象和处理名。
处理名是将发送和接收队列绑定起来的关键。
const queue = new Bull('bulltest', { redis: { port: 6379, host: '127.0.0.1' } });
const processor = 'processor_name'; // 処理名
第四个区块
在代码中定义要入队的终点。
要将数据对象加入队列,则使用queue.add()函数并指定处理名称。
router.post('/', (req, res, next) => {
const data = { greetingTime: (new Date()).toString() } // jobのデータ(object)
queue.add(processor, data);
res.send('ok');
});
module.exports = router;
第五个街区
在这里编写处理程序。
您可以使用queue.process()来指定处理名称、并发处理数和处理程序函数。
const concurrency = 2; // 並行処理数
queue.process(processor, concurrency, async (job, done) => {
try {
await heavyJob();
done();
} catch (e) {
done(e);
}
});
执行
执行应用程序。默认情况下,将在3000号端口上等待。
npm start
调用API
用curl命令调用创建的jobsAPI。
curl -X POST 'http://127.0.0.1:3000/jobs'
工作的选项 de
interface JobOpts {
priority: number, // オプションの優先度。1(最高の優先度)~MAX_INT(最低の優先度)を指定する。パフォーマンスにわずかな影響を与えるため、必要でない限り使用しない。
delay: number, // このジョブを処理できるようになるまで待機するミリ秒数。正確に遅延するためには、サーバーとクライアントの時刻を同期する必要がある。
attempts: number, // ジョブが完了するまでのリトライ回数
repeat: RepeatOpts, // cron仕様に従ってジョブを繰り返す。
backoff: number | BackoffOpts, // ジョブが失敗した場合の自動再試行設定。遅延時間を設定するか、{type: 'fixed' or 'exponential')を指定。
lifo: boolean, // 後入れ先出しにする (default false)
timeout: number, // タイムアウトエラーでジョブが失敗するまでのミリ秒数
jobId: number | string, // ジョブIDを上書きする。既に存在するIDを持つジョブを追加しようとしても、追加されない。
removeOnComplete: boolean | number, // trueの場合、正常に完了したときにジョブを削除します。falseの場合は`completed`セットに保持される。
removeOnFail: boolean | number, // trueの場合、処理に失敗したときにジョブを削除します。falseの場合は`faild`セットに保持される。
stackTraceLimit: number, // スタックトレースに記録されるスタックトレース行の量を制限。
}