RabbitMQサーバーを立ち上げる

RabbitMQサーバーを立ち上げる

最近、MQサーバーに強い関心があり、その一つとしてRabbitMQサーバーを立ち上げてみました。

目次

RabbitMQサーバーとは

RabbitMQは、メッセージ指向のミドルウェアで、アプリケーション間のメッセージのやり取りを仲介するためのメッセージキューサーバです。

RabbitMサーバーは、Erlang言語で書かれていて、クラスタリングとフェイルオーバーのためにOpen Telecom Platformフレームワークで構築されています。

元々はRabbit Technologies社で開発していましたが、2010年4月にSpringFrameworkの開発元であるSpringSource社に買収され、その後SpringSource社がVMware社に買収されました。そして、EMC、VMware、GEの出資で設立されたPivotal Software社がRabbitMQの開発・サポートを行っていましたが、2019年にVMware社がPivotal Software社を吸収合併し、今はVMware社がサポートを行っています。

特徴

RabbitMQは以下の特徴を持っています。

  1. 言語の中立性
    RabbitMQのクライアントは多くのプログラミング言語で利用できるため、異なる言語で書かれたアプリケーション間でもメッセージのやり取りが可能です。
  2. 耐障害性
    RabbitMQは、メッセージの永続化やクラスタリング機能を提供しており、障害時のデータの喪失リスクを低減することができます。
  3. 拡張性
    クラスタリングやフェデレーションといった機能を利用することで、大量のメッセージトラフィックを扱うことができます。
  4. 柔軟性
    トピックベースのルーティング、ワークキュー、RPCなど、さまざまなメッセージパターンをサポートしています。

主な概念

RabbitMQの主な概念は以下になります。

  1. Producer
    メッセージを生成してRabbitMQに送信するエンティティです。
  2. Consumer
    キューからメッセージを受け取って処理するエンティティです。
  3. Channel
    1つのコネクションを仮想的に分離する概念です。
  4. Queue
    メッセージが保存される場所。消費者はここからメッセージを取得します。
  5. Exchange
    プロデューサから受け取ったメッセージをキューにルーティングする役割を持ちます。RabbitMQにはいくつかの標準的な交換タイプ(direct, topic, fanout, headers)があります。

使われている用語はMQサーバー固有の用語でないため、他のMQサーバーでは少し違う用語で定義されている場合がありますので、ご注意ください。

RabbitMQサーバーの構築

RabbitMQサーバーの起動方法はいくつかありますが、ここでは手軽にDockerで起動することにします。

$ docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12-management

Publisherを作成する

MQサーバーにメッセージを送信するPublisherを作成します。RabbitMQでは複数のプログラミング言語をサポートしていますが、ここではPythonのpikaパッケージを使用して作成していきます。

RabbitMQサーバーへのコネクションを確立する

RabbitMQサーバーへのコネクションを確立します。ここではローカルで起動しているRabbitMQサーバーへ接続するため、host='localhost'を指定しています。

import pika

# RabbitMQサーバーへのコネクションを確立する
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

BlockingConnectionを使用してコネクションを確立すると、同期的に処理を実行することができます。

Channelを作成する

次にChannelを作成します。

import pika

# RabbitMQサーバーへのコネクションを確立する
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

# チャネルを作成する
channel = connection.channel()

今回の例ではPublisherは1つのキューにメッセージを送信する経路だけがあればよいので、1つだけチャネルを作成しています。

キューを定義する

メッセージを送信するキューを作成します。キューにはキューを一意に識別するための名前をつける必要があり、ここではqueue='hello'を指定しています。

import pika

# RabbitMQサーバーへのコネクションを確立する
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

# チャネルを作成する
channel = connection.channel()

# キューを作成する
channel.queue_declare(queue='hello')

メッセージを送信する

では、実際にメッセージを送信してみましょう。

メッセージの送信方法にはいくつかありますが、ここではチュートリアルでも使われている最もシンプルな方法で実装します。

