DUICUO

Kafka の基礎をご案内します。知れば知るほど、知らないことが増えることに気づきます。

[[340900]]

この記事は、蔡歩才氏が執筆したWeChat公式アカウント「小蔡良基」から転載したものです。転載の許可については、小蔡良基公式アカウントまでお問い合わせください。

初期のカフカ

1. はじめに

Kafkaは元々、LinkedInによってScalaを用いて開発され、複数のパーティションとレプリカを備え、ZooKeeperによって調整される分散メッセージングシステムでした。その後、Apache Software Foundationに寄贈されました。現在、Kafkaは分散ストリーム処理プラットフォームとして位置付けられており、高いスループット、永続性、水平スケーラビリティ、そしてストリーミングデータ処理のサポートから広く利用されています。

2. 使用シナリオ

メッセージングシステム:Kafka と従来のメッセージングシステム(メッセージミドルウェア)はどちらも、システムの分離、冗長ストレージ、トラフィックシェーピング、バッファリング、非同期通信、スケーラビリティ、リカバリ性といった機能を提供します。さらに、Kafka は、他のほとんどのメッセージングシステムでは実現が難しい、メッセージの順序保証とバックトラッキングによる消費機能も提供します。

ストレージシステム:Kafka はメッセージをディスクに永続化するため、他のメモリベースのストレージシステムと比較してデータ損失のリスクを効果的に低減します。Kafka のメッセージ永続性とマルチレプリカメカニズムにより、対応するデータ保持ポリシーを「永続」に設定するか、トピックログの圧縮を有効にするだけで、Kafka を長期データストレージシステムとして使用できます。

ストリーミング プラットフォーム: Kafka は、あらゆる一般的なストリーミング フレームワークに信頼性の高いデータ ソースを提供するだけでなく、ウィンドウ化、結合、交換、集約などのさまざまな操作を含む完全なストリーミング ライブラリも提供します。

3. 基本概念

Kafka アーキテクチャは、複数の「プロデューサー」、「ブローカー」、「コンシューマー」、および ZooKeeper クラスターで構成されます。

  • ZooKeeper は、Kafka によってクラスター メタデータの管理、コントローラーの選択、その他の操作の実行に使用されます。
  • プロデューサー: メッセージを送信する側。メッセージを作成し、Kafka に配信する責任を負います。
  • コンシューマー: コンシューマーはメッセージを受信する側です。Kafkaに接続した後、メッセージを受信し、対応するビジネスロジック処理を実行します。
  • ブローカー: サービスブローカーノード。Kafka では、ブローカーはスタンドアロンの Kafka サービスノードまたは Kafka サービスインスタンスとして簡単に考えることができます。多くの場合、ブローカーは Kafka サーバーとも考えられますが、そのサーバーにデプロイされている Kafka インスタンスは 1 つだけです。1 つ以上のブローカーが Kafka クラスターを形成します。

Kafka システム全体は、おおよそ上記の部分で構成されています。さらに、トピックとパーティションという2つの特に重要な概念があります。

  • トピック: Kafka 内のメッセージはトピックによって分類されます。プロデューサーは特定のトピックにメッセージを送信する役割を担い(Kafka クラスターに送信される各メッセージはトピックを指定する必要があります)、コンシューマーはトピックをサブスクライブして消費する役割を担います。
  • パーティション:トピックは論理的な概念です。さらに複数のパーティションに分割できます。パーティションは単一のトピックにのみ属し、多くの場合トピックパーティションと呼ばれます。同じトピックの異なるパーティションには、異なるメッセージが含まれます。ストレージレベルでは、パーティションは追加可能な「ログファイル」と見なすことができます。メッセージがパーティションのログファイルに追加されると、特定のオフセットが割り当てられます。オフセットは、パーティション内のメッセージに一意の識別子です。Kafkaはこれを使用して、パーティション内のメッセージの順序を保証します。ただし、オフセットはパーティションをまたぐことはありません。つまり、Kafkaはトピックの順序ではなく、パーティションの順序を保証します。

Kafka ではパーティションのレプリカ メカニズムが導入されており、レプリカの数を増やすことで災害復旧機能を向上させることができます。

同じパーティション内の異なるレプリカは、同じメッセージを保存します(ただし、レプリカ間でメッセージが常に同一であるとは限りません)。レプリカは「1つのマスター、複数のスレーブ」モデルで動作し、リーダーレプリカが読み取りおよび書き込みリクエストを処理し、フォロワーレプリカはリーダーとメッセージを同期するだけです。レプリカは異なるブローカー上に存在し、リーダーレプリカに障害が発生すると、フォロワーレプリカから新しいリーダーレプリカが選出され、サービスを提供します。

