我所设计的最强实时通信基础设施(实践篇)-以共同举办直播为例-

2020年Happy Elements圣诞日历第25天的内容。

簡述

在第24天的文章中,我写了使用Redis Streams实现实时通信基础设施的经过。这次我打算以实际操作的方式来介绍如何将Redis Streams用作Pub / Sub。我希望这篇文章能够让许多企业能够基于Redis Streams构建实时通信基础设施。

环境

苹果操作系统BigSur版本11.0.1
Nodebrew版本1.0.1
Npm版本6.14.9
Node版本14.0.0
Redis服务器版本6.0.9

让WebSocket可用

首先需要进行Node的初始化和安装ws。

npm init --yes
npm install ws

我会写代码使WebSocket通信得以实现。

const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', function connection(ws) {
  ws.on('message', function incoming(message) {
    console.log('received: %s', message);
  });

  ws.send('something');
});

我会启动服务器。

node server.js

我会确认WebSocket正常工作。

wscat -c ws://localhost:8080

连接到Redis

我要启动Redis。

redis-server

安装 ioredis。

npm install ioredis

我将使您能够使用ioredis。

const WebSocket = require('ws');
const Redis = require("ioredis");
const redis = new Redis();

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', function connection(ws) {
  ws.on('message', function incoming(message) {
    console.log('received: %s', message);
  });

  ws.send('something');
});

使用Redis Streams进行订阅

通过参考ioredis的问题,使用Redis Streams进行订阅。


const WebSocket = require('ws');
const Redis = require("ioredis");
const redis = new Redis();

const wss = new WebSocket.Server({ port: 8080 });

async function subscribeStream(stream, listener) {
  let lastID = '$'

  while (true) {
    // Implement your own `try/catch` logic,
    // (For example, logging the errors and continue to the next loop)
    const reply = await redis.xread('BLOCK', '5000', 'COUNT', 100, 'STREAMS', stream, lastID)
    if (!reply) {
      continue
    }
    const results = reply[0][1]
    const {length} = results
    if (!results.length) {
      continue
    }
    listener(results)
    lastID = results[length - 1][0]
  }
}

subscribeStream('mystream', console.log)

wss.on('connection', function connection(ws) {
  ws.on('message', function incoming(message) {
    console.log('received: %s', message);
  });

  ws.send('something');
});

启动服务器。

node server.js

通过连接到Redis并执行XADD命令来进行操作确认。

redis-cli
127.0.0.1:6379> XADD mystream * aaa 1234

使用Redis Streams进行发布

创建用于发布的Redis,并发布消息。

const WebSocket = require('ws');
const Redis = require('ioredis');
const subscriber = new Redis();
const publisher = new Redis();

const wss = new WebSocket.Server({ port: 8080 });

async function subscribeStream(stream, listener) {
  let lastID = '$'

  while (true) {
    // Implement your own `try/catch` logic,
    // (For example, logging the errors and continue to the next loop)
    const reply = await subscriber.xread('BLOCK', '5000', 'COUNT', 100, 'STREAMS', stream, lastID);
    if (!reply) {
      continue;
    }
    const results = reply[0][1];
    const { length } = results;
    if (!results.length) {
      continue;
    }
    listener(results);
    lastID = results[length - 1][0];
  }
}

async function publishStream(stream, message) {
  await publisher.xadd(stream, '*', 'message', message);
}

subscribeStream('mystream', console.log)

wss.on('connection', function connection(ws) {
  ws.on('message', async function incoming(message) {
    await publishStream('mystream', message);
    console.log('publish: ' + message);
  });
});

通过Redis Streams接收到的消息发送给服务器内的所有人。

当您订阅并接收到消息时,将会向服务器内的所有成员发送。

const WebSocket = require('ws');
const Redis = require('ioredis');
const subscriber = new Redis();
const publisher = new Redis();

const wss = new WebSocket.Server({ port: 8080 });

async function subscribeStream(stream, listener) {
  let lastID = '$'

  while (true) {
    // Implement your own `try/catch` logic,
    // (For example, logging the errors and continue to the next loop)
    const reply = await subscriber.xread('BLOCK', '5000', 'COUNT', 100, 'STREAMS', stream, lastID);
    if (!reply) {
      continue;
    }
    const results = reply[0][1];
    const { length } = results;
    if (!results.length) {
      continue;
    }
    listener(results);
    lastID = results[length - 1][0];
  }
}

async function publishStream(stream, message) {
  await publisher.xadd(stream, '*', 'message', message);
}

subscribeStream('mystream', function broadcast(results) {
  results.forEach(result => {
    wss.clients.forEach(function each(client) {
      if (client.readyState === WebSocket.OPEN) {
        client.send(result[1][1]);
      }
    });
  });
});

wss.on('connection', function connection(ws) {
  ws.on('message', async function incoming(message) {
    await publishStream('mystream', message);
  });
});

使用wscat等工具确认消息可以通过多个连接进行交互。

wscat -c ws://localhost:8080
Connected (press CTRL+C to quit)
> aaa
< aaa
< aaa
< bbb
> ccc
< ccc

使用Redis Streams完成了实时通信基础设施!

只需进行以下操作即可完成:按照每个房间分别设置订阅者,连接到Redis集群,准备ALB以实现WebSocket服务器的负载均衡。在这些方面,您可以尝试使用Redis Pub/Sub来进行挑战。

整理

通过实践方式介绍了使用Redis Streams构建实时通信基础设施的方法。
由于它是一种可扩展且非常简单的实时通信基础设施,我认为它非常适合那些希望轻松使用的人。
(如果有任何建议或改进的想法,我将非常高兴听取)

看了这篇文章后,如果能够同样打造一个最强的实时通信基础设施,我会很高兴。感谢您的阅读到最后!

招募成员

Happy Elements株式会社 カカリアスタジオ非常欢迎有能力一起创造【备受热爱的内容】的团队成员!如果您对我们公司感兴趣的话,请务必点击以下招聘特设网站。

广告
将在 10 秒后关闭
bannerAds