尝试使用Redis和Python来进行发布/订阅
尝试使用Redis和Python进行发布/订阅。
首先
我之前有接触过Redis, 以为它只是一个键值存储(KVS),但后来发现它还有发布订阅功能。为了好玩,我试着玩了一下发布订阅的特性。顺便一提,我还用Python3和redis.py库实现了并发操作。
这是示例,请点击此处查看:
https://github.com/ryoutoku/redis-sample
这次创建的环境如下所示。
-
- 使用Vagrant在VirtualBox(客户端)中启动Ubuntu14.04+Redis服务器。
-
- 如果Redis内的数据发生更改,则使用该数据的键作为通道,并使用pub通知键的数据的pub端脚本。
- 将订阅(sub)注册到Redis并将数据推送到Redis服务器的sub端脚本。
在第3步中将推送的数据,通过第2步进行检测并通知,再由第3步接收。
这次我们在主机端执行了所有的pub/sub脚本,但在客户端执行也没有问题。
必要的环境
-
- Vagrant
-
- Python3
- redis-py
有关使用Redis功能的事项
由于我个人对于Redis并不很熟悉,所以在本文中,我会简要介绍本次使用的功能。
-
- Key Value Store(KVS)
いわゆるNoSQLというやつで、keyに対してValueを紐付けてデータを管理する
Pythonでいうdictのようなもの
今回はtestというkeyにlistのデータとして追加する
pub/sub
メッセージ通知のモデル
Publish(通知)とSubscribe(購読)という2つの役割がある:参考
channelという概念があり、channelを決めて購読、通知を行う事で、そのchannelに対してのみデータ通知などが行える
程序解释
我将解释以下代码的关键点(我在实现过程中花费了一些时间)。
-
- Vagrantfile
-
- Publisher.py
- Subscriber.py
Vagrantfile 虚拟机配置文件
在Vagrantfile中,主要执行以下任务。
-
- 将客户端的IP地址固定为192.168.56.101
-
- Redis的配置文件
- Redis配置文件的位置
Vagrant.configure("2") do |config|
# ホストオンリーアダプタをipアドレス固定で追加
config.vm.network "private_network", ip: "192.168.56.101"
# 中略 ##########################################
# redisの設定ファイルのコピー
config.vm.provision "file", source: "./conf/redis.conf", destination: "/home/vagrant/redis.conf"
config.vm.provision "shell", inline: <<-SHELL
apt-get update
# pip, redis.pyインストール
apt-get install python-pip -y
pip install redis
# redisインストール
sudo apt-get install redis-server -y
# redis設定ファイルを移動
mv -f /home/vagrant/redis.conf /etc/redis/redis.conf
# redisの起動
redis-server &
SHELL
end
以下是令人让起毛的地方。
-
- redisはデフォルトではローカルIP(127.0.0.)しか接続を受け付けないため、設定ファイルを上書きする必要がある
直接上書きできないので、/home/vagrant以下にredis.confをコピー
コピーしたredis.confを/etc/redisに移動した後redisを起動
出版商(出版者)
pub的操作如下所示。
-
- 在获取所有Redis的键之后,根据键进行如下操作:
-
- – 如果键中有数据添加,则将数据通知为key=channel。
- – 如果数据的最终值为end,则终止操作。
以下是出版商核心部分的示例。
class Publisher(object):
def send_message(self):
while True:
keys = self._connection.keys()
is_break = False
for key in keys:
old_data = self._data.setdefault(key, [])
data = self._connection.lrange(key, 0, -1)
if len(old_data) == len(data):
continue
key_str = key.decode('utf-8')
data_str = [x.decode('utf-8') for x in data]
print("publish:", data_str)
self._connection.publish(key_str, data_str)
self._data[key] = data
if data_str[-1] == "end":
self._connection.rpop(key)
is_break = True
if is_break:
break
以下是会让人感到恶心的地方:
* Redis的键和数据都是以字节型(byte)存在,因此需要进行解码。
订阅者
在sub中进行以下操作。
-
- 启动另一个进程,并订阅数据作为通道=’test’。
当
从标准输入获取字符串
将获取的字符串添加到Redis作为键值key=’test’
如果获取的字符串是’end’,则结束
以下是订阅者的核心部分示意:
class SubscriberSubject(object):
_re_format = "\'(.*?)\'"
# 中略 ##########################################
def _receive_core(self, channel, end_word):
pubsub = self._connection.pubsub()
pubsub.subscribe([channel])
for data in pubsub.listen():
[x(data) for x in self._callbacks]
if isinstance(data['data'], bytes):
data_str = self._re.findall(data['data'].decode('utf-8'))
if data_str[-1] == end_word:
break
pubsub.unsubscribe()
def start_receive(self, channel, end_word='end'):
if self._job:
return
self._job = Process(target=self._receive_core,
args=(channel, end_word))
self._job.start()
def end_receive(self):
self._job.join()
def add_data(self, key, value):
self._connection.rpush(key, value)
if __name__ == "__main__":
# 中略 ##########################################
# sub
subject = SubscriberSubject(host, port)
subject.add_callback(
lambda x: print("callback:", xt)
)
subject.start_receive(channel)
# 中略 ##########################################
# 標準入力部
while(True):
data = sys.stdin.readline().strip()
subject.add_data(channel, data)
if data == "end":
subject.end_receive()
break
以下是让人不悦的地方:
* 在第一次pubsub.listen()时,将获得的字典设置为data:1。
* 在第二次及以后的pubsub.listen()时,将获得的字典设置为data:b[数据]。
* 因为是字节类型,所以需要解码后,使用正则表达式(_re_format)将其分割为字符串类型。
行动
打开两个终端并分别执行以下操作:
* 运行 python publisher.py
* 检查 Redis 数据并通知如有更改
* 运行 python subscriber.py
* 将从标准输入接收的数据添加到 Redis
* 订阅来自 publisher 的数据
操作就是这个样子。
终端上面是publisher.py,下面是subscriber.py。
-
- 将下面输入的值存储到Redis中。
-
- 从Redis中发布数据(显示数据)。
- 显示从下方接收到的数据。