「Kafka は、マルチレプリカ メカニズムを通じて自動フェイルオーバーを実現し、Kafka クラスター内のブローカーに障害が発生した場合でもサービスの可用性を保証します。」

Kafka についてさらに詳しく調べる前に、いくつかの重要な用語を理解する必要があります。

  • AR (割り当てられたレプリカ): パーティション内のすべてのレプリカは総称して AR と呼ばれます。
  • ISR(同期レプリカ):リーダーレプリカ(リーダーレプリカを含む)と一定レベルの同期を維持しているすべてのレプリカがISRを構成します。ISRセットはARセットのサブセットです。メッセージはまずリーダーレプリカに送信され、その後、フォロワーレプリカはリーダーレプリカからメッセージをプルして同期できます。同期期間中、フォロワーレプリカはリーダーレプリカに対して一定の範囲内で遅延します。
  • OSR (同期していないレプリカ): 同期においてリーダー レプリカより大幅に遅れているレプリカ (リーダー レプリカを除く) は OSR を構成します。

上記の関係から、次の式を導き出すことができます: AR = ISR + OSR

  • HW(ハイウォーターマーク):一般的にハイウォーターマークとも呼ばれ、特定のメッセージオフセットを識別するために使用されます。コンシューマーは、このオフセットより前のメッセージのみを取得できます。
  • LEO(LogStartOffset): 次に書き込まれるメッセージのオフセット。

そろそろ皆さんも焦り始めているかもしれませんね。Kafkaって難しすぎる!もっと学習に集中できないの?

焦らず、焦らずに。まずは理論的な知識をしっかり学ぶ必要があります。これはあなたを落胆させる始まりではなく、成長の始まりです!以下では、できるだけ分かりやすい言葉で、あなたを最も深い穴へと導いていきます!

カフカの制作チーム

ご存知の通り、Kafka は高度な意味では分散メッセージキューですが、簡単に言えば単なるメッセージキューです。そして、メッセージキューとは、簡単に言えば、データのプッシュと取得を行うものです。実際、高度な知識を得るには、シンプルな理解で十分です。

では、データはどこから来るのでしょうか?データはプロダクションチームから来ます!プログラミングの観点から見ると、プロダクションチームにはプロデューサーのグループが存在します(あるいはプロデューサーは1人だけの場合もあります)。プロデューサーとは、Kafkaへのメッセージの送信を担当するアプリケーションです。

クライアント側開発

製造プロセスには通常、次の手順が必要です。

  • プロデューサー クライアント パラメータを設定し、応答のプロデューサー インスタンスを作成します。
  • 送信するメッセージを構築する
  • メッセージを送信
  • プロデューサーインスタンスをシャットダウンする

「4つの主要なステップと1つのシャトルで生産上の問題を解決します」

上記のコードは、プロパティ ファイルに 4 つのパラメータを設定することを示しています。

  • `bootstrap.servers`: このパラメータは、プロデューサークライアントがKafkaクラスターに接続するために必要なブローカーアドレスを指定します。形式は `(host1:port1, host2:port2)` で、1つ以上のアドレスをカンマ区切りで設定できます。デフォルト値は "" です。
  • `key.serializer` と `value.serializer`: これらはそれぞれ、キーと値のシリアル化操作に使用するシリアライザーを指定します。これらのパラメータにはデフォルト値はなく、シリアライザーの完全修飾名を指定する必要があります。
  • `client.id`: KafkaProducerに対応するクライアントIDを設定するために使用されます。デフォルト値は "" です。クライアントがこの値を設定していない場合、KafkaProducerは文字列 "producer-" と数字を連結した "producer-1" または "producer-2" という形式の空でない文字列を自動的に生成します。

ProducerRecord は次のように定義されます。

  • トピックとパーティション: これらは、それぞれ、メッセージが送信されるトピックとパーティション番号を表します。
  • headers: メッセージ ヘッダー。必要ない場合は省略できます。
  • key: メッセージのキーを指定するために使用されます。メッセージの付録としてだけでなく、特定のパーティションにメッセージを送信するためのパーティション番号を計算するためにも使用できます。
  • 値: メッセージ本文。通常は空ではありません。空の場合は、特定のメッセージ(「tombstone message(墓石メッセージ)」)であることを示します。
  • timestamp: メッセージのタイムスタンプ。CreateTime と LogAppendTime の2種類があります。前者はメッセージが作成された時刻を示し、後者はメッセージがログファイルに追加された時刻を示します。

上記の操作には、プロデューサーインスタンスの作成とメッセージの構築が含まれます。メッセージの送信には主に3つのモードがあります。

  • ファイア・アンド・フォーゲット。
  • 同期(sync)
  • 非同期(async)

