最近、MQサーバーに強い関心があり、その一つとしてRabbitMQサーバーを立ち上げてみました。
RabbitMQサーバーとは
RabbitMQは、メッセージ指向のミドルウェアで、アプリケーション間のメッセージのやり取りを仲介するためのメッセージキューサーバです。
RabbitMサーバーは、Erlang言語で書かれていて、クラスタリングとフェイルオーバーのためにOpen Telecom Platformフレームワークで構築されています。
元々はRabbit Technologies社で開発していましたが、2010年4月にSpringFrameworkの開発元であるSpringSource社に買収され、その後SpringSource社がVMware社に買収されました。そして、EMC、VMware、GEの出資で設立されたPivotal Software社がRabbitMQの開発・サポートを行っていましたが、2019年にVMware社がPivotal Software社を吸収合併し、今はVMware社がサポートを行っています。
特徴
RabbitMQは以下の特徴を持っています。
- 言語の中立性
RabbitMQのクライアントは多くのプログラミング言語で利用できるため、異なる言語で書かれたアプリケーション間でもメッセージのやり取りが可能です。 - 耐障害性
RabbitMQは、メッセージの永続化やクラスタリング機能を提供しており、障害時のデータの喪失リスクを低減することができます。 - 拡張性
クラスタリングやフェデレーションといった機能を利用することで、大量のメッセージトラフィックを扱うことができます。 - 柔軟性
トピックベースのルーティング、ワークキュー、RPCなど、さまざまなメッセージパターンをサポートしています。
主な概念
RabbitMQの主な概念は以下になります。
- Producer
メッセージを生成してRabbitMQに送信するエンティティです。 - Consumer
キューからメッセージを受け取って処理するエンティティです。 - Channel
1つのコネクションを仮想的に分離する概念です。 - Queue
メッセージが保存される場所。消費者はここからメッセージを取得します。 - Exchange
プロデューサから受け取ったメッセージをキューにルーティングする役割を持ちます。RabbitMQにはいくつかの標準的な交換タイプ(direct, topic, fanout, headers)があります。
使われている用語はMQサーバー固有の用語でないため、他のMQサーバーでは少し違う用語で定義されている場合がありますので、ご注意ください。
RabbitMQサーバーの構築
RabbitMQサーバーの起動方法はいくつかありますが、ここでは手軽にDockerで起動することにします。
$ docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12-management
Publisherを作成する
MQサーバーにメッセージを送信するPublisherを作成します。RabbitMQでは複数のプログラミング言語をサポートしていますが、ここではPythonのpikaパッケージを使用して作成していきます。
RabbitMQサーバーへのコネクションを確立する
RabbitMQサーバーへのコネクションを確立します。ここではローカルで起動しているRabbitMQサーバーへ接続するため、host='localhost'
を指定しています。
import pika
# RabbitMQサーバーへのコネクションを確立する
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
BlockingConnectionを使用してコネクションを確立すると、同期的に処理を実行することができます。
Channelを作成する
次にChannelを作成します。
import pika
# RabbitMQサーバーへのコネクションを確立する
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# チャネルを作成する
channel = connection.channel()
今回の例ではPublisherは1つのキューにメッセージを送信する経路だけがあればよいので、1つだけチャネルを作成しています。
キューを定義する
メッセージを送信するキューを作成します。キューにはキューを一意に識別するための名前をつける必要があり、ここではqueue='hello'
を指定しています。
import pika
# RabbitMQサーバーへのコネクションを確立する
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# チャネルを作成する
channel = connection.channel()
# キューを作成する
channel.queue_declare(queue='hello')
メッセージを送信する
では、実際にメッセージを送信してみましょう。
メッセージの送信方法にはいくつかありますが、ここではチュートリアルでも使われている最もシンプルな方法で実装します。
import pika
# RabbitMQサーバーへのコネクションを確立する
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# チャネルを作成する
channel = connection.channel()
# キューを作成する
channel.queue_declare(queue='hello')
# メッセージを送信する
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!!')
Exchangeタイプを''
にするとデフォルトのExchangeタイプとなり、routing_key
で指定したキューに直接メッセージを送ります。
コネクションをクローズする
最後にコネクションをクローズします。
import pika
# RabbitMQサーバーへのコネクションを確立する
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# チャネルを作成する
channel = connection.channel()
# キューを作成する
channel.queue_declare(queue='hello')
# メッセージを送信する
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!!')
# コネクションをクローズする
connection.close()
Consumerを作成する
次にメッセージを受け取るConsumerを作成します。Consumerはあるキューにメッセージが追加されるのを待ち、メッセージが追加されたらそのメッセージを取得して処理を行います。
コネクションを確立し、キューを定義する
コネクションを確立〜キューを作成するまでの流れはPublisherと同じです。キュー名はPublisherで定義したキュー名を同じにします。
import pika
# RabbitMQサーバーへのコネクションを確立する
pika_param = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(pika_param)
# チャネルを作成する
channel = connection.channel()
# キューを作成する
channel.queue_declare(queue='hello')
メッセージ受信時に実行するコールバック関数を作成する
メッセージを受信したときに実行するコールバック関数を作成します。この関数はメッセージを受信するたびに呼び出されます。
import pika
# RabbitMQサーバーへのコネクションを確立する
pika_param = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(pika_param)
# チャネルを作成する
channel = connection.channel()
# キューを作成する
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
""" メッセージを受信したときに実行されるコールバック関数 """
print(f" {body} Received")
ch.basic_ack(delivery_tag=method.delivery_tag)
メッセージの受信を待ち受ける
最後にメッセージの受信を待ち受けます。待ち受けるキューのキュー名とメッセージを受信した時に実行するコールバック関数を指定します。
import pika
# RabbitMQサーバーへのコネクションを確立する
pika_param = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(pika_param)
# チャネルを作成する
channel = connection.channel()
# キューを作成する
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
""" メッセージを受信したときに実行されるコールバック関数 """
print(f" {body} Received")
ch.basic_ack(delivery_tag=method.delivery_tag)
# メッセージを受信する
channel.basic_consume(queue='hello', on_message_callback=callback)
channel.start_consuming()
まとめ
今回示したのは、非常に単純な内容です。実際に業務システムに適用するには、もっと複雑な設定が必要となる場合があります。
また、MQサーバーに関する記事で「デファクト」「デファクトスタンダード」という記述を見かけますが、これまで実務で使用している限りでは「とりあえずこれを使っておけばいい」といえるデファクトスタンダードは存在していないと考えています。
どういった目的で使用するのか、動作するプラットフォームは何か、MQサーバーを管理するか/しないか、どれくらいの頻度でメッセージの送信・受信が行われるのか、どうメッセージを配信するのか、エコシステムで使用される関連するミドルウェアは何か、などの複数の要因でどのMQサーバーが最適か、どのMQサーバーは適していないか、が変わります。
そのため、複数のMQサーバーを比較・検討してどのMQサーバーを使用するかを選定する必要がありますが、MQサーバーによって用語が異なっていたり、同じ用語でも指し示しているものが少し違っていたりするため、用語を正しく理解することは非常に重要です。
マイクロサービスが浸透してくるにつれてサービス間を疎結合に保つことが一般的になってきました。また、大規模・超大規模なシステムではSOAの考え方を引き継いでいるものもあり、そういったシステムではシステム間をMQやメッセージバスによって接続している場合もあります。これからはもっとMQサーバーを利用することが多くなってくることが予想されるため、基本的な使用方法だけでも知っておいた方がよいと思います。