DUICUO

RabbitMQ に関する知識の包括的な要約 – もう RabbitMQ を知らないと言う必要はありません。

RabbitMQを初めて知った時、すぐにGitHubで関連チュートリアルを探したのですが、残念ながら見つかりませんでした。SpringやMybatisのチュートリアルはたくさんあるのに、RabbitMQのチュートリアルはほとんどありませんでした。そこで、自分でまとめてみようと思いました。何か間違いがあれば、ぜひご指摘ください。

[[312956]]

この記事は主にGitHubに公開しているソースコードについて解説しているため、ソースコード自体はあまり多くありません。執筆にかなり時間がかかったため、迷子にならないよう、お気軽にスターを付けたりフォークしたりしてください。

GitHub アドレス: https://github.com/erlieStar/rabbitmq-examples

序文

RabbitMQ でのメッセージの流れを見てみましょう。

図示する主なプロセスは次のとおりです。

  1. プロデューサーがメッセージを送信するときは、RoutingKey を指定し、その後メッセージは Exchange に送信されます。
  2. Exchange は、一連のルールに基づいてメッセージを指定されたキューにルーティングします。
  3. コンシューマーはキューからメッセージを消費します。

プロセス全体は主に4つの参加者、つまりメッセージ、エクスチェンジ、キュー、そしてコンシューマーによって構成されます。これら4つの参加者について見ていきましょう。

メッセージ

メッセージには一連の属性を設定できます。各属性の目的については、「Deep Dive into RabbitMQ」という書籍に記載されています。

交換

メッセージを受信し、ルーティングキーに従ってバインドされたキューに転送します。よく使用される属性は次のとおりです。

最も頻繁に使用されるプロパティは type 属性です。これについては以下で詳しく説明します。

ファンアウト交換

このエクスチェンジに送信されたメッセージは、このエクスチェンジにバインドされているすべてのキューにルーティングされ、ブロードキャストに使用できます。

ルーティング キーを処理せずに、キューを取引所にバインドするだけです。

ファンアウト スイッチは、メッセージの転送が最も高速です。

直接交換

BindingKey と RoutingKey が完全に一致するキューにメッセージをルーティングします。

トピック交換

前述の通り、直接型交換のルーティングルールは、RoutingKeyとBindingKeyの完全一致です。トピック交換は直接型交換と同様に、RoutingKeyとBindingKeyが一致するキューにメッセージを送信しますが、あいまい一致も許可します。

  1. RoutinKeyは「.」で区切られた文字列です(例:com.rabbitmq.client)
  2. BindingKey と RoutingKey も「.」で区切られた文字列です。
  3. BindKey には、あいまい一致のための 2 つの特殊文字列「*」と「#」を含めることができます。「*」は 1 つの単語に一致しますが、「#」は複数の単語 (0 個と 1 個を含む) に一致します。

ルーティング キー java.lang と java.util.concurrent を持つ 2 つのメッセージがある場合、java.lang は Consumer1 と Consumer2 にルーティングされ、java.util.concurrent は Consumer2 にルーティングされます。

ヘッダー交換

ヘッダー型交換では、ルートキーのマッチングルールに依存せず、送信メッセージコンテンツのヘッダー属性に基づいてメッセージをマッチングします。ヘッダー型交換は非効率的で実用的ではなく、ほとんど使用されません。

キューの共通プロパティは次のとおりです。

キューの引数に設定できる一般的なパラメータは次のとおりです。

RabbitMQ API (RabbitMQ API の使用)

第1章: クイックスタート - RabbitMQプロデューサーとコンシューマープログラムを手動で作成する

第 2 章: さまざまな Exchange メソッドの使用方法を説明します。

上で説明したさまざまな Exchange マシン ルーティング ルールを確認しましょう。

第3章: メッセージの取得

メッセージを取得する方法は 2 つあります。

  1. メッセージを取得
  2. プッシュメッセージ(メッセージの消費)

では、メッセージをプルするべきでしょうか、それともプッシュするべきでしょうか?GETはポーリングモデルを使用し、GETはプッシュモデルを使用します。GETモデルではメッセージごとにオーバーヘッドが発生し、RabbitMQとの同期通信が必要になります。このリクエストは、クライアントアプリケーションがリクエストフレームを送信し、RabbitMQがレスポンスを送信するという構成になります。したがって、プルを避けるにはメッセージをプッシュする方がよいでしょう。

第4章: 手動確認

メッセージを確認する方法は2つあります。

  1. 自動確認 (autoAck=true)
  2. 手動確認 (autoAck=false)

コンシューマーは、メッセージを消費するときに autoAck パラメータを指定できます。