上記で使用した送信方法は「送信して忘れる」アプローチです。つまり、メッセージが正しく到着したかどうかを気にせず、Kafka に送信するだけです。ほとんどの場合、この方法は問題なく動作しますが、場合によっては(再試行されない例外などにより)、メッセージが失われることがあります。「この方法は最高のパフォーマンスを提供しますが、信頼性は最も低くなります。」

  1. パブリックFuture<RecordMetadata> send(ProducerRecord<K,V> レコード) {}

send メソッドは Future オブジェクトを返します。

  1. 将来の res = producer.send(record);

これは、`send()` メソッドが本質的に非同期であることを示しています。`send()` によって返される `Future` オブジェクトにより、呼び出し元は送信結果を後で取得できます。同期処理を実現したい場合は、`Future` の `get()` メソッドを直接呼び出すことができます。

  1. 試す {
  2. プロデューサー.send(レコード).get();
  3. } キャッチ (例外 e) {
  4. e.printStackTrace();
  5. }

`get()` メソッドを使用して、メッセージが正常に送信されるか例外が発生するまで、Kafka からの応答をブロックして待機します。

生産を非同期にすることはできますか?

Kafka では、 send() メソッドに別のオーバーロードがあります。

  1. public Future<RecordMetadata> send(ProducerRecord<K,V> レコード、コールバック コールバック) {}
  1. プロデューサー.send(レコード、新しいコールバック() {
  2. @オーバーライド
  3. パブリックvoid onCompletion(RecordMetadata recordMetadata, 例外e) {
  4. if ( Objects.isNull (e)) {
  5. System.out.println ( "トピック:" + recordMetadata.topic());
  6. }それ以外{
  7. システム.out.println (e.getMessage());
  8. }
  9. }
  10. });

コールバックの使い方は非常にシンプルで分かりやすいです。Kafkaは、メッセージが正常に送信された場合、または例外がスローされた場合に、レスポンスを受け取るとコールバックします。

onCompletion() メソッドでは、2つのパラメータは相互に排他的です。送信が成功した場合、RecordMetadata は空ではなく、Exception は空になります。送信が失敗した場合は、その逆のことが起こります。

製造面でも難しさはあるのでしょうか?

KafkaProducer では、通常、次の 2 種類の例外が発生します。

  • 再試行可能な例外

ネットワーク例外、リーダー利用不可例外、不明トピックまたはパーティション例外、

NotEnoughReplicasException、NotCoordinatorException

  • 再試行不可例外

RecordTooLargeException など

再試行可能な例外については、「retries」パラメータを設定できます。指定された再試行回数内で例外が自動的に回復した場合、例外はスローされません。「retries」パラメータのデフォルト値は0です。設定方法は以下の通りです。

  1. properties.put(ProducerConfig.RETRIES_CONFIG、10);

上記の例は、再試行回数が 10 回であることを意味します。10 回を超えて再試行しても問題が解決しない場合は、例外がスローされます。

RecordTooLargeException などの再試行されない例外は、送信されるメッセージが大きすぎる場合、再試行されずに直接スローされることを意味します。

シリアル化で

プロデューサーは、オブジェクトをネットワーク経由で Kafka に送信する前にシリアライザーを使用してオブジェクトをバイト配列に変換する必要があり、コンシューマーはデシリアライザーを使用して、Kafka から受信したバイト配列を対応するオブジェクトに変換する必要があります。

上記のコードで使用されている StringSerializer は、Serializer インターフェイスを実装します。

configure() メソッドは現在のクラスを構成するために使用され、serialize() メソッドはシリアル化操作を実行するために使用されます。

「プロデューサーが使用するシリアライザーとコンシューマーが使用するデシリアライザーは 1 対 1 で対応している必要があります。」

もちろん、Kafka が提供するシリアライザーを使用するだけでなく、独自のシリアライザーを定義することもできます。

「学生.クラス」:

  1. @データ
  2. パブリッククラス Student {
  3.  
  4. プライベート文字列;
  5.  
  6. プライベート文字列のコメント;
  7. }

「MySerializer」:

"使用":

  1. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG、MySerializer.class.getName());

必要なのは、独自のシリアライザーをプロパティに配置することだけです。驚くほど簡単です。

パーティショナーとは何ですか?

send() メソッドを介してブローカーにメッセージを送信するプロセス中に、メッセージは「インターセプター」、「シリアライザー」、および「パーティショナー」を通過する場合があります。