import pika

# RabbitMQサーバーへのコネクションを確立する
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

# チャネルを作成する
channel = connection.channel()

# キューを作成する
channel.queue_declare(queue='hello')

# メッセージを送信する
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!!')

Exchangeタイプを''にするとデフォルトのExchangeタイプとなり、routing_keyで指定したキューに直接メッセージを送ります。

コネクションをクローズする

最後にコネクションをクローズします。

import pika

# RabbitMQサーバーへのコネクションを確立する
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

# チャネルを作成する
channel = connection.channel()

# キューを作成する
channel.queue_declare(queue='hello')

# メッセージを送信する
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!!')

# コネクションをクローズする
connection.close()

Consumerを作成する

次にメッセージを受け取るConsumerを作成します。Consumerはあるキューにメッセージが追加されるのを待ち、メッセージが追加されたらそのメッセージを取得して処理を行います。

コネクションを確立し、キューを定義する

コネクションを確立〜キューを作成するまでの流れはPublisherと同じです。キュー名はPublisherで定義したキュー名を同じにします。

import pika

# RabbitMQサーバーへのコネクションを確立する
pika_param = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(pika_param)

# チャネルを作成する
channel = connection.channel()

# キューを作成する
channel.queue_declare(queue='hello')

メッセージ受信時に実行するコールバック関数を作成する

メッセージを受信したときに実行するコールバック関数を作成します。この関数はメッセージを受信するたびに呼び出されます。

import pika

# RabbitMQサーバーへのコネクションを確立する
pika_param = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(pika_param)

# チャネルを作成する
channel = connection.channel()

# キューを作成する
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    """ メッセージを受信したときに実行されるコールバック関数 """
    print(f" {body} Received")
    ch.basic_ack(delivery_tag=method.delivery_tag)

メッセージの受信を待ち受ける

最後にメッセージの受信を待ち受けます。待ち受けるキューのキュー名とメッセージを受信した時に実行するコールバック関数を指定します。

import pika

# RabbitMQサーバーへのコネクションを確立する
pika_param = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(pika_param)

# チャネルを作成する
channel = connection.channel()

# キューを作成する
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    """ メッセージを受信したときに実行されるコールバック関数 """
    print(f" {body} Received")
    ch.basic_ack(delivery_tag=method.delivery_tag)


# メッセージを受信する
channel.basic_consume(queue='hello', on_message_callback=callback)
channel.start_consuming()

まとめ

今回示したのは、非常に単純な内容です。実際に業務システムに適用するには、もっと複雑な設定が必要となる場合があります。

また、MQサーバーに関する記事で「デファクト」「デファクトスタンダード」という記述を見かけますが、これまで実務で使用している限りでは「とりあえずこれを使っておけばいい」といえるデファクトスタンダードは存在していないと考えています。

どういった目的で使用するのか、動作するプラットフォームは何か、MQサーバーを管理するか/しないか、どれくらいの頻度でメッセージの送信・受信が行われるのか、どうメッセージを配信するのか、エコシステムで使用される関連するミドルウェアは何か、などの複数の要因でどのMQサーバーが最適か、どのMQサーバーは適していないか、が変わります。

そのため、複数のMQサーバーを比較・検討してどのMQサーバーを使用するかを選定する必要がありますが、MQサーバーによって用語が異なっていたり、同じ用語でも指し示しているものが少し違っていたりするため、用語を正しく理解することは非常に重要です。

マイクロサービスが浸透してくるにつれてサービス間を疎結合に保つことが一般的になってきました。また、大規模・超大規模なシステムではSOAの考え方を引き継いでいるものもあり、そういったシステムではシステム間をMQやメッセージバスによって接続している場合もあります。これからはもっとMQサーバーを利用することが多くなってくることが予想されるため、基本的な使用方法だけでも知っておいた方がよいと思います。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!
目次