文字列 basicConsume(文字列キュー、ブール値 autoAck、コンシューマーコールバック)

autoAck=false: RabbitMQ は、メモリ (またはディスク) からメッセージを削除する前に、コンシューマーが確認メッセージを表示するまで待機します。

`autoAck=true`: RabbitMQ は、コンシューマーが実際にメッセージを消費したかどうかに関係なく、送信されたメッセージを自動的に確認し、メモリ (またはディスク) から削除します。

手動確認方法は次の通りで、2 つのパラメータが含まれます。

basicAck(長い配信タグ、ブール値の複数)

`deliveryTag`: チャネルで配信されたメッセージを識別するために使用されます。RabbitMQがConsumerにメッセージをプッシュする際、`deliveryTag`が付加されます。これにより、Consumerはメッセージの確認時にどのメッセージが確認されたかをRabbitMQに伝えることができます。

RabbitMQ は、各チャネルで各メッセージの deliveryTag が 1 から増加することを保証します。

`multiple=true`: メッセージ ID <= deliveryTag のメッセージが確認されます。

`myltiple=false`: メッセージ id=deliveryTag のメッセージが確認されます。

メッセージが未確認のままだとどうなるのでしょうか?

キュー内のメッセージがコンシューマーに送信され、コンシューマーがメッセージを確認しない場合は、確認されてから削除されるまで、メッセージはキュー内に残ります。

コンシューマー A に送信されたメッセージが未確認のままの場合、RabbitMQ は、コンシューマー A と RabbitMQ 間の接続が切断された後にのみ、コンシューマー A から別のコンシューマーに未確認メッセージを再配信することを検討します。

第5章: メッセージを拒否する2つの方法

メッセージを確認する方法は 1 つだけです。

basicAck(長い配信タグ、ブール値の複数)

メッセージを拒否する方法は 2 つあります。

  1. basicNack(長い配信タグ、ブール値の複数、ブール値の再キュー)
  2. basicReject(長い配信タグ、ブール値の再キュー)

basicNack と basicReject の唯一の違いは、basicNack がバッチ拒否をサポートしていることです。

deliveryTag と複数のパラメータについては、すでに説明しました。

`requeue=true`: メッセージはキューに再度送信されます。

`requeue=false`: メッセージは失われます。

第6章: 障害通知

第 6 章から第 10 章では、主に情報の公開に伴うトレードオフについて概説します。

最もよく使用されるのは、失敗通知と発行者の確認です。

特定のキューにルーティングできないメッセージを取得するにはどうすればよいですか?

  1. メッセージを送信するときに、mandatory を true に設定します。
  2. プロデューサーは、channel.addReturnListener を呼び出して ReturnListener リスナーを追加し、キューにルーティングされていないメッセージを取得できます。

`mandatory` は `channel.basicPublish()` メソッドのパラメータです。

`mandatory=true`: エクスチェンジがルーティング キーに基づいて一致するキューを見つけられない場合、RabbitMQ は `Basic.Return` コマンドを呼び出してメッセージをプロデューサーに返します。

必須 = false: 上記の状況では、メッセージは直接破棄されます。

第7章 発行者の確認

メッセージが送信された後、そのメッセージは実際に取引所に届きますか?デフォルトでは、プロデューサーはメッセージが取引所に届いたかどうかはわかりません。

RabbitMQ はこの問題に対して 2 つの解決策を提供します。

  1. トランザクション(後述)
  2. 出版社が確認

出版社は、プログラミング方法が 3 つあることを確認しました。

  1. 通常の確認モード:各メッセージを送信した後、waitForConfirms() メソッドが呼び出され、サーバーからの確認を待機します。これは基本的に、シリアルな確認プロセスです。
  2. バッチ確認モード: 各バッチのメッセージを送信した後、waitForConfirms() メソッドを呼び出してサーバーの確認を待機します。
  3. 非同期確認モード: サーバーが 1 つ以上のメッセージを確認した後にクライアントがコールバックするコールバック メソッドを提供します。

非同期確認モードは最も高いパフォーマンスを提供するため、頻繁に使用されます。これについて詳しく説明したいと思います。

  1. channel.addConfirmListener(newConfirmListener(){@OverridepublicvoidhandleAck(longdeliveryTag,booleanmultiple)throwsIOException{log.info( "handleAck,deliveryTag:{},multiple:{}" ,deliveryTag,multiple);}@OverridepublicvoidhandleNack(longdeliveryTag,booleanmultiple)throwsIOException{log.info( "handleNack,deliveryTag:{},multiple:{}" ,deliveryTag,multiple);}});

