如何使用Node.js和BullMQ处理异步任务

引言

网络应用程序具有请求/响应循环。当您访问一个URL时,浏览器会向运行应用程序的服务器发送一个请求,该服务器会处理数据或在数据库中运行查询。在此过程中,用户会一直等待,直到应用程序返回响应。对于某些任务,用户可以很快地获得响应;对于耗时的任务,比如处理图像、分析数据、生成报告或发送电子邮件,这些任务需要很长时间才能完成,并且会减慢请求/响应循环的速度。例如,假设您有一个允许用户上传图像的应用程序。在这种情况下,您可能需要调整大小、压缩或将图像转换为其他格式,以节省服务器的磁盘空间,然后再将图像显示给用户。处理图像是一个CPU密集型的任务,这可能会阻塞Node.js线程,直到任务完成。这可能需要几秒钟或几分钟的时间。用户必须等待任务完成后,才能从服务器获得响应。

为了避免减慢请求/响应循环,您可以使用bullmq,一个分布式任务(作业)队列,允许您将耗时任务从Node.js应用程序转移到bullmq,从而释放请求/响应循环。这个工具使得您的应用程序能够快速向用户发送响应,同时bullmq在后台异步执行任务,与您的应用程序独立运行。为了跟踪作业,bullmq使用Redis在队列中存储每个作业的简要描述。然后,bullmq工作程序从队列中取消排队并执行每个作业,并在完成后标记为完成。

在这篇文章中,你将使用 bullmq 将一个耗时任务转移到后台,从而使应用能够快速响应用户。首先,你将创建一个没有使用 bullmq 的耗时任务的应用。然后,你将使用 bullmq 异步执行该任务。最后,你将安装一个可视化仪表板来管理 Redis 队列中的 bullmq 作业。

前提条件

要按照这个教程进行操作,你需要准备以下材料:

  • Node.js development environment set up. For Ubuntu 22.04, follow our tutorial on How To Install Node.js on Ubuntu 22.04. For other systems, see How to Install Node.js and Create a Local Development Environment.
  • Redis installed on your system. On Ubuntu 22, follow Steps 1 through 3 in our tutorial on How To Install and Secure Redis on Ubuntu 22.04. For other systems, see our tutorial on How To Install and Secure Redis.
  • Familiarity with promises and async/await functions, which you can develop in our tutorial Understanding the Event Loop, Callbacks, Promises, and Async/Await in JavaScript.
  • Basic knowledge of how to use Express. See our tutorial on How To Get Started with Node.js and Express.
  • Familiarity with Embedded JavaScript (EJS). Check out our tutorial on How To Use EJS to Template Your Node Application for more details.
  • Basic understanding of how to process images with sharp, which you can learn in our tutorial on How To Process Images in Node.js with Sharp.

第一步 – 设置项目目录

在这个步骤中,您将创建一个目录并安装应用程序所需的依赖项。本教程中您要构建的应用程序将允许用户上传图像,然后使用sharp包进行处理。图像处理需要大量时间,并且可能会减慢请求/响应周期,因此此任务非常适合使用bullmq在后台进行处理。您将使用的这种任务分配技术也适用于其他耗时的任务。

首先,创建一个名为image_processor的目录,并进入该目录。

  1. mkdir image_processor && cd image_processor

然后,将该目录初始化为一个npm包。

  1. npm init -y

这个命令会创建一个 package.json 文件。-y 选项告诉 npm 接受所有默认设置。

运行该命令后,您的输出将与以下内容匹配:

Output
Wrote to /home/sammy/image_processor/package.json: { "name": "image_processor", "version": "1.0.0", "description": "", "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "keywords": [], "author": "", "license": "ISC" }

输出确认package.json文件已经创建。重要属性包括您的应用程序名称(name),应用程序版本号(version)和项目的起始点(main)。如果您想了解更多关于其他属性的信息,您可以查阅npm的package.json文档。

您在本教程中将构建的应用程序需要以下依赖项:

  • express: a web framework for building web apps.
  • express-fileupload: a middleware that allows your forms to upload files.
  • sharp: an image processing library.
  • ejs: a template language that allows you to generate HTML markup with Node.js.
  • bullmq: a distributed task queue.
  • bull-board: a dashboard that builds upon bullmq and displays the status of the jobs with a nice User Interface(UI).

为了安装所有这些依赖项,请运行以下命令:

  1. npm install express express-fileupload sharp ejs bullmq @bull-board/express

除了您安装的依赖项之外,在本教程的后面,您还将使用以下图像。

An image underwater with a ray of light coming into it

使用curl命令将图像下载到本地计算机上您选择的位置。

  1. curl -O https://deved-images.nyc3.cdn.digitaloceanspaces.com/CART-68886/underwater.png

你已经具备了构建一个不包含bullmq的Node.js应用程序所需的全部依赖,接下来你将完成这个任务。

第二步 – 在不使用bullmq的情况下实施耗时任务。

在这一步中,您将使用Express构建一个应用程序,允许用户上传图片。应用程序将使用sharp启动一个耗时任务,将图像调整为多种尺寸,在发送响应后向用户显示这些尺寸。这一步将帮助您了解耗时的任务如何影响请求/响应循环。

请使用Nano或您偏好的文本编辑器创建index.js文件。

  1. nano index.js

在你的index.js文件中,加入以下代码以引入依赖项。

图像处理器/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

在第一行中,您使用Node导入路径模块以计算文件路径。在第二行中,您导入fs模块以与目录进行交互。然后,您导入express web框架。您导入body-parser模块以添加中间件以解析HTTP请求中的数据。接下来,您导入sharp模块进行图像处理。最后,您导入express-fileupload以处理来自HTML表单的上传。

接下来,将以下代码添加到您的应用程序中以实现中间件功能。

图像处理器/index.js
...
const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

首先,你将app变量设置为Express的一个实例。其次,使用app变量,使用set()方法将Express配置为使用ejs模板语言。然后,你使用use()方法添加body-parser模块中间件,将HTTP请求中的JSON数据转换为可以用JavaScript访问的变量。在下一行中,你还使用相同的方式处理URL编码的输入数据。

接下来,添加以下代码行来增加中间件以处理文件上传和提供静态文件服务。

图像处理器/索引.js
...
app.use(fileUpload());
app.use(express.static("public"));

你可以通过调用fileUpload()方法来添加中间件以解析上传的文件,并设置一个目录,Express将在该目录中查找和提供静态文件(例如图片和CSS)。

使用中间件设置,创建一个路由来显示一个HTML表单,用于上传图片。

图像处理器/index.js
...
app.get("/", function (req, res) {
  res.render("form");
});

在这里,您使用Express模块的get()方法来指定/路由和当用户访问主页或/路由时应运行的回调函数。在回调函数中,您调用res.render()来渲染位于views目录中的form.ejs文件。您尚未创建form.ejs文件或views目录。

要创建它,首先保存并关闭您的文件。在终端中,输入以下命令来创建您的项目根目录中的视图目录。

  1. mkdir views

移动到视图目录中。

  1. cd views

在您的编辑器中创建form.ejs文件。

  1. nano form.ejs

在你的form.ejs文件中,添加以下代码来创建表单:

图片处理器的视图表单.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="home-wrapper">
      <h1>Image Processor</h1>
      <p>
        Resizes an image to multiple sizes and converts it to a
        <a href="https://en.wikipedia.org/wiki/WebP">webp</a> format.
      </p>
      <form action="/upload" method="POST" enctype="multipart/form-data">
        <input
          type="file"
          name="image"
          placeholder="Select image from your computer"
        />
        <button type="submit">Upload Image</button>
      </form>
    </div>
  </body>
</html>

首先,您需要引用head.ejs文件,而您尚未创建该文件。head.ejs文件将包含HTML的头部元素,您可以在其他HTML页面中引用它。

在body标签中,您使用以下属性创建一个表单:

  • action specifies the route where the form data should be sent when the form is submitted.
  • method specifies the HTTP method for sending data. The POST method embeds the data in an HTTP request.
  • encytype specifies how the form data should be encoded. The value multipart/form-data enables the HTML input elements to upload file data.

在表单元素中,您创建一个输入标签以上传文件。然后,您使用type属性设置为submit来定义按钮元素,该属性允许您提交表单。

完成后,保存并关闭你的文件。

接下来,创建一个 head.ejs 文件。

  1. nano head.ejs

在你的 head.ejs 文件中,添加以下代码来创建应用程序的 head 部分。

图像处理器/视图/头部.ejs
<head>
  <meta charset="UTF-8" />
  <meta http-equiv="X-UA-Compatible" content="IE=edge" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Image Processor</title>
  <link rel="stylesheet" href="css/main.css" />
</head>

在这里,你引用了 main.css 文件,该文件将在接下来的步骤中在 public 目录中创建。那个文件将包含该应用程序的样式。现在,你将继续设置静态资源的流程。

保存并关闭文件。

为了处理来自表单提交的数据,你必须在 Express 中定义一个 post 方法。为了做到这一点,返回到你的项目的根目录。

  1. cd ..

再次打开你的 index.js 文件。

  1. nano index.js

在你的index.js文件中,添加下面的代码行来定义一个处理表单提交的方法,该方法在路由/upload上使用:

图像处理器/index.js
app.get("/", function (req, res) {
  ...
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

});

您使用app变量来调用post()方法,该方法将处理在/upload路由上提交的表单。接下来,您从HTTP请求中提取上传的图像数据到image变量中。然后,如果用户未上传图像,您设置一个响应来返回400状态码。

为上传的图像设置过程,请添加以下突出显示的代码。

图片处理器/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
});

