使用PHP进行Redis Streams操作的笔记

这是一篇记录,目的是想尝试使用Redis Streams在PHP中进行流消费。同时也尝试使用Swoole来在一个服务器上进行扩展。
由于只是粗略地阅读了相关文档并进行了尝试,所以可能存在错误的地方。

Redis Streams

Redis 流

Redis5实现了流媒体API,受到Kafka的启发而构建。

使用Redis Streams通过PHP

在phpredis 4.2版本中似乎已经提供了支持。以前由于安装方便的原因,很多人经常使用predis,但是predis并不支持。predis的开发似乎完全停止了。

Redis服务器

如果只是试一试的话,用Docker就可以了。

docker run -d --rm -p 6379:6379 redis:5.0.9

我想确保Streams API是否运作正常,所以使用了redis-cli等工具进行确认。

$ redis-cli
> XADD mystream * key value
> XREAD COUNT 1 STREAMS mystream 0-0
1) 1) "mystream"
   2) 1) 1) "1596783209134-0"
         2) 1) "key"
            2) "value"

确认会得到类似的结果。这条命令很复杂,如果手动输入的话,通常会出现几次错误,所以我认为复制粘贴更容易。
有关XADD和XREAD命令的详细信息,请查看Redis网站。

Redis Streams的消费者没有像消息系统那样,当订阅时会通知产生的消息,而是进行了Produce通知的行为。

消费者 zhě)

大致上,用這樣的程式碼是執行成功的。

<?php

$redis = new Redis();
$redis->connect('redis-server', 6379);

$consumer = new Consumer($redis);

while (true) {
    $values = $consumer->consume();
    foreach ($values[$streamName] ?? [] as $consumed) {
        foreach ($consumed as $key => $value) {
            print_r($value);
        }
    }
}

class Consumer {
    private $redis;
    public function __construct(Redis $redis) {
        $this->redis = $redis;
        $this->createGroupIfNotExists();
    }

    private function createGroupIfNotExists() {
        // XREADではなくXREADGROUPを使うため、事前にConsumerGroupが存在することを保証する
        // わざわざxInfoで確認してるけど、XGROUPにはGROUPがなかったときに作成する機能もあり
        // ライブラリもサポートしているので素直にxGroupのMKSTREAMオプションを有効にする方が良いでしょう
        $groupInfo = $this->redis->xInfo('GROUP', 'mystream', 'mygroup');
        if (!$groupInfo) {
            $this->redis->xGroup('CREATE', 'mystream', 'mygroup', '>');
        }
        // これと等価
        // '>' の意味は概要を後述しますが、Redis StreamsのIntroductionを読むと良いです。
        // $this->redis->xGroup('CREATE', 'mystream', 'mygroup', '>', $mkStream = true);
    }

    public function consume() {
        return $this->redis->xReadGroup('group1', 'consumer1', ['mystream' => '>'], $count = 10, $block = 2000);
    }
}

为什么要使用XREADGROUP而不是XREAD呢,因为它具有可扩展性,并且管理读取到哪个ID的问题非常麻烦。因为XREAD似乎没有记录Redis服务器读取到哪个键的偏移量的机制,所以如果想要自动获取消费者单位中未获取部分的记录,可能需要使用XREADGROUP。

$this->redis->xReadGroup('group1', 'consumer1', ['mystream' => '>'], $count = 10, $block = 2000);

在Consumer Group group1的Consumer consumer1中,从名为mystream的键的流中获取最多10条未获取的记录。如果所有记录都已获取,则最多等待2,000毫秒直到有记录添加进来。为了实现获取最多10条未获取的记录,需要指定[‘mystream’ => ‘>’]。

可扩展性

让我们来看看消费者是否能够扩大规模。

我认为,为了扩展Kafka的消费者,需要在主题(Topic)上设置分区,并在消费者组(Consumer Group)中添加消费者或进行扇出操作(我没有实际操作过,所以不太确定)。

一方,Redis Streams似乎没有这种分区的概念。但是,XREADGROUP的”>”是针对GROUP的值,因此只需向GROUP添加Consumer并使用XREADGROUP命令,就可以自动在每个Consumer上获取数据。不需要担心重新平衡。

当服务器数量(进程数量)增加且BLOCK请求积压时,延迟会怎样呢?READ请求会排队并按顺序传送吗?由于特定的Consumer与特定的分区没有相关性,因此无需考虑热点问题,对吧。

我认为暂且来说,只要增加服务器数量(进程数),就可以实现扩展。但是,如果要在不同的进程中运行,确定Consumer的名称可能会很麻烦。名称必须在组内唯一这个条件限制下,要认真做可能会很困难。即使使用相同的名称,也可以正常运行,所以可能更多是出于管理消费者状态、监控指标等运维上的原因。

Swoole: Swoole

我认为Redis Streams在处理上是以同步的方式使用BLOCK,而不是传统的消息传递系统,这与Plain-PHP很搭配。但我想尝试使用Swoole在单一进程中进行扩展。

为了确保XREADGROUP在单个进程中具有足够的吞吐量,采取了在获取值后使Consumer的工作部分并行运行的方法。如果XREADGROUP无法跟上,就可以考虑将请求也多路复用到Redis上。

程式碼变成了这个样子。

<?php

$messageBufferChannel = new Co\Channel(3);

// Worker
Co\run(function () use ($messageBufferChannel) {
    // Worker内の処理を多重度を制限するためのChannel
    $concurrencyCapChannel = new Co\Channel(5);

    while (true) {
        $values = $messageBufferChannel->pop();
        if ($values) {
            $concurrencyCapChannel->push(true);
            go(function() use ($values, $concurrencyCapChannel) {
                echo "got values " . count($values['mystream']) . "\n";
                $concurrencyCapChannel->pop();
            });
        }
    }
});

// Consumer
Co\run(function () use ($messageBufferChannel) {
    $redis = new Redis();
    $redis->connect('redis-server', 6379);

    while (true) {
        $values = $redis->xReadGroup('group1', 'consumer10', ['mystream' => '>'], $count = 10, $block = 2000);
        if ($values) {
            $messageBufferChannel->push($values);
        }
    }
});

因为我有段时间没有写作,所以进展非常缓慢。现在记得好像是应该这样写的…

Swoole在Docker中运行,可以参考https://www.swoole.co.uk/docs/get-started/try-docker。但是,这里介绍的镜像中没有安装phpredis。

运行 pecl install redis && docker-php-ext-enable redis.

我会随意在Dockerfile中添加并进行构建。

尽管试着写了一下,但是我对于Swoole的了解还很浅,要在实际生产中使用还需要更多的调查研究,否则会感到不安而无法使用。

相关文件

    • Introduction to Redis Streams

 

    • Redis Stream Command reference

 

    • Apache Kafka

 

    Swoole Cotroutine