「インターセプター」は必須ではありませんが、「シリアライザー」は必須です。シリアライザーを通過した後、送信先のパーティションを決定する必要があります。ProducerRecordメッセージでpartitionフィールドが指定されている場合、「パーティショナー」の役割は必要ありません。partitionはメッセージの送信先のパーティション番号を表すためです。

  1. パッケージ org.apache.kafka.clients.producer;
  2.  
  3. パブリックインターフェースPartitionerはConfigurableとCloseableを拡張します{
  4. intパーティション(文字列トピック、オブジェクトキー、byte[] keyBytes、オブジェクト値、byte[] valueBytes、クラスタークラスター);
  5. void close ();
  6. }

上記はKafkaのPartitionerインターフェースです。パーティション番号を計算し、整数値を返すメソッド「partition()」があることがわかります。6つのパラメータは以下のとおりです。

  1. トピック: テーマ
  2. キー: キー
  3. keyBytes: シリアル化されたキー
  4. 値: 値
  5. valueBytes: シリアル化された値
  6. クラスター: クラスターのメタデータ情報

partition() メソッドは、パーティション割り当てのメインロジックを定義します。キーが空でない場合、デフォルトのパーティショナーはキーに対してハッシュ演算(MurmurHash2 アルゴリズムを使用)を実行し、最終的にハッシュ値に基づいてパーティション番号を計算します。同じキーを持つメッセージは同じパーティションに書き込まれます。キーが空の場合、メッセージはトピック内の利用可能な各パーティションにラウンドロビン方式で送信されます。

キーがnullでない場合、計算されるパーティション番号はすべてのパーティションのいずれかになります。キーが空の場合、計算されるパーティション番号は利用可能なパーティションのいずれかになります。

もちろん、パーティショナーは次のようにカスタマイズすることもできます。

「MyPartitioner.クラス」:

"使用":

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG、MyPartitioner.class.getName());

カスタム パーティショナーも使いやすく、Partitioner インターフェイスを実装するだけで済みます。

迎撃機はここにいますか?

Web 開発者はインターセプターをよく知っているでしょう。Kafka にもインターセプター機能があり、これはさらに「プロデューサー インターセプター」と「コンシューマー インターセプター」に分かれています。

プロデューサー インターセプターは、特定のルールに従って要件を満たさないメッセージをフィルタリングしたり、メッセージの内容を変更したりするなど、メッセージ送信前の準備作業を実行できます。また、コールバック ロジックを送信する前に、カスタマイズされた要件を実行するためにも使用できます。

したがって、必要に応じてカスタマイズが行われます。カスタムインターセプターを作成する場合は、ProducerInterceptorインターフェースを実装するだけで済みます。

  1. パッケージ org.apache.kafka.clients.producer;
  2.  
  3. パブリックインターフェース ProducerInterceptor <K, V> は Configurable を拡張します {
  4. プロデューサー レコード <K,V> onSend(プロデューサー レコード <K,V> プロデューサー レコード);
  5.  
  6. void onAcknowledgement(RecordMetadata recordMetadata、例外 e);
  7.  
  8. void close ();
  9. }

onSend() メソッドを使用すると、メッセージに対するカスタマイズされた操作が可能になります。一方、onAcknowledgement() メソッドは、メッセージの送信が失敗する前、またはメッセージが確認される前に呼び出され、ユーザー定義のコールバックよりも優先されます。

カスタム インターセプターは次のとおりです: MyProducerInterceptor.class:

`onSend()` メソッドでは、送信するメッセージを変更しました。`onAcknowledgement()` メソッドでは、送信成功回数と送信失敗回数をカウントしました。最後に、`close()` メソッドでは、送信成功回数と送信失敗回数を出力しています。

同じ使用方法:

  1. properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG、MyProducerInterceptor.class.getName());

インターセプターは自然にインターセプターチェーンを形成します。複数のカスタムインターセプターを定義し、それらをプロパティファイルで宣言することができます。

  1. properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG、MyProducerInterceptor1.class.getName() + "," + MyProducerInterceptor2.class.getName());

「このように、次の迎撃機は前の迎撃機の出力に依存することになります。」

重要なパラメータ

すでに述べたパラメータに加えて、他にも重要なパラメータがいくつかあります。

1. ack

このパラメータは、プロデューサーがこのメッセージを有効と判断する前に、パーティション内のレプリカがいくつこのメッセージを受信する必要があるかを指定します。

  1. properties.put(ProducerConfig.ACKSCONFIG,"0"); // 文字列型であることに注意してください