这些行代表你的应用程序如何处理图像。首先,从上传的图像中删除图像扩展名,并将名称保存在imageName变量中。接下来,定义processImage()函数。此函数接受size参数,其值将用于确定调整大小期间的图像尺寸。在函数中,您使用包含上传图像的二进制数据的缓冲区image.data来调用sharp()。sharp根据size参数中的值对图像进行调整大小。使用sharp的webp()方法将图像转换为webp图像格式。然后,将图像保存在public/images/目录中。

下面的数字列表定义了用来调整上传图像大小的尺寸。然后使用JavaScript的map()方法来针对sizes数组中的每个元素调用processImage()函数,并返回一个新数组。每次map()方法调用processImage()函数时,都会返回一个新数组的Promise。你使用Promise.all()方法来解析它们。

计算机处理速度各不相同,用户可以上传的图像大小也不同,这可能会影响图像处理速度。为了进行演示目的而延迟此代码,插入以下突出显示的行来添加一个占用CPU资源的增量循环,并重定向到一个页面,该页面将显示调整大小后的图像。

图像处理器/index.js
...
app.post("/upload", async function (req, res) {
  ...
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

循环将运行一百亿次以递增计数器变量。你调用res.redirect()函数将应用程序重定向到/result路由。该路由将渲染一个HTML页面,显示public/images目录中的图像。

“目前尚不存在/result路由。要创建该路由,请在你的index.js文件中添加高亮代码。”

图片处理器/index.js
...

app.get("/", function (req, res) {
 ...
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  ...
});

你可以使用app.get()方法来定义/result路由。在这个函数中,你可以使用完整路径来定义imgDirPath变量,指向public/images目录。你可以使用fs模块的readdirSync()方法来读取给定目录中的所有文件。之后,你可以通过链式调用map()方法将图片路径加上前缀images/,并返回一个新的数组。

最后,你调用res.render()方法来渲染result.ejs文件,该文件目前还不存在。你将包含所有图片相对路径的imgFiles变量传递给result.ejs文件。

保存并关闭你的文件。

要创建result.ejs文件,请返回到views目录。

  1. cd views

在您的编辑器中创建并打开result.ejs文件。

  1. nano result.ejs

在你的result.ejs文件中,添加以下代码来显示图片:

图像处理器/视图/结果.ejs
<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="gallery-wrapper">
      <% if (imgFiles.length > 0){%>
      <p>The following are the processed images:</p>
      <ul>
        <% for (let imgFile of imgFiles){ %>
        <li><img src=<%= imgFile %> /></li>
        <% } %>
      </ul>
      <% } else{ %>
      <p>
        The image is being processed. Refresh after a few seconds to view the
        resized images.
      </p>
      <% } %>
    </div>
  </body>
</html>

首先,你引用了head.ejs文件。在body标签中,你检查imgFiles变量是否为空。如果它有数据,你会遍历每个文件,并为每个数组元素创建一张图片。如果imgFiles为空,你会打印一条消息告诉用户等待几秒钟后刷新页面来查看调整大小后的图片。

保存并关闭你的文件。

接下来,返回根目录并创建包含静态资产的公共目录。

  1. cd .. && mkdir public

进入公共目录。

  1. cd public

创建一个图像目录,用于存储上传的图片。

  1. mkdir images

接下来,创建css目录并转到该目录下。

  1. mkdir css && cd css

在你的编辑器中,创建并打开main.css文件,这个文件是你之前在head.ejs文件中引用的。

  1. nano main.css

在你的main.css文件中,添加以下样式:

图像处理程序/公共/样式/主要.css
body {
  background: #f8f8f8;
}

h1 {
  text-align: center;
}

p {
  margin-bottom: 20px;
}

a:link,
a:visited {
  color: #00bcd4;
}

/** Styles for the "Choose File"  button **/
button[type="submit"] {
  background: none;
  border: 1px solid orange;
  padding: 10px 30px;
  border-radius: 30px;
  transition: all 1s;
}

button[type="submit"]:hover {
  background: orange;
}

/** Styles for the "Upload Image"  button **/
input[type="file"]::file-selector-button {
  border: 2px solid #2196f3;
  padding: 10px 20px;
  border-radius: 0.2em;
  background-color: #2196f3;
}

ul {
  list-style: none;
  padding: 0;
  display: flex;
  flex-wrap: wrap;
  gap: 20px;
}

.home-wrapper {
  max-width: 500px;
  margin: 0 auto;
  padding-top: 100px;
}

.gallery-wrapper {
  max-width: 1200px;
  margin: 0 auto;
}

这些代码将为应用程序中的元素添加样式。使用HTML属性,您可以为“选择文件”按钮的背景设置十六进制码#2196f3(一种蓝色色调),并为“上传图像”按钮的边框设置为橙色。您还可以为/result页面中的元素添加样式,使它们更具可视性。

完成后,请保存并关闭文件。

返回到项目的根目录。

  1. cd ../..

在你的编辑器中打开index.js文件。

  1. nano index.js

在你的index.js文件中添加以下代码,它将启动服务器。

图片处理器/index.js
...
app.listen(3000, function () {
  console.log("Server running on port 3000");
});

现在完整的 index.js 文件将匹配以下内容:

图像处理器/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

app.use(fileUpload());

app.use(express.static("public"));

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

完成修改后,保存并关闭文件。

使用node命令运行应用程序。

  1. node index.js

你会收到这样的输出。

Output
Server running on port 3000

这个输出证实服务器没有任何问题。

打开您偏爱的浏览器,并访问http://localhost:3000/。

Note

注意:如果你正使用远程服务器进行教程,你可以通过端口转发在本地浏览器中访问应用程序。
当Node.js服务器正在运行时,打开另一个终端并输入以下命令:
ssh -L 3000:localhost:3000 你的非root用户@你的服务器IP

一旦你连接到服务器,运行node index.js然后在本地机器的网页浏览器中导航到http://localhost:3000/。

当页面加载时,它将与以下内容进行匹配:

Screencapture of the application homepage with a title reading

接下来,点击选择文件按钮,然后在您的本地设备上选择underwater.png图像。显示将从未选择文件变为underwater.png。之后,点击上传图片按钮。应用程序将会加载一段时间,它会处理图像并运行增加循环。

一旦任务完成,/result路由将加载包含调整大小的图片。

Screencapture of the page with multiple resized images

你可以使用 CTRL+C 停止服务器。Node.js 在文件更改时不会自动重新加载服务器,所以当你更新文件时,你需要停止并重新启动服务器。

您现在了解到一个耗时任务如何影响应用程序的请求/响应周期。您将以异步方式执行该任务。

第三步 – 使用bullmq异步执行耗时任务

在这一步中,你将使用bullmq将一个耗时的任务转移到后台。这个调整将释放请求/响应周期,使你的应用在图像处理过程中可以立即响应用户。

要做到这一点,你需要用bullmq创建工作的简洁描述,并将其添加到队列中。队列是一种类似于现实生活中排队方式的数据结构。当人们排队进入一个空间时,排在最前面的人将是第一个进入空间的人。之后来的人将排在队伍的末尾,并在所有在他们前面的人进入空间后进入。使用队列数据结构的先入先出(FIFO)过程,最早添加到队列中的项目将是最早被移除(出队)的项目。使用bullmq,生产者将一个工作添加到队列中,消费者(或工作者)将从队列中取出一个工作并执行它。

在bullmq中,队列是在Redis中的。当您描述一个作业并将其添加到队列中时,在Redis队列中创建了一个作业条目。作业描述可以是一个字符串,也可以是一个带有包含最小数据或引用数据的属性的对象,这些数据将允许bullmq稍后执行该作业。一旦您定义了将作业添加到队列中的功能,您将耗时的代码转移到一个单独的函数中。稍后,当作业从队列中出列时,bullmq将使用您存储在队列中的数据调用此函数。一旦任务完成,bullmq将标记其为已完成,从队列中取出另一个作业并执行它。

在你的编辑器中打开 index.js 文件。

  1. nano index.js

在你的index.js文件中,添加以下突出显示的代码行,使用Bullmq来创建一个Redis队列。

图像处理器/index.js
...
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}
...

你首先从bullmq中提取Queue类,该类用于在Redis中创建队列。然后,你将redisOptions变量设置为一个包含属性的对象,Queue类实例将使用这些属性来与Redis建立连接。你将host属性值设置为localhost,因为Redis正在你的本地机器上运行。

Note

注意:如果Redis运行在与你的应用程序分开的远程服务器上,你需要将host属性的值更新为远程服务器的IP地址。你还需要将port属性的值设置为6379,这是Redis用于监听连接的默认端口。
如果你已经设置了端口转发到运行Redis和应用程序的远程服务器上,你不需要更新host属性,但是每次登录服务器运行应用程序时,你需要使用端口转发连接。

接下来,您将imageJobQueue变量设置为Queue类的一个实例,其第一个参数为队列的名称,第二个参数为一个对象。该对象具有一个连接属性,其值设置为redisOptions变量中的一个对象。在实例化Queue类之后,将在Redis中创建一个名为imageJobQueue的队列。

最后,您定义了addJob()函数,用于在imageJobQueue中添加一个工作。该函数接受一个包含工作信息的job参数(您将使用addJob()函数来保存在队列中的数据)。在函数中,您调用imageJobQueue的add()方法,将工作名称作为第一个参数,工作数据作为第二个参数。

将以下代码添加到调用addJob()函数以在队列中添加一个作业的代码中:

图片处理器/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  ...
  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

在这里,你使用描述工作的对象调用addJob()函数。对象具有type属性,其值为工作的名称。第二个属性image设置为一个包含用户上传的图像数据的对象。由于image.data中的图像数据是一个缓冲区(二进制形式),所以你调用JavaScript的toString()方法将其转换为可以存储在Redis中的字符串,并将其作为结果设置为data属性。image属性设置为已上传图像的名称(包括图像扩展名)。

你现在已经定义了 bullmq 执行后续工作所需的信息。根据你的工作需求,你可以添加更多或更少的工作信息。

Warning

警告:由于Redis是一个内存数据库,避免将大量的作业数据存储在队列中。如果有一个需要处理的大文件,将文件保存在磁盘或云上,然后将文件的链接作为字符串保存在队列中。当bullmq执行作业时,它将从Redis中保存的链接中获取文件。

保存并关闭您的文件。

接下来,创建并打开一个名为utils.js的文件,其中包含图像处理代码。

  1. nano utils.js

在你的 utils.js 文件中,添加以下代码来定义处理图像的函数:

图像处理工具/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
}