非同期確認コードを書いたことがある方なら、このコードには馴染みがあるはずです。deliveryTag と multiple が使われているのがお分かりいただけるでしょう。ただし、ここでの deliveryTag と multiple は、メッセージの ack とは全く関係がないことを明確にしておきます。

`confirmListener` の `ack` は RabbitMQ によって制御され、メッセージが交換に到達したかどうかを確認するために使用されます。

メッセージ確認応答(ACK):前述の通り、確認応答は自動または手動で行うことができます。これは、キュー内のメッセージがコンシューマーによって消費されたかどうかを確認するために使用されます。

第8章: バックアップスイッチ

プロデューサーがメッセージを送信する際に「mandatory」パラメータを設定していない場合、メッセージがキューにルーティングされないと失われます。「mandatory」パラメータが設定されている場合は、ReturnListenerロジックを追加する必要があり、プロデューサーのコードが複雑になります。複雑さとメッセージの損失の両方を回避するには、スタンバイエクスチェンジを使用できます。スタンバイエクスチェンジは、ルーティングされていないメッセージをRabbitMQに保存し、必要に応じて処理します。

第9章 出来事

RabbitMQ には、トランザクション メカニズムに関連する 3 つのメソッドがあります。

メッセージが RabbitMQ 交換に正常に送信された場合にのみ、トランザクションは正常にコミットされます。それ以外の場合は、例外をキャッチした後にトランザクションのロールバックを実行し、同時にメッセージを再送信することができます。

トランザクションは RabbitMQ のパフォーマンスに重大な影響を与える可能性があるため、通常は代わりにパブリッシャー確認が使用されます。

第10章: メッセージの永続性

メッセージを永続化するには、メッセージの delivery-mode 属性を 2 に設定するだけです。

RabbitMQ はこのプロパティを MessageProperties.PERSISTENT_TEXT_PLAIN としてカプセル化します。

詳しい使用方法については、GitHub のコードを参照してください。

メッセージを永続化したい場合は、キューとメッセージの永続化の両方を設定するのが最適です。キューの永続化のみを設定した場合、再起動後にメッセージが失われます。同様に、キューの永続化のみを設定した場合、再起動時にキューが消えてしまい、結果としてメッセージも失われます。

第11章: デッドレターキュー

DLX(Dead-Letter-Exchange)は、デッドレターメッセージを処理する交換プロトコルの一種です。メッセージがキュー内でデッドメッセージになると、別の交換プロトコル(DLX)に再送信できます。DLXにバインドされたキューは、デッドレターキューと呼ばれます。

DLX も通常の交換であり、通常の交換と変わりません。基本的には、特定のキューの属性を設定することを伴います。

メッセージがデッドレターになる主な理由は次のとおりです。

  1. メッセージは拒否され (Basic.Reject/Basic.Nack)、再配信されません (requeue=false)。
  2. メッセージの有効期限が切れました
  3. キューが最大長に達しました。

デッドレタースイッチとスタンバイスイッチの違い

スタンバイ スイッチ: 1. メッセージをルーティングできない場合、メッセージはスタンバイ スイッチに転送されます。2. スタンバイ スイッチは、プライマリ スイッチが宣言されるときに定義されます。

デッドレター交換: 1. キューに到着したがコンシューマーによって拒否されたメッセージは、デッドレター交換に転送されます。2. デッドレター交換は、キューが宣言されるときに定義されます。

第12章 トラフィック制御(サービス品質保証)

QoS、つまりサーバー側のレート制限は、プルベースの消費方法では効果がありません。

QoS を使用するには、次の 2 つの手順のみが必要です。

  1. autoAck を false に設定する (autoAck=true は効果がありません)
  2. basicConsume メソッドを呼び出す前に、まず 3 つのパラメータを持つ basicQos メソッドを呼び出す必要があります。

basicQos(int prefetchSize, int prefetchCount, boolean global)

QoS を使用する理由は何ですか?

  • サービスの安定性を向上させます。コンシューマーが一定期間利用できず、キューに数万件もの未処理メッセージが残っているとします。クライアントを再起動すると…

プッシュ通知が大量に流入すると速度が低下したり、デバイスが使用できなくなる可能性もあるため、サーバー側のレート制限が重要になります。

  • スループットを向上させるため、キューに複数のコンシューマーがある場合、キューで受信したメッセージはラウンドロビン方式で各コンシューマーに送信されます。ただし、マシンの性能などの要因により、各コンシューマーの処理能力は異なります。

これにより、一部のコンシューマーがメッセージの処理を完了している一方で、他のコンシューマーにはメッセージがバックログとして残っており、全体的なアプリケーション スループットが低下する可能性があります。