DUICUO

RabbitMQ シリーズ: トピックモデル

みなさんこんにちは。私はAmazingです。

RabbitMQは、パブリッシュ/サブスクライブ、ルーティング、ワーカーモデルなど、様々な通信モデルを提供する人気のオープンソースメッセージキューソフトウェアです。最初の4つのモデルについては以前の記事で既に紹介しており、この記事ではRabbitMQのトピックモデルについて詳しく説明します。今後、RabbitMQに関するチュートリアルシリーズを公開予定ですので、お役に立てればぜひご覧ください。

トピックモデル

トピックモデルはRabbitMQの高度なモデルの一つです。ワイルドカードの概念を用いることで、ルーティングルールのより柔軟なマッチングが可能になります。本質的には、トピックモデルはルーティングモデルのアップグレードであり、主にルールのあいまいマッチングを可能にします。

トピックモデルでは、プロデューサーはメッセージをエクスチェンジに送信し、エクスチェンジはルーティングキーに基づいてメッセージを対応するキューに転送します。ダイレクトモデルとは異なり、トピックモデルはルーティングキーにワイルドカードマッチングをサポートしています。「*」は単一の単語に一致し、「#」は複数の単語に一致します。例えば、「order.*」は「order.create」や「​​order.delete」のようなメッセージに一致し、「order.#」は「order.create.one」や「order.delete.two」のようなメッセージに一致します。

適用可能なシナリオ

トピック モデルは、次のような柔軟なメッセージ ルーティング ルールを必要とするシナリオに適しています。

  1. 分類されたニュース フィードへのニュース ウェブサイトのサブスクリプション。
  2. 電子商取引ウェブサイトは製品カテゴリのメッセージを購読します。
  3. 金融機関は株式市場のニュースなどを購読しています。

デモンストレーション

  1. プロデューサー
 // プロデューサー
パブリッククラスプロデューサー{
プライベート静的最終文字列 EXCHANGE_NAME = "exchange_topic_1";
プライベート静的最終文字列 EXCHANGE_ROUTING_KEY1 = "topic.km";
プライベート静的最終文字列 EXCHANGE_ROUTING_KEY2 = "topic.km.001";

パブリック静的void main(String[] args)はIOException、TimeoutExceptionをスローします{
接続 connection = ConnectionUtils.getConnection();
チャネル channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME、"トピック");
(int i = 0; i < 100; i++) の場合 {
// トピック モデルでは、ルート キーのみが変更され、その他はすべて同じままです。
もし (i % 2 == 0) {
channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1, MessageProperties.PERSISTENT_TEXT_PLAIN, ("トピック モデルによって送信された " + i + " 番目のメッセージ").getBytes());
} それ以外 {
channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2, MessageProperties.PERSISTENT_TEXT_PLAIN, ("トピック モデルによって送信された " + i + " 番目のメッセージ").getBytes());
}
}
チャネルを閉じます。
接続を閉じます。
}
}
  1. 消費者
 // 消費者1
パブリッククラス Consumer1 {
プライベート静的最終文字列 QUEUE_NAME = "queue_topic_1";
プライベート静的最終文字列 EXCHANGE_NAME = "exchange_topic_1";
プライベート静的最終文字列 EXCHANGE_ROUTING_KEY = "topic.*";

パブリック静的void main(String[] args)はIOException、TimeoutExceptionをスローします{
接続 connection = ConnectionUtils.getConnection();
チャネル channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME、"トピック");
channel.queueBind(QUEUE_NAME、EXCHANGE_NAME、EXCHANGE_ROUTING_KEY);
デフォルトコンシューマー デフォルトコンシューマー = 新しいデフォルトコンシューマー(チャネル) {
@オーバーライド
public void handleDelivery(String consumerTag, Envelope エンベロープ, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("コンシューマー1がメッセージを受信しました: " + new String(body));
}
};
channel.basicConsume(QUEUE_NAME、true、defaultConsumer);
}
}
 // 消費者2
パブリッククラス Consumer2 {
プライベート静的最終文字列 QUEUE_NAME = "queue_topic_2";
プライベート静的最終文字列 EXCHANGE_NAME = "exchange_topic_1";
プライベート静的最終文字列 EXCHANGE_ROUTING_KEY = "topic.#";

パブリック静的void main(String[] args)はIOException、TimeoutExceptionをスローします{
接続 connection = ConnectionUtils.getConnection();
チャネル channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, "トピック");
channel.queueBind(QUEUE_NAME、EXCHANGE_NAME、EXCHANGE_ROUTING_KEY);
デフォルトコンシューマー デフォルトコンシューマー = 新しいデフォルトコンシューマー(チャネル) {
@オーバーライド
public void handleDelivery(String consumerTag, Envelope エンベロープ, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("コンシューマー2がメッセージを受信しました: " + new String(body));
}
};
channel.basicConsume(QUEUE_NAME、true、defaultConsumer);
}
}

テスト

まず 2 つのコンシューマーを起動し、次にプロデューサーを起動します。

コンシューマー1は文字列「order.*」を含むメッセージを購読し、コンシューマー2は文字列「order.#」を含むメッセージを購読します。以下の結果が得られます。

コンシューマー 1 は、「トピック モデルによって送信された偶数番号のメッセージ」というメッセージを受信しました。

コンシューマー 2 は、「トピック モデルによって送信されたすべてのメッセージ」というメッセージを受信します。

まとめ

この記事では、RabbitMQ通信におけるトピックモデルの使用法を紹介し、交換とルーティングキーを通じてより柔軟なメッセージルーティングを実現する方法を示します。実際の使用においては、以下の点に留意する必要があります。

  1. ルーティング キーは、「.」で区切られた複数の単語の形式にする必要があります。
  2. 「#」は複数の単語に一致し、「*」は単一の単語に一致します。
  3. キューは複数のルーティング キーにバインドできます。
  4. 交換はどのキューでも一致するものを見つけられない場合、メッセージを破棄します。