module.exports = { processUploadedImages };

在前两行中,您导入了处理图像和计算路径所需的模块。然后,您定义了processUploadedImages()函数,该函数将包含耗时的图像处理任务。该函数接受一个作业参数,当工作程序从队列获取作业数据并使用队列数据调用processUploadedImages()函数时,该参数将被填充。您还导出processUploadedImages()函数,以便在其他文件中可以引用它。

保存并关闭文件。

返回到index.js文件:

  1. nano index.js

复制index.js文件中的高亮行,然后从该文件中删除它们。你一会儿会需要复制的代码,所以把它保存到剪贴板上。如果你正在使用nano,请用鼠标右键点击并复制这些行。

图像处理器/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage))
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
...
  res.redirect("/result");
});

上传路由的post方法现在将与以下匹配:

图像处理器/index.js
...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

保存并关闭此文件,然后打开utils.js文件。

  1. nano utils.js

在您的utils.js文件中,将您刚刚复制的/upload路由回调中的行粘贴到processUploadedImages函数中。

图像处理/工具.js
...
function processUploadedImages(job) {
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}
...

既然你已经将处理图像的代码移动了,你需要更新它,以便使用你之前定义的processUploadedImages()函数的工作参数中的图像数据。

要做到这一点,您需要在下面添加和更新突出显示的行。