メッセージは正常に書き込まれました。ack値には3種類の文字列値が含まれています。

  1. `acks = 1`: デフォルト値は1です。プロデューサーがメッセージを送信した後、パーティションのリーダーレプリカがメッセージを正しく書き込む限り、サーバーから成功応答を受け取ります。メッセージがリーダーレプリカに書き込まれ、プロデューサーに成功応答を返したとしても、他のフォロワーレプリカがメッセージをプルする前にリーダーレプリカがクラッシュした場合、メッセージは失われます。
  2. `acks = 0`:プロデューサーはメッセージを送信した後、サーバーからの応答を待つ必要がありません。メッセージの送信からKafkaへの書き込みまでの伝送中に例外が発生し、Kafkaがメッセージを受信できない場合、プロデューサーはそのことを知る術がなく、メッセージは失われます。同じ設定環境において、`acks`を0に設定すると、最大のスループットを実現できます。
  3. `acks = -1` または `acks = all`: プロデューサーはメッセージを送信した後、ISR内のすべてのレプリカがメッセージを正しく書き込み、サーバーから正常な応答を受信するまで待機する必要があります。同じ設定環境において、`acks` を 1 または (all) に設定すると、最高の信頼性が得られます。

設定:

  1. properties.put(ProducerConfig.ACKSCONFIG,"0"); // 文字列型であることに注意してください

2. 最大リクエストサイズ

プロデューサークライアントが送信できるメッセージの最大数を制限するために使用されます。デフォルト値は1048576バイト(1MB)です。

3. 再試行

プロデューサーが再試行する回数を設定するために使用されます。デフォルト値は0で、例外が発生しても再試行は行われません。

4. retry.backoff.ms

不必要な頻繁な再試行を回避するために、2 回の再試行間の時間間隔を設定するために使用されます。デフォルト値は 100 です。

5. 接続最大アイドル時間(ms)

このパラメータは、制限された接続が閉じられるまでの時間を指定します。デフォルト値は540000(ミリ秒)、つまり9分です。

6.バッファメモリ

キャッシュされたメッセージのバッファサイズを設定するために使用されます

7. バッチサイズ

再利用可能なメモリ領域のサイズを設定するために使用されます

Kafkaのコンシューマーグループ

生産があれば消費もある、そうでしょう?プロデューサーに対応するのはコンシューマーです。アプリケーションはKafkaConsumerを使ってトピックをサブスクライブし、それらのトピックからメッセージをプルできます。

個人とグループ?

各コンシューマーには対応するコンシューマーグループがあります。コンシューマーは、Kafka 内のトピックをサブスクライブし、それらのトピックからメッセージをプルする役割を担います。メッセージがトピックにパブリッシュされると、そのメッセージは、サブスクライブしている各コンシューマーグループ内の 1 つのコンシューマーにのみ配信されます。

コンシューマー グループにコンシューマーが 1 つだけの場合、状況は次のようになります。

コンシューマー グループに 2 人のコンシューマーがいる場合、状況は次のようになります。

上記の割り当てからわかるように、消費者数の増加に伴い、全体的な購買力は水平方向に拡張可能です。消費者数を増減することで、全体的な購買力を向上させる(または低下させる)ことができます。しかし、ゾーン数が固定されている場合、消費者数を盲目的に増やしても、購買力を継続的に向上させることはできません。消費者が多すぎると、消費者数がゾーン数を超え、どのゾーンにも割り当てられない消費者が生まれます。

上記の割り当てロジックは、デフォルトのパーティション割り当て戦略に基づいています。コンシューマーとサブスクライブされたトピック間のパーティション割り当て戦略は、コンシューマークライアントのpartition.assignment.strategyを設定することで設定できます。

配送方法

Kafka には 2 つのメッセージ配信モードがあります。

ポイントツーポイントモード

キューベースのシステムでは、メッセージプロデューサーがキューにメッセージを送信し、メッセージコンシューマーがキューからメッセージを受信します。

パブリッシュ/サブスクライブモデル(Pub/Sub)

トピックベースのモデルでは、トピックはメッセージ配信の媒体と考えることができます。メッセージパブリッシャーはトピックにメッセージをパブリッシュし、メッセージサブスクライバーはそのトピックからのメッセージをサブスクライブします。トピックにより、サブスクライバーとパブリッシャーは互いに独立して動作し、直接的な接触なしにメッセージを配信できます。パブリッシュ/サブスクライブパターンは、1対多のメッセージブロードキャストで使用されます。

クライアント側開発

消費プロセスには通常、次の手順が含まれます。

  • コンシューマー クライアント パラメータを構成し、対応するコンシューマー インスタンスを作成します。
  • トピックを購読する
  • メッセージの取得と消費
  • 消費変位を提出する
  • コンシューマーインスタンスを閉じる

ご覧のとおり、コンシューマー パラメータを構成するときに、いくつかのよく知られたパラメータが表示されます。

  • `bootstrap.servers`: タイプミスを防ぐため、`ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG` を使用して、Kafka クラスターへの接続に必要なブローカーアドレスのリストを指定できます。1 つ以上のアドレスをカンマで区切って設定できます。デフォルト値は " " です。
  • `group.id`: タイプミスを防ぐため、コンシューマーが所属するコンシューマーグループの名前を表す `ConsumerConfig.GROUP_ID_CONFIG` を使用できます。デフォルト値は " " です。空に設定すると、例外がスローされます。
  • key.deserializer/value.deserializer: タイプミスを防ぐために、ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG と ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG を使用して、コンシューマー側で実行されるデシリアライズ操作を表すことができます。これらの操作は、プロデューサー側で実行される操作と一致している必要があります。

`client.id`: 誤入力を防ぐため、`ConsumerConfig.CLIENT_ID_CONFIG` で表すことができます。これは、`KafkaConsumer` に対応するクライアントIDを設定するために使用されます。デフォルト値は " " です。

テーマサブスクリプション

コンシューマーがメッセージを消費するには、対応するトピックをサブスクライブすることが重要です。上記の例では、`consumer.subscribe(Arrays.asList(topic));` を使用してトピックをサブスクライブしており、コンシューマーが1つ以上のトピックをサブスクライブできることを示しています。`subscribe()` メソッドのオーバーロードを見てみましょう。

  1. public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) { /* コンパイルされたコード */ }
  2.  
  3. public void subscribe(Collection<String> topics) { /* コンパイルされたコード */ }
  4.  
  5. public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { /* コンパイルされたコード */ }
  6.  
  7. public void subscribe(Pattern pattern) { /* コンパイルされたコード */ }

トピックをサブスクライブするプロセス中に次の状況が発生した場合:

  1. consumer.subscribe(Arrays.asList(topic1));
  2. consumer.subscribe(Arrays.asList(topic2));

最終的には、topic1 ではなく topic2 のみがサブスクライブされ、topic1 と topic2 の組み合わせはサブスクライブされなくなります。

オーバーロードされた subscribe() メソッドは正規表現もサポートします。

  1. consumer.subscribe(Pattern.compile("topic.*"));

この構成では、誰かが新しいトピックを作成し、そのトピック名が正規表現と一致する場合、コンシューマーは新しく追加されたトピックからのメッセージを消費できます。

subscribe() メソッドは、トピックと正規表現をパラメータとして渡すだけでなく、対応する再バランス リスナーを設定するために使用される ConsumerRebalanceListener パラメータを他の 2 つのメソッドに渡すこともサポートしています。

subscribe() メソッドを使用してトピックをサブスクライブするだけでなく、コンシューマーは assign() メソッドを使用して特定のトピックの特定のセクションを直接サブスクライブすることもできます。

  1. パブリックvoid assign(コレクション<TopicPartition> パーティション)

TopicPartition オブジェクトは次のように定義されます。

コンストラクターには、入力として「サブスクライブされたトピック」と「パーティション番号」が必要です。次のように使用されます。

  1. コンシューマーにArrays.asList(新しいTopicPartition("kafka-demo", 0))を割り当てます。

このようにして、kafka-demo のパーティション 0 をサブスクライブできます。

トピックにいくつのパーティションがあるか事前にわからない場合はどうすればよいでしょうか?KafkaConsumer の `partitionsFor()` メソッドを使用すると、指定したトピックのメタデータ情報を照会できます。`partitionsFor()` メソッドは次のように定義されています。

  1. パブリックリスト <PartitionInfo> パーティション For(String topic);

PartitionInfo オブジェクトは次のように定義されます。

  1. パブリッククラスPartitioninfo {
  2. private final String topic; // トピック名
  3. private final int partition; // パーティション番号
  4. private final Node leader; // パーティションのリーダーレプリカの場所です。
  5. private final Node[] replicas; // パーティションのARコレクション
  6. private final Node[] inSyncReplicas; // パーティションの ISR コレクション。
  7. private final Node[] offlineReplicas; // パーティションのOSRコレクション
  8. }

サブスクリプションは悪意を持ってバンドルされているわけではありません。サブスクライブとアンサブスクライブは自由に行えます。トピックのサブスクライブを解除するには、`KafkaConsumer` の `unsubscribe()` メソッドを使用します。このメソッドは、`subscribe(Collection)`、`subscribe(Pattern)`、`assign(Collection)` メソッドを使用して実装されたサブスクリプションのサブスクライブを解除できます。

  1. 消費者.購読解除();

`subscribe(Collection)` または `assign(Collection)` の collection パラメータに空のコレクションを設定すると、`unsubscribe()` メソッドと同じ効果が得られます。以下の例の3行のコードは、いずれも同じ結果になります。

  1. 消費者.購読解除();
  2. consumer.subscribe(新しいArrayList<String>());
  3. コンシューマーに新しいArrayList<TopicPartition>()を割り当てます。

消費パターン

一般的に、メッセージの消費パターンには「プッシュ」と「プル」の2種類があります。Kafka の消費は「プル」パターンに基づいています。

プッシュ モード: サーバーはメッセージをコンシューマーにプロアクティブにプッシュします。

プル モード: コンシューマーがサーバーへのプル リクエストを積極的に開始します。

Kafka のメッセージ消費は継続的なポーリングプロセスです。コンシューマー側で行う必要があるのは、poll() メソッドを繰り返し呼び出すことだけです。一部のパーティションに消費可能なメッセージがない場合、そのパーティションのメッセージ取得結果は空になります。また、サブスクライブされているすべてのパーティションに消費可能なメッセージがない場合、poll() メソッドは空のメッセージセットを返します。

  1. public ConsumerRecords<K, V> ポーリング(最終期間タイムアウト)

poll() メソッドにはタイムアウトパラメータを渡すことで、poll() メソッドのブロック時間を制御できます。コンシューマーのバッファに利用可能なデータがない場合、poll() メソッドはブロックされます。

poll() メソッドを使用して取得されるメッセージは、次のように定義される ConsumerRecord オブジェクトです。

メッセージを消費するときに、ConsumerRecord 内の関心のあるフィールドに対して特定のビジネス ロジック処理を直接実行できます。

消費者インターセプター

プロデューサーインターセプターの使用については既に説明しましたが、当然のことながら、コンシューマーにも独自のインターセプターがあります。コンシューマーインターセプターは主に、メッセージをコンシュームする際、またはコンシュームオフセットをコミットする際に、カスタマイズされた操作を実行します。

プロデューサーは `ProducerInterceptor` インターフェースを実装することでインターセプターを定義し、コンシューマーは `ConsumerInterceptor` インターフェースを実装することでインターセプターを定義します。`ConsumerInterceptor` インターフェースは以下のように定義されます。

  1. パッケージ org.apache.kafka.clients.consumer;
  2.  
  3. パブリックインターフェース ConsumerInterceptor <K, V> は Configurable、AutoCloseable を拡張します {
  4. ConsumerRecords<K,V> を ConsumerRecords<K,V> で消費します。
  5.  
  6. void onCommit(Map<TopicPartition,OffsetAndMetadata> map);
  7.  
  8. void close ();
  9. }
  • `onConsume()`: KafkaConsumer は、`poll()` メソッドが返される前にインターセプターの `onConsume()` メソッドを呼び出し、メッセージに対して対応するカスタマイズされた操作を実行します。これには、返されたメッセージの内容を変更したり、特定のルールに従ってメッセージをフィルタリングしたりすることが含まれます(これにより、`poll()` メソッドによって返されるメッセージの数が減少する可能性があります)。`onConsume()` メソッドで例外がスローされた場合、例外はキャッチされてログに記録されますが、上位に伝播されることはありません。
  • onCommit(): KafkaConsumer は、消費オフセットをコミットした後、インターセプターの onCommit() メソッドを呼び出します。このメソッドは、コミットされたオフセット情報を記録および追跡するために使用できます。例えば、コンシューマーがパラメータなしの commitSync メソッドを使用する場合、コミットされた消費オフセットの詳細はわかりませんが、インターセプターの onCommit() メソッドはそれを行うことができます。

カスタム インターセプターを作成した後も、同じ方法を使用しました。

  1. プロパティを設定します(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG、MyConsumerInterceptor.class.getName());

重要なパラメータ

すでに述べたパラメータに加えて、他にも重要なパラメータがいくつかあります。

1. フェッチする最小バイト数

このパラメータは、コンシューマーが単一のプルリクエスト(poll() メソッドの呼び出し)で Kafka からプルできるデータの最小量を設定します。デフォルト値は 1B です。返されるデータサイズがこのパラメータの値よりも小さい場合、コンシューマーはデータサイズが設定されたサイズに達するまで待機する必要があります。

2. フェッチ最大バイト数

このパラメータは、コンシューマーが単一のプル リクエストで Kafka からプルできるデータの最大量を設定します。デフォルト値は 52,428,800 バイト (50 MB) です。

3. フェッチ最大待機時間(ms)

このパラメータは Kafka の待機時間を指定します。デフォルト値は 500 ミリ秒です。

4. 最大パーティションフェッチバイト

このパラメータは、各パーティションからコンシューマーに返されるデータの最大量を設定します。デフォルト値は 1,048,576 バイト (1 MB) です。

5. 最大ポーリングレコード数

このパラメータは、コンシューマーが 1 回のリクエストで取得できるメッセージの最大数を設定します。デフォルト値は 500 です。

6. リクエストタイムアウト(ミリ秒)

このパラメータは、コンシューマーがリクエスト応答を待機する最大時間を構成します。デフォルト値は 30000 ミリ秒です。

Kafkaトピック管理

プロデューサー側とコンシューマー側の前のセクションで、「トピック」の概念について既に説明しました。「トピック」はKafkaの中核を成すものです。

メッセージの分類であるトピックは、さらに1つ以上のパーティションに細分化できます。パーティションは、メッセージの二次的な分類とも言えます。パーティション化は、Kafka にスケーラビリティと水平スケーリング機能を提供するだけでなく、マルチレプリカメカニズムによるデータ冗長性を提供することでデータの信頼性を向上させます。

1.テーマを作成する

ブローカー側には、auto.create.topics.enable(デフォルト値はtrue)という設定パラメータがあります。このパラメータが「true」の場合、プロデューサーがまだ作成されていないトピックにメッセージを送信しようとすると、パーティション数num.partitions(デフォルト値は1)とレプリケーション係数default.replication.factor(デフォルト値は1)で自動的にトピックが作成されます。

「スクリプトを使用して作成」:

  1. bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic kafka-demo --partitions 4 --replication-factor 2  

「TopicCommand を使用してトピックを作成する」:

Maven 依存関係をエクスポートします。

  1. <依存関係>
  2. <グループID>org.apache.kafka</グループID>
  3. <アーティファクトID>kafka_2.11</アーティファクトID>
  4. <バージョン>2.0.0</バージョン>
  5. </依存関係>
  1. 公共 静的void createTopic(String topicName) {
  2. 文字列[]オプション = 新しい文字列[]{
  3. "--zookeeper" "localhost:2181/kafka"
  4. " - 作成する"
  5. "--レプリケーション係数" , "2" ,
  6. "--パーティション" , "4" ,
  7. "--topic" , トピック名
  8. };
  9. kafka.admin.TopicCommand.main(オプション);
  10. }

上述示例中,创建了一个分区数为4,副本因子为2 的主题

2. 查看主题

  • -リスト:

通过list指令可以查看当前所有可用的主题:

  1. bin/kafka-topics.sh --zookeeper localhost:2181/kafka -list  
  • 説明する

通过describe指令可以查看单个主题信息,如果不适用--topic 指定主题,则会展示出所有主题的详细信息。--topic还支持指定多个主题:

  1. bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic kafka-demo1,kafka-demo2  

3.修改主题

当一个主题被创建之后,我们可以对其做一定的修改,比如修改分区个数、修改配置等,借助于alter指令来实现:

  1. bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic kafka- demo --partitions 3  

修改分区的时候我们需要注意的是:

当主题kafka-demo 的分区数为1 时,不管消息的key 为何值,消息都会发往这一个分区,当分区数增加到3 时,就会根据消息的key 来计算分区号,原本发往分区0 的消息现在就有可能发往分区1 或分区2。因此建议一开始就要设置好分区数量。

目前Kafka 只支持增加分区数而不支持减少分区数,当我们要把主题kafka-demo 的分区数修改为1 时,就会报出InvalidPartitionException 异常。

4. 删除主题

如果确定不再使用一个主题,那么最好的方式就是将其删除,这样可以释放一些资源,比如磁盘、文件句柄等。这个时候我们就可以借助delete 指令来删除主题:

  1. bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic kafka-demo  

需要注意的是我们必须将broker中的delete.topic.enable参数配置为true 才能够删除主题,这个参数的默认值就是true,如果配置为false,那么删除主题的操作将会被忽略。

如果要删除的主题是Kafka 的内部主题,那么删除时就会报错。例如:__consumer_offsets和__transaction_state

常见参数

パラメータ名説明
変更する用于修改主题,包括分区数以及主题的配置
config<键值对> 创建或修改主题,用于设置主题级别的参数
作成する创建主题
消去删除主题
delete-config<配置名称> 删除主题级别被覆盖的配置
説明する查看主题的详细信息
disable-rack-aware 创建主题是不考虑机架信息
ヘルプ打印帮助信息
if-exists 修改或删除主题时使用,只有当主题存在时才会执行操作
if-not-exists 创建主题时使用,只有主题不存在时才会执行动作
リスト列出所有可用的主题
partitions<分区数> 创建主题或增加分区时指定分区数
replica-assignment<分配方案> 手工指定分区副本分配方案
replication-factor<副本数> 创建主题时指定副本因子
topic<主题名称> 指定主题名称
topics-with-overrides 使用describe查看主题信息时,只展示包含覆盖配置的主题
指定连接的ZooKeeper 地址信息

上面大致就是Kafka 的入门内容啦,今天的知识就介绍到这里啦,内容虽然不是很深入,但是字数也不少,能完整看完的小伙伴,小菜给你点个赞哦!