|
みなさんこんにちは。私はAmazingです。 RabbitMQは、パブリッシュ/サブスクライブ、ルーティング、ワーカーモデルなど、様々な通信モデルを提供する人気のオープンソースメッセージキューソフトウェアです。最初の4つのモデルについては以前の記事で既に紹介しており、この記事ではRabbitMQのトピックモデルについて詳しく説明します。今後、RabbitMQに関するチュートリアルシリーズを公開予定ですので、お役に立てればぜひご覧ください。 トピックモデルトピックモデルはRabbitMQの高度なモデルの一つです。ワイルドカードの概念を用いることで、ルーティングルールのより柔軟なマッチングが可能になります。本質的には、トピックモデルはルーティングモデルのアップグレードであり、主にルールのあいまいマッチングを可能にします。 トピックモデルでは、プロデューサーはメッセージをエクスチェンジに送信し、エクスチェンジはルーティングキーに基づいてメッセージを対応するキューに転送します。ダイレクトモデルとは異なり、トピックモデルはルーティングキーにワイルドカードマッチングをサポートしています。「*」は単一の単語に一致し、「#」は複数の単語に一致します。例えば、「order.*」は「order.create」や「order.delete」のようなメッセージに一致し、「order.#」は「order.create.one」や「order.delete.two」のようなメッセージに一致します。 適用可能なシナリオトピック モデルは、次のような柔軟なメッセージ ルーティング ルールを必要とするシナリオに適しています。 - 分類されたニュース フィードへのニュース ウェブサイトのサブスクリプション。
- 電子商取引ウェブサイトは製品カテゴリのメッセージを購読します。
- 金融機関は株式市場のニュースなどを購読しています。
デモンストレーション- プロデューサー
// プロデューサー パブリッククラスプロデューサー{ プライベート静的最終文字列 EXCHANGE_NAME = "exchange_topic_1"; プライベート静的最終文字列 EXCHANGE_ROUTING_KEY1 = "topic.km"; プライベート静的最終文字列 EXCHANGE_ROUTING_KEY2 = "topic.km.001";
パブリック静的void main(String[] args)はIOException、TimeoutExceptionをスローします{ 接続 connection = ConnectionUtils.getConnection(); チャネル channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME、"トピック"); (int i = 0; i < 100; i++) の場合 { // トピック モデルでは、ルート キーのみが変更され、その他はすべて同じままです。 もし (i % 2 == 0) { channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1, MessageProperties.PERSISTENT_TEXT_PLAIN, ("トピック モデルによって送信された " + i + " 番目のメッセージ").getBytes()); } それ以外 { channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2, MessageProperties.PERSISTENT_TEXT_PLAIN, ("トピック モデルによって送信された " + i + " 番目のメッセージ").getBytes()); } } チャネルを閉じます。 接続を閉じます。 } } - 消費者
// 消費者1 パブリッククラス Consumer1 { プライベート静的最終文字列 QUEUE_NAME = "queue_topic_1"; プライベート静的最終文字列 EXCHANGE_NAME = "exchange_topic_1"; プライベート静的最終文字列 EXCHANGE_ROUTING_KEY = "topic.*";
パブリック静的void main(String[] args)はIOException、TimeoutExceptionをスローします{ 接続 connection = ConnectionUtils.getConnection(); チャネル channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME、"トピック"); channel.queueBind(QUEUE_NAME、EXCHANGE_NAME、EXCHANGE_ROUTING_KEY); デフォルトコンシューマー デフォルトコンシューマー = 新しいデフォルトコンシューマー(チャネル) { @オーバーライド public void handleDelivery(String consumerTag, Envelope エンベロープ, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("コンシューマー1がメッセージを受信しました: " + new String(body)); } }; channel.basicConsume(QUEUE_NAME、true、defaultConsumer); } } // 消費者2 パブリッククラス Consumer2 { プライベート静的最終文字列 QUEUE_NAME = "queue_topic_2"; プライベート静的最終文字列 EXCHANGE_NAME = "exchange_topic_1"; プライベート静的最終文字列 EXCHANGE_ROUTING_KEY = "topic.#";
パブリック静的void main(String[] args)はIOException、TimeoutExceptionをスローします{ 接続 connection = ConnectionUtils.getConnection(); チャネル channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, "トピック"); channel.queueBind(QUEUE_NAME、EXCHANGE_NAME、EXCHANGE_ROUTING_KEY); デフォルトコンシューマー デフォルトコンシューマー = 新しいデフォルトコンシューマー(チャネル) { @オーバーライド public void handleDelivery(String consumerTag, Envelope エンベロープ, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("コンシューマー2がメッセージを受信しました: " + new String(body)); } }; channel.basicConsume(QUEUE_NAME、true、defaultConsumer); } } テスト まず 2 つのコンシューマーを起動し、次にプロデューサーを起動します。 コンシューマー1は文字列「order.*」を含むメッセージを購読し、コンシューマー2は文字列「order.#」を含むメッセージを購読します。以下の結果が得られます。 コンシューマー 1 は、「トピック モデルによって送信された偶数番号のメッセージ」というメッセージを受信しました。 コンシューマー 2 は、「トピック モデルによって送信されたすべてのメッセージ」というメッセージを受信します。 まとめこの記事では、RabbitMQ通信におけるトピックモデルの使用法を紹介し、交換とルーティングキーを通じてより柔軟なメッセージルーティングを実現する方法を示します。実際の使用においては、以下の点に留意する必要があります。 - ルーティング キーは、「.」で区切られた複数の単語の形式にする必要があります。
- 「#」は複数の単語に一致し、「*」は単一の単語に一致します。
- キューは複数のルーティング キーにバインドできます。
- 交換はどのキューでも一致するものを見つけられない場合、メッセージを破棄します。
|