图像处理器/utils.js

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);
  ...
}

你使用Buffer.from()方法将图像数据的字符串版本转换回二进制。然后,你将path.parse()更新为对保存在队列中的图像名称的引用。接下来,你更新sharp()方法,将图像二进制数据存储在imageFileData变量中。

现在完整的utils.js文件将匹配以下内容:

图像处理程序/utils.js
const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}

module.exports = { processUploadedImages };

保存并关闭文件,然后返回index.js。

  1. nano index.js

由于图像现在在utils.js文件中处理,因此不再需要对应的锐化变量。请从文件中删除高亮显示的行。

图像处理器/index.js
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
...

保存并关闭您的文件。

你现在已经定义了在Redis中创建一个队列和添加一个作业的功能。你还定义了processUploadedImages()函数来处理上传的图片。

剩下的任务是创建一个消费者(或工作者),从队列中取出作业,并使用作业数据调用 processUploadedImages() 函数。

在你的编辑器中创建一个 worker.js 文件。

  1. nano worker.js

请在您的worker.js文件中添加以下代码。

图像处理/工人.js
const { Worker } = require("bullmq");

const { processUploadedImages } = require("./utils");

const workerHandler = (job) => {
  console.log("Starting job:", job.name);
  processUploadedImages(job.data);
  console.log("Finished job:", job.name);
  return;
};

在第一行中,你从bullmq中导入了Worker类;一旦实例化,它将启动一个从Redis队列中出列作业并执行的工作进程。接下来,你从utils.js文件中引用processUploadedImages()函数,以便工作进程可以使用队列中的数据调用该函数。

你定义了一个 workerHandler() 函数,该函数接受一个包含队列中作业数据的 job 参数。在函数中,你记录作业开始的日志,然后使用作业数据调用 processUploadedImages()。之后,你记录一个成功的消息并返回 null。

为了让工作人员能够连接到Redis,从队列中取出一个作业,并使用作业数据调用workerHandler()函数,向文件中添加以下几行内容。

图像处理器/工作人员.js
...
const workerOptions = {
  connection: {
    host: "localhost",
    port: 6379,
  },
};

const worker = new Worker("imageJobQueue", workerHandler, workerOptions);

console.log("Worker started!");

在这里,你将workerOptions变量设置为一个包含Redis连接设置的对象。你将worker变量设置为Worker类的一个实例,该实例接受以下参数:

  • imageJobQueue: the name of the job queue.
  • workerHandler: the function that will run after a job has been dequeued from the Redis queue.
  • workerOptions: the Redis config settings that the worker uses to establish a connection with Redis.

最后,您记录了一个成功的消息。

添加完这些行后,保存并关闭你的文件。

你现在已经定义了 bullmq worker 的功能,从队列中取消队列并执行它们。

在您的终端中,删除public/images目录中的图像,以便您可以为测试您的应用程序重新开始。

  1. rm public/images/*

接下来,运行index.js文件。

  1. node index.js

该应用将开始启动。

Output
Server running on port 3000

你现在要启动工人。打开一个第二个终端会话并直接进入项目。

  1. cd image_processor/

用以下命令启动工作人员:

  1. node worker.js

工人将开始工作

Output
Worker started!

在您的浏览器中访问http://localhost:3000/。单击“选择文件”按钮,从您的计算机选择underwater.png,然后点击“上传图片”按钮。

在几秒钟后,您可能会收到一个立即回复,告诉您刷新页面。

Screencapture of a page with a message that reads

另外,您可能会在页面上立即收到一些已经处理完毕的图像的即时响应,而另一些图像仍在进行处理。

Screencapture of a page with some of the images and a message that reads

你可以多刷新几次页面,以加载所有调整大小的图片。

回到你的工作人员正在运行的终端。该终端上会有一条与以下内容匹配的消息。

Output
Worker started! Starting job: processUploadedImages Finished job: processUploadedImages

输出结果证实 bullmq 成功运行了任务。

即使工作人员没有运行,您的应用程序仍然可以完成耗时任务的卸载。为了证明这一点,在第二个终端使用CTRL+C停止工作人员。

在您的初始终端会话中,停止Express服务器并删除public/images中的图像。

  1. rm public/images/*

在此之后,重新启动服务器。

  1. node index.js

在您的浏览器中,访问http://localhost:3000/并再次上传underwater.png图像。当您被重定向到/result路径时,页面上的图像将不会显示,因为工作程序没有运行。

Screencapture of the results page with a message that reads

返回到您运行worker的终端,然后重新启动worker。

  1. node worker.js

输出结果将与以下内容相匹配,以便让您知道作业已经开始。

Output
Worker started! Starting job: processUploadedImages

当工作完成并且输出中包含一行显示”完成的工作:processUploadedImages”时,请刷新浏览器。现在图片将会被加载。

Screencapture of a page with the images and a message that reads

停止服务器和工人。

您现在可以使用bullmq将耗时的任务转移到后台,并以异步方式执行。接下来,您将设置一个仪表板来监控队列的状态。

第四步——添加一个仪表盘来监控 bullmq 队列

在这一步中,您将使用bull-board软件包来通过可视化面板监控Redis队列中的作业。此软件包将自动创建一个用户界面(UI)仪表板,显示和组织关于存储在Redis队列中的bullmq作业的信息。通过浏览器,您可以监视已完成、正在等待或失败的作业,而无需打开终端中的Redis CLI。

在你的文本编辑器中打开index.js文件。

  1. nano index.js

将下面的代码添加到导入 bull-board 文件中:

图像处理器/index.js
...
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");
...

在上面的代码中,你从bull-board导入了createBullBoard()方法。你还导入了BullMQAdapter,使得bull-board可以访问bullmq队列,以及ExpressAdapter,用于提供Express显示仪表板的功能。

接下来,将高亮代码添加到鲍尔委办局与鲍尔信号队列之间的连接上。

图像处理器/index.js
...
async function addJob(job) {
  ...
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
...

首先,您将serverAdapter设置为ExpressAdapter的一个实例。接下来,您调用createBullBoard()来使用bullmq队列数据初始化仪表板。您将一个包含queues和serverAdapter属性的对象参数传递给该函数。第一个属性queues接受您使用bullmq定义的队列的数组,这里是imageJobQueue。第二个属性serverAdapter包含一个接受Express服务器适配器的实例的对象。之后,您可以使用setBasePath()方法将/admin路径设置为访问仪表板的路径。

接下来,在/admin路由中添加serverAdapter中间件。

图像处理器/index.js
app.use(express.static("public"))

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  ...
});

完整的index.js文件将与以下内容匹配:

图像处理器/index.js
const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);
app.use(fileUpload());

app.use(express.static("public"));

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: Buffer.from(image.data).toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

在你完成修改之后,保存并关闭你的文件。

运行index.js文件。

  1. node index.js

返回你的浏览器,访问http://localhost:3000/admin。仪表盘将会加载:

Screencapture of the Bull Dashboard with a queue labeled

在仪表板中,您可以查看工作类型、所消耗的数据以及有关工作的更多信息。您还可以切换到其他选项卡,例如已完成选项卡以获取有关已完成工作的信息,失败选项卡以获取有关失败工作的更多信息,以及暂停选项卡以获取有关已暂停工作的更多信息。

现在你可以使用牛板监控仪表盘来检查队列。

总结

在这篇文章中,你使用bullmq将一个费时的任务转移到作业队列中。首先,你创建了一个具有缓慢请求/响应周期的费时任务的应用程序,但没有使用bullmq。然后,你使用bullmq将这个费时任务转移到异步执行,从而提升了请求/响应周期。之后,你使用bull-board创建了一个仪表盘,用于监控Redis中的bullmq队列。

您可以访问bullmq文档,了解本教程中未涉及的bullmq功能,如调度、优先级、重试作业以及配置工作人员的并发设置。您也可以访问bull-board文档,了解有关仪表盘功能的更多信息。

发表回复 0

Your email address will not be published. Required fields are marked *