我所设计的最强实时通信基础设施(实践篇)-以共同举办直播为例-
2020年Happy Elements圣诞日历第25天的内容。
簡述
在第24天的文章中,我写了使用Redis Streams实现实时通信基础设施的经过。这次我打算以实际操作的方式来介绍如何将Redis Streams用作Pub / Sub。我希望这篇文章能够让许多企业能够基于Redis Streams构建实时通信基础设施。
环境
苹果操作系统BigSur版本11.0.1
Nodebrew版本1.0.1
Npm版本6.14.9
Node版本14.0.0
Redis服务器版本6.0.9
让WebSocket可用
首先需要进行Node的初始化和安装ws。
npm init --yes
npm install ws
我会写代码使WebSocket通信得以实现。
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', function connection(ws) {
ws.on('message', function incoming(message) {
console.log('received: %s', message);
});
ws.send('something');
});
我会启动服务器。
node server.js
我会确认WebSocket正常工作。
wscat -c ws://localhost:8080
连接到Redis
我要启动Redis。
redis-server
安装 ioredis。
npm install ioredis
我将使您能够使用ioredis。
const WebSocket = require('ws');
const Redis = require("ioredis");
const redis = new Redis();
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', function connection(ws) {
ws.on('message', function incoming(message) {
console.log('received: %s', message);
});
ws.send('something');
});
使用Redis Streams进行订阅
通过参考ioredis的问题,使用Redis Streams进行订阅。
const WebSocket = require('ws');
const Redis = require("ioredis");
const redis = new Redis();
const wss = new WebSocket.Server({ port: 8080 });
async function subscribeStream(stream, listener) {
let lastID = '$'
while (true) {
// Implement your own `try/catch` logic,
// (For example, logging the errors and continue to the next loop)
const reply = await redis.xread('BLOCK', '5000', 'COUNT', 100, 'STREAMS', stream, lastID)
if (!reply) {
continue
}
const results = reply[0][1]
const {length} = results
if (!results.length) {
continue
}
listener(results)
lastID = results[length - 1][0]
}
}
subscribeStream('mystream', console.log)
wss.on('connection', function connection(ws) {
ws.on('message', function incoming(message) {
console.log('received: %s', message);
});
ws.send('something');
});
启动服务器。
node server.js
通过连接到Redis并执行XADD命令来进行操作确认。
redis-cli
127.0.0.1:6379> XADD mystream * aaa 1234
使用Redis Streams进行发布
创建用于发布的Redis,并发布消息。
const WebSocket = require('ws');
const Redis = require('ioredis');
const subscriber = new Redis();
const publisher = new Redis();
const wss = new WebSocket.Server({ port: 8080 });
async function subscribeStream(stream, listener) {
let lastID = '$'
while (true) {
// Implement your own `try/catch` logic,
// (For example, logging the errors and continue to the next loop)
const reply = await subscriber.xread('BLOCK', '5000', 'COUNT', 100, 'STREAMS', stream, lastID);
if (!reply) {
continue;
}
const results = reply[0][1];
const { length } = results;
if (!results.length) {
continue;
}
listener(results);
lastID = results[length - 1][0];
}
}
async function publishStream(stream, message) {
await publisher.xadd(stream, '*', 'message', message);
}
subscribeStream('mystream', console.log)
wss.on('connection', function connection(ws) {
ws.on('message', async function incoming(message) {
await publishStream('mystream', message);
console.log('publish: ' + message);
});
});
通过Redis Streams接收到的消息发送给服务器内的所有人。
当您订阅并接收到消息时,将会向服务器内的所有成员发送。
const WebSocket = require('ws');
const Redis = require('ioredis');
const subscriber = new Redis();
const publisher = new Redis();
const wss = new WebSocket.Server({ port: 8080 });
async function subscribeStream(stream, listener) {
let lastID = '$'
while (true) {
// Implement your own `try/catch` logic,
// (For example, logging the errors and continue to the next loop)
const reply = await subscriber.xread('BLOCK', '5000', 'COUNT', 100, 'STREAMS', stream, lastID);
if (!reply) {
continue;
}
const results = reply[0][1];
const { length } = results;
if (!results.length) {
continue;
}
listener(results);
lastID = results[length - 1][0];
}
}
async function publishStream(stream, message) {
await publisher.xadd(stream, '*', 'message', message);
}
subscribeStream('mystream', function broadcast(results) {
results.forEach(result => {
wss.clients.forEach(function each(client) {
if (client.readyState === WebSocket.OPEN) {
client.send(result[1][1]);
}
});
});
});
wss.on('connection', function connection(ws) {
ws.on('message', async function incoming(message) {
await publishStream('mystream', message);
});
});
使用wscat等工具确认消息可以通过多个连接进行交互。
wscat -c ws://localhost:8080
Connected (press CTRL+C to quit)
> aaa
< aaa
< aaa
< bbb
> ccc
< ccc
使用Redis Streams完成了实时通信基础设施!
只需进行以下操作即可完成:按照每个房间分别设置订阅者,连接到Redis集群,准备ALB以实现WebSocket服务器的负载均衡。在这些方面,您可以尝试使用Redis Pub/Sub来进行挑战。
整理
通过实践方式介绍了使用Redis Streams构建实时通信基础设施的方法。
由于它是一种可扩展且非常简单的实时通信基础设施,我认为它非常适合那些希望轻松使用的人。
(如果有任何建议或改进的想法,我将非常高兴听取)
看了这篇文章后,如果能够同样打造一个最强的实时通信基础设施,我会很高兴。感谢您的阅读到最后!
招募成员
Happy Elements株式会社 カカリアスタジオ非常欢迎有能力一起创造【备受热爱的内容】的团队成员!如果您对我们公司感兴趣的话,请务必点击以下招聘特设网站。