Amazon MQ for ActiveMQ 提供了一个Python的消息发送和接收示例

亚马逊MQ适用于ActiveMQ

在ActiveMQ中,会创建五个不同协议的端点。
这里提供了ActiveMQ队列的实现示例。
需要注意的是,执行环境是基于Lambda条件下。

OpenWire 开放式电线

稍后写

可以参考

    • http://engmng.blog.fc2.com/blog-entry-107.html

 

    • https://simplesassim.wordpress.com/2013/12/30/how-to-send-a-message-to-an-apache-activemq-queue-with-jython/

 

    • https://simplesassim.wordpress.com/2013/12/30/how-to-receive-a-message-from-an-apache-activemq-queue-with-jython/

 

    https://docs.confluent.io/ja-jp/platform/7.0/clients/kafka-jms-client/installation.html#cjms-installation

用中文转述以下内容,只需提供一种选项:

踩(STOMP)

寄信

简述

    • トリガーイベントから連携された内容をActiveMQにキューイングする

 

    • DynamoDB Streamをトリガーとする

 

    イベントのうち、DynamoDBへのインサートだけを処理対象としている(こちらはフィルタリング設定でも可)
import stomp
import ssl
import json
import os

broker = os.environ['BROKER_URL']
port = os.environ['BROKER_PORT']
queue = os.environ['QUEUE_NAME']
username = os.environ['USER_NAME']
password = os.environ['PASSWORD']


def lambda_handler(event, context):
    
    record = event['Records'][0]
    print(json.dumps(record))

    # DynamodbにレコードがINSERTされた場合
    if event['Records'][0]['eventName'] == 'INSERT':
        record = event['Records'][0]['dynamodb']['NewImage']
        print('INSERT')


        # AmazonMQへenqueue
        conn = stomp.Connection([(broker, port)])
        conn.set_ssl(for_hosts=[(broker, port)],ssl_version=ssl.PROTOCOL_TLSv1_2)
        conn.connect(username, password, wait=True)
        ret = conn.send(body=json.dumps(record), destination=queue, headers={'message-id': 'test'})
        print(ret)
        conn.disconnect()

收到

简而言之

    • ActiveMQから5秒間Subscribeし、取得したキューの中身をファイル化してS3へPUTする

 

    • エンドポイントはSSLになっているためset_sslをしている

 

    Lambdaにおいてグローバル変数を扱う場合には注意が必要(handler外は再利用される)
import boto3
import os
import time
import stomp
import ssl
import datetime
from logging import getLogger, INFO, DEBUG

# Logの設定
logger = getLogger(__name__)
logger.setLevel(os.environ['LOG_LEVEL'])

# MQの設定
broker = os.environ['BROKER_URL']
port = os.environ['BROKER_PORT']
queue = os.environ['QUEUE_NAME']
username = os.environ['USER_NAME']
password = os.environ['PASSWORD']
subscription_id = '1'
polling_time = 5

# S3の設定
s3 = boto3.resource('s3')
s3_bucket = s3.Bucket(os.environ['BUCKET_NAME'])
tmp_output = '/tmp/output.csv'


class MyListener(stomp.ConnectionListener):
    def __init__(self, conn):
        self.conn = conn

    def on_error(self, frame):
        logger.info('on_error: received an error "%s"' % frame.body)

    def on_message(self, frame):
        # debug log
        logger.debug('on_message: processed message')
        logger.debug('message-id: %s', frame.headers['message-id'])
        read_messages.append(frame.body)
        # ACKを明示的に行う
        self.conn.ack(frame.headers['message-id'], subscription_id)

def lambda_handler(event, context):
    # グローバル変数定義と初期化
    global read_messages
    read_messages = []
    
    # ブローカー接続とキュー取得
    conn = stomp.Connection([(broker, port)], heartbeats=(8000, 5000))
    conn.set_listener('', MyListener(conn))
    conn.set_ssl(for_hosts=[(broker, port)], ssl_version=ssl.PROTOCOL_TLSv1_2)
    conn.connect(username, password, wait=True)
    conn.subscribe(destination=queue, id=subscription_id, ack='client-individual')
    logger.info('mq connect.')
    time.sleep(polling_time)
    
    # ブローカーとの切断
    conn.disconnect()
    logger.info('mq disconnect.')

    # S3へCSVファイルをアップロード
    if len(read_messages) == 0:
        logger.info('queue is empty.')
    else:
        with open(tmp_output, mode='a') as f:
            for message in read_messages:
                print(message, file=f)

        now = datetime.datetime.now()
        s3_output = 'output_'+now.strftime('%Y%m%d%H%M%S')+'.csv'
        s3_bucket.upload_file(tmp_output, s3_output)
        logger.info('s3 upload complete. file_name: %s', s3_output)

注意:当使用Lambda来实现经常在示例中出现的 on_disconnected() 等函数时,会引发大量的段错误(原因未知)。

网络安全

因为不太懂,所以省略。

使用 AMQP

可以使用RabbitMQ?如果能做到就去做吧。

MQTT:物联网通信协议

由于MQTT只支持发布/订阅方式,因此不适用于其他情况。

请参考以下资料。

    • ActiveMQ公式ドキュメントのサンプル

https://activemq.apache.org/cross-language-clients

STOMP/RabbitMQでのSSLの実装サンプル

https://wp.sjmf.in/?p=86

bannerAds