使用Bull管理的NodeJS分散作业

bull 一词

这是一个用于处理分布式作业和消息的队列包,可以在NodeJS上运行,基于redis。这是kue的继任者库。

确认环境

    • node: v10.17.0

redis ( for windows ): 3.0.504

使用express-generator创建项目。

在这里,我们将在一个名为play_node_bull的项目中创建。

npx express-generator play_node_bull
cd play_code_bull
npm install

安装Bull

npm install bull

修改app.js

将以下行添加至app.js文件中。

var createError = require('http-errors');
var express = require('express');
var path = require('path');
var cookieParser = require('cookie-parser');
var logger = require('morgan');

var indexRouter = require('./routes/index');
var usersRouter = require('./routes/users');
+ var jobsRouter = require('./routes/jobs');  

var app = express();

// view engine setup
app.set('views', path.join(__dirname, 'views'));
app.set('view engine', 'jade');

app.use(logger('dev'));
app.use(express.json());
app.use(express.urlencoded({ extended: false }));
app.use(cookieParser());
app.use(express.static(path.join(__dirname, 'public')));

app.use('/', indexRouter);
app.use('/users', usersRouter);
+ app.use('/jobs', jobsRouter); 

// catch 404 and forward to error handler
app.use(function(req, res, next) {
  next(createError(404));
});

// error handler
app.use(function(err, req, res, next) {
  // set locals, only providing error in development
  res.locals.message = err.message;
  res.locals.error = req.app.get('env') === 'development' ? err : {};

  // render the error page
  res.status(err.status || 500);
  res.render('error');
});

module.exports = app;

增加jobs.js

第一块区域

在routes文件夹中添加jobs.js。
准备路由器和Bull。

const express = require('express');
const router = express.Router();
const Bull = require("bull")

第二个街区

我会准备一个重负处理的桩件。

// 重い処理のスタブ
const heavyJob = async () => {
  return new Promise((resolve)=>{
    setInterval(() => {
      resolve();
     }, 10000);
  })  
}

第三个街区

定义bull对象和处理名。
处理名是将发送和接收队列绑定起来的关键。

const queue = new Bull('bulltest', { redis: { port: 6379, host: '127.0.0.1' } });

const processor = 'processor_name'; // 処理名

第四个区块

在代码中定义要入队的终点。
要将数据对象加入队列,则使用queue.add()函数并指定处理名称。

router.post('/', (req, res, next) => {
  const data = { greetingTime: (new Date()).toString() }     // jobのデータ(object)
  queue.add(processor, data);
  res.send('ok');
});

module.exports = router;

第五个街区

在这里编写处理程序。
您可以使用queue.process()来指定处理名称、并发处理数和处理程序函数。

const concurrency = 2;  // 並行処理数

queue.process(processor, concurrency, async (job, done) => {
  try {
    await heavyJob();
    done();
  } catch (e) {
    done(e);
  }
});

执行

执行应用程序。默认情况下,将在3000号端口上等待。

npm start

调用API

用curl命令调用创建的jobsAPI。

curl -X POST 'http://127.0.0.1:3000/jobs'

工作的选项 de

interface JobOpts {
  priority: number, // オプションの優先度。1(最高の優先度)~MAX_INT(最低の優先度)を指定する。パフォーマンスにわずかな影響を与えるため、必要でない限り使用しない。
  delay: number, // このジョブを処理できるようになるまで待機するミリ秒数。正確に遅延するためには、サーバーとクライアントの時刻を同期する必要がある。
  attempts: number, // ジョブが完了するまでのリトライ回数
  repeat: RepeatOpts, // cron仕様に従ってジョブを繰り返す。
  backoff: number | BackoffOpts, // ジョブが失敗した場合の自動再試行設定。遅延時間を設定するか、{type: 'fixed' or 'exponential')を指定。
  lifo: boolean, // 後入れ先出しにする (default false)
  timeout: number, // タイムアウトエラーでジョブが失敗するまでのミリ秒数
  jobId: number | string, // ジョブIDを上書きする。既に存在するIDを持つジョブを追加しようとしても、追加されない。
  removeOnComplete: boolean | number, // trueの場合、正常に完了したときにジョブを削除します。falseの場合は`completed`セットに保持される。
  removeOnFail: boolean | number, // trueの場合、処理に失敗したときにジョブを削除します。falseの場合は`faild`セットに保持される。
  stackTraceLimit: number, // スタックトレースに記録されるスタックトレース行の量を制限。
}
bannerAds