|
RabbitMQを初めて知った時、すぐにGitHubで関連チュートリアルを探したのですが、残念ながら見つかりませんでした。SpringやMybatisのチュートリアルはたくさんあるのに、RabbitMQのチュートリアルはほとんどありませんでした。そこで、自分でまとめてみようと思いました。何か間違いがあれば、ぜひご指摘ください。
この記事は主にGitHubに公開しているソースコードについて解説しているため、ソースコード自体はあまり多くありません。執筆にかなり時間がかかったため、迷子にならないよう、お気軽にスターを付けたりフォークしたりしてください。 GitHub アドレス: https://github.com/erlieStar/rabbitmq-examples 序文 RabbitMQ でのメッセージの流れを見てみましょう。 図示する主なプロセスは次のとおりです。
プロセス全体は主に4つの参加者、つまりメッセージ、エクスチェンジ、キュー、そしてコンシューマーによって構成されます。これら4つの参加者について見ていきましょう。 メッセージ メッセージには一連の属性を設定できます。各属性の目的については、「Deep Dive into RabbitMQ」という書籍に記載されています。 交換 メッセージを受信し、ルーティングキーに従ってバインドされたキューに転送します。よく使用される属性は次のとおりです。 最も頻繁に使用されるプロパティは type 属性です。これについては以下で詳しく説明します。 ファンアウト交換 このエクスチェンジに送信されたメッセージは、このエクスチェンジにバインドされているすべてのキューにルーティングされ、ブロードキャストに使用できます。 ルーティング キーを処理せずに、キューを取引所にバインドするだけです。 ファンアウト スイッチは、メッセージの転送が最も高速です。 直接交換 BindingKey と RoutingKey が完全に一致するキューにメッセージをルーティングします。 トピック交換 前述の通り、直接型交換のルーティングルールは、RoutingKeyとBindingKeyの完全一致です。トピック交換は直接型交換と同様に、RoutingKeyとBindingKeyが一致するキューにメッセージを送信しますが、あいまい一致も許可します。
ルーティング キー java.lang と java.util.concurrent を持つ 2 つのメッセージがある場合、java.lang は Consumer1 と Consumer2 にルーティングされ、java.util.concurrent は Consumer2 にルーティングされます。 ヘッダー交換 ヘッダー型交換では、ルートキーのマッチングルールに依存せず、送信メッセージコンテンツのヘッダー属性に基づいてメッセージをマッチングします。ヘッダー型交換は非効率的で実用的ではなく、ほとんど使用されません。 列 キューの共通プロパティは次のとおりです。 キューの引数に設定できる一般的なパラメータは次のとおりです。 RabbitMQ API (RabbitMQ API の使用) 第1章: クイックスタート - RabbitMQプロデューサーとコンシューマープログラムを手動で作成する 第 2 章: さまざまな Exchange メソッドの使用方法を説明します。 上で説明したさまざまな Exchange マシン ルーティング ルールを確認しましょう。 第3章: メッセージの取得 メッセージを取得する方法は 2 つあります。
では、メッセージをプルするべきでしょうか、それともプッシュするべきでしょうか?GETはポーリングモデルを使用し、GETはプッシュモデルを使用します。GETモデルではメッセージごとにオーバーヘッドが発生し、RabbitMQとの同期通信が必要になります。このリクエストは、クライアントアプリケーションがリクエストフレームを送信し、RabbitMQがレスポンスを送信するという構成になります。したがって、プルを避けるにはメッセージをプッシュする方がよいでしょう。 第4章: 手動確認 メッセージを確認する方法は2つあります。
コンシューマーは、メッセージを消費するときに autoAck パラメータを指定できます。 文字列 basicConsume(文字列キュー、ブール値 autoAck、コンシューマーコールバック) autoAck=false: RabbitMQ は、メモリ (またはディスク) からメッセージを削除する前に、コンシューマーが確認メッセージを表示するまで待機します。 `autoAck=true`: RabbitMQ は、コンシューマーが実際にメッセージを消費したかどうかに関係なく、送信されたメッセージを自動的に確認し、メモリ (またはディスク) から削除します。 手動確認方法は次の通りで、2 つのパラメータが含まれます。 basicAck(長い配信タグ、ブール値の複数) `deliveryTag`: チャネルで配信されたメッセージを識別するために使用されます。RabbitMQがConsumerにメッセージをプッシュする際、`deliveryTag`が付加されます。これにより、Consumerはメッセージの確認時にどのメッセージが確認されたかをRabbitMQに伝えることができます。 RabbitMQ は、各チャネルで各メッセージの deliveryTag が 1 から増加することを保証します。 `multiple=true`: メッセージ ID <= deliveryTag のメッセージが確認されます。 `myltiple=false`: メッセージ id=deliveryTag のメッセージが確認されます。 メッセージが未確認のままだとどうなるのでしょうか? キュー内のメッセージがコンシューマーに送信され、コンシューマーがメッセージを確認しない場合は、確認されてから削除されるまで、メッセージはキュー内に残ります。 コンシューマー A に送信されたメッセージが未確認のままの場合、RabbitMQ は、コンシューマー A と RabbitMQ 間の接続が切断された後にのみ、コンシューマー A から別のコンシューマーに未確認メッセージを再配信することを検討します。 第5章: メッセージを拒否する2つの方法 メッセージを確認する方法は 1 つだけです。 basicAck(長い配信タグ、ブール値の複数) メッセージを拒否する方法は 2 つあります。
basicNack と basicReject の唯一の違いは、basicNack がバッチ拒否をサポートしていることです。 deliveryTag と複数のパラメータについては、すでに説明しました。 `requeue=true`: メッセージはキューに再度送信されます。 `requeue=false`: メッセージは失われます。 第6章: 障害通知 第 6 章から第 10 章では、主に情報の公開に伴うトレードオフについて概説します。 最もよく使用されるのは、失敗通知と発行者の確認です。 特定のキューにルーティングできないメッセージを取得するにはどうすればよいですか?
`mandatory` は `channel.basicPublish()` メソッドのパラメータです。 `mandatory=true`: エクスチェンジがルーティング キーに基づいて一致するキューを見つけられない場合、RabbitMQ は `Basic.Return` コマンドを呼び出してメッセージをプロデューサーに返します。 必須 = false: 上記の状況では、メッセージは直接破棄されます。 第7章 発行者の確認 メッセージが送信された後、そのメッセージは実際に取引所に届きますか?デフォルトでは、プロデューサーはメッセージが取引所に届いたかどうかはわかりません。 RabbitMQ はこの問題に対して 2 つの解決策を提供します。
出版社は、プログラミング方法が 3 つあることを確認しました。
非同期確認モードは最も高いパフォーマンスを提供するため、頻繁に使用されます。これについて詳しく説明したいと思います。
非同期確認コードを書いたことがある方なら、このコードには馴染みがあるはずです。deliveryTag と multiple が使われているのがお分かりいただけるでしょう。ただし、ここでの deliveryTag と multiple は、メッセージの ack とは全く関係がないことを明確にしておきます。 `confirmListener` の `ack` は RabbitMQ によって制御され、メッセージが交換に到達したかどうかを確認するために使用されます。 メッセージ確認応答(ACK):前述の通り、確認応答は自動または手動で行うことができます。これは、キュー内のメッセージがコンシューマーによって消費されたかどうかを確認するために使用されます。 第8章: バックアップスイッチ プロデューサーがメッセージを送信する際に「mandatory」パラメータを設定していない場合、メッセージがキューにルーティングされないと失われます。「mandatory」パラメータが設定されている場合は、ReturnListenerロジックを追加する必要があり、プロデューサーのコードが複雑になります。複雑さとメッセージの損失の両方を回避するには、スタンバイエクスチェンジを使用できます。スタンバイエクスチェンジは、ルーティングされていないメッセージをRabbitMQに保存し、必要に応じて処理します。 第9章 出来事 RabbitMQ には、トランザクション メカニズムに関連する 3 つのメソッドがあります。 メッセージが RabbitMQ 交換に正常に送信された場合にのみ、トランザクションは正常にコミットされます。それ以外の場合は、例外をキャッチした後にトランザクションのロールバックを実行し、同時にメッセージを再送信することができます。 トランザクションは RabbitMQ のパフォーマンスに重大な影響を与える可能性があるため、通常は代わりにパブリッシャー確認が使用されます。 第10章: メッセージの永続性 メッセージを永続化するには、メッセージの delivery-mode 属性を 2 に設定するだけです。 RabbitMQ はこのプロパティを MessageProperties.PERSISTENT_TEXT_PLAIN としてカプセル化します。 詳しい使用方法については、GitHub のコードを参照してください。 メッセージを永続化したい場合は、キューとメッセージの永続化の両方を設定するのが最適です。キューの永続化のみを設定した場合、再起動後にメッセージが失われます。同様に、キューの永続化のみを設定した場合、再起動時にキューが消えてしまい、結果としてメッセージも失われます。 第11章: デッドレターキュー DLX(Dead-Letter-Exchange)は、デッドレターメッセージを処理する交換プロトコルの一種です。メッセージがキュー内でデッドメッセージになると、別の交換プロトコル(DLX)に再送信できます。DLXにバインドされたキューは、デッドレターキューと呼ばれます。 DLX も通常の交換であり、通常の交換と変わりません。基本的には、特定のキューの属性を設定することを伴います。 メッセージがデッドレターになる主な理由は次のとおりです。
デッドレタースイッチとスタンバイスイッチの違い スタンバイ スイッチ: 1. メッセージをルーティングできない場合、メッセージはスタンバイ スイッチに転送されます。2. スタンバイ スイッチは、プライマリ スイッチが宣言されるときに定義されます。 デッドレター交換: 1. キューに到着したがコンシューマーによって拒否されたメッセージは、デッドレター交換に転送されます。2. デッドレター交換は、キューが宣言されるときに定義されます。 第12章 トラフィック制御(サービス品質保証) QoS、つまりサーバー側のレート制限は、プルベースの消費方法では効果がありません。 QoS を使用するには、次の 2 つの手順のみが必要です。
basicQos(int prefetchSize, int prefetchCount, boolean global) QoS を使用する理由は何ですか?
プッシュ通知が大量に流入すると速度が低下したり、デバイスが使用できなくなる可能性もあるため、サーバー側のレート制限が重要になります。
これにより、一部のコンシューマーがメッセージの処理を完了している一方で、他のコンシューマーにはメッセージがバックログとして残っており、全体的なアプリケーション スループットが低下する可能性があります。 |