|
この記事は、蔡歩才氏が執筆したWeChat公式アカウント「小蔡良基」から転載したものです。転載の許可については、小蔡良基公式アカウントまでお問い合わせください。 初期のカフカ 1. はじめに Kafkaは元々、LinkedInによってScalaを用いて開発され、複数のパーティションとレプリカを備え、ZooKeeperによって調整される分散メッセージングシステムでした。その後、Apache Software Foundationに寄贈されました。現在、Kafkaは分散ストリーム処理プラットフォームとして位置付けられており、高いスループット、永続性、水平スケーラビリティ、そしてストリーミングデータ処理のサポートから広く利用されています。 2. 使用シナリオ メッセージングシステム:Kafka と従来のメッセージングシステム(メッセージミドルウェア)はどちらも、システムの分離、冗長ストレージ、トラフィックシェーピング、バッファリング、非同期通信、スケーラビリティ、リカバリ性といった機能を提供します。さらに、Kafka は、他のほとんどのメッセージングシステムでは実現が難しい、メッセージの順序保証とバックトラッキングによる消費機能も提供します。 ストレージシステム:Kafka はメッセージをディスクに永続化するため、他のメモリベースのストレージシステムと比較してデータ損失のリスクを効果的に低減します。Kafka のメッセージ永続性とマルチレプリカメカニズムにより、対応するデータ保持ポリシーを「永続」に設定するか、トピックログの圧縮を有効にするだけで、Kafka を長期データストレージシステムとして使用できます。 ストリーミング プラットフォーム: Kafka は、あらゆる一般的なストリーミング フレームワークに信頼性の高いデータ ソースを提供するだけでなく、ウィンドウ化、結合、交換、集約などのさまざまな操作を含む完全なストリーミング ライブラリも提供します。 3. 基本概念 Kafka アーキテクチャは、複数の「プロデューサー」、「ブローカー」、「コンシューマー」、および ZooKeeper クラスターで構成されます。
Kafka システム全体は、おおよそ上記の部分で構成されています。さらに、トピックとパーティションという2つの特に重要な概念があります。
Kafka ではパーティションのレプリカ メカニズムが導入されており、レプリカの数を増やすことで災害復旧機能を向上させることができます。 同じパーティション内の異なるレプリカは、同じメッセージを保存します(ただし、レプリカ間でメッセージが常に同一であるとは限りません)。レプリカは「1つのマスター、複数のスレーブ」モデルで動作し、リーダーレプリカが読み取りおよび書き込みリクエストを処理し、フォロワーレプリカはリーダーとメッセージを同期するだけです。レプリカは異なるブローカー上に存在し、リーダーレプリカに障害が発生すると、フォロワーレプリカから新しいリーダーレプリカが選出され、サービスを提供します。 「Kafka は、マルチレプリカ メカニズムを通じて自動フェイルオーバーを実現し、Kafka クラスター内のブローカーに障害が発生した場合でもサービスの可用性を保証します。」 Kafka についてさらに詳しく調べる前に、いくつかの重要な用語を理解する必要があります。
上記の関係から、次の式を導き出すことができます: AR = ISR + OSR
そろそろ皆さんも焦り始めているかもしれませんね。Kafkaって難しすぎる!もっと学習に集中できないの? 焦らず、焦らずに。まずは理論的な知識をしっかり学ぶ必要があります。これはあなたを落胆させる始まりではなく、成長の始まりです!以下では、できるだけ分かりやすい言葉で、あなたを最も深い穴へと導いていきます! カフカの制作チーム ご存知の通り、Kafka は高度な意味では分散メッセージキューですが、簡単に言えば単なるメッセージキューです。そして、メッセージキューとは、簡単に言えば、データのプッシュと取得を行うものです。実際、高度な知識を得るには、シンプルな理解で十分です。 では、データはどこから来るのでしょうか?データはプロダクションチームから来ます!プログラミングの観点から見ると、プロダクションチームにはプロデューサーのグループが存在します(あるいはプロデューサーは1人だけの場合もあります)。プロデューサーとは、Kafkaへのメッセージの送信を担当するアプリケーションです。 クライアント側開発 製造プロセスには通常、次の手順が必要です。
「4つの主要なステップと1つのシャトルで生産上の問題を解決します」 上記のコードは、プロパティ ファイルに 4 つのパラメータを設定することを示しています。
ProducerRecord は次のように定義されます。
上記の操作には、プロデューサーインスタンスの作成とメッセージの構築が含まれます。メッセージの送信には主に3つのモードがあります。
上記で使用した送信方法は「送信して忘れる」アプローチです。つまり、メッセージが正しく到着したかどうかを気にせず、Kafka に送信するだけです。ほとんどの場合、この方法は問題なく動作しますが、場合によっては(再試行されない例外などにより)、メッセージが失われることがあります。「この方法は最高のパフォーマンスを提供しますが、信頼性は最も低くなります。」
send メソッドは Future オブジェクトを返します。
これは、`send()` メソッドが本質的に非同期であることを示しています。`send()` によって返される `Future` オブジェクトにより、呼び出し元は送信結果を後で取得できます。同期処理を実現したい場合は、`Future` の `get()` メソッドを直接呼び出すことができます。
`get()` メソッドを使用して、メッセージが正常に送信されるか例外が発生するまで、Kafka からの応答をブロックして待機します。 生産を非同期にすることはできますか? Kafka では、 send() メソッドに別のオーバーロードがあります。
コールバックの使い方は非常にシンプルで分かりやすいです。Kafkaは、メッセージが正常に送信された場合、または例外がスローされた場合に、レスポンスを受け取るとコールバックします。 onCompletion() メソッドでは、2つのパラメータは相互に排他的です。送信が成功した場合、RecordMetadata は空ではなく、Exception は空になります。送信が失敗した場合は、その逆のことが起こります。 製造面でも難しさはあるのでしょうか? KafkaProducer では、通常、次の 2 種類の例外が発生します。
ネットワーク例外、リーダー利用不可例外、不明トピックまたはパーティション例外、 NotEnoughReplicasException、NotCoordinatorException
RecordTooLargeException など 再試行可能な例外については、「retries」パラメータを設定できます。指定された再試行回数内で例外が自動的に回復した場合、例外はスローされません。「retries」パラメータのデフォルト値は0です。設定方法は以下の通りです。
上記の例は、再試行回数が 10 回であることを意味します。10 回を超えて再試行しても問題が解決しない場合は、例外がスローされます。 RecordTooLargeException などの再試行されない例外は、送信されるメッセージが大きすぎる場合、再試行されずに直接スローされることを意味します。 シリアル化で プロデューサーは、オブジェクトをネットワーク経由で Kafka に送信する前にシリアライザーを使用してオブジェクトをバイト配列に変換する必要があり、コンシューマーはデシリアライザーを使用して、Kafka から受信したバイト配列を対応するオブジェクトに変換する必要があります。 上記のコードで使用されている StringSerializer は、Serializer インターフェイスを実装します。 configure() メソッドは現在のクラスを構成するために使用され、serialize() メソッドはシリアル化操作を実行するために使用されます。 「プロデューサーが使用するシリアライザーとコンシューマーが使用するデシリアライザーは 1 対 1 で対応している必要があります。」 もちろん、Kafka が提供するシリアライザーを使用するだけでなく、独自のシリアライザーを定義することもできます。 「学生.クラス」:
「MySerializer」: "使用":
必要なのは、独自のシリアライザーをプロパティに配置することだけです。驚くほど簡単です。 パーティショナーとは何ですか? send() メソッドを介してブローカーにメッセージを送信するプロセス中に、メッセージは「インターセプター」、「シリアライザー」、および「パーティショナー」を通過する場合があります。 「インターセプター」は必須ではありませんが、「シリアライザー」は必須です。シリアライザーを通過した後、送信先のパーティションを決定する必要があります。ProducerRecordメッセージでpartitionフィールドが指定されている場合、「パーティショナー」の役割は必要ありません。partitionはメッセージの送信先のパーティション番号を表すためです。
上記はKafkaのPartitionerインターフェースです。パーティション番号を計算し、整数値を返すメソッド「partition()」があることがわかります。6つのパラメータは以下のとおりです。
partition() メソッドは、パーティション割り当てのメインロジックを定義します。キーが空でない場合、デフォルトのパーティショナーはキーに対してハッシュ演算(MurmurHash2 アルゴリズムを使用)を実行し、最終的にハッシュ値に基づいてパーティション番号を計算します。同じキーを持つメッセージは同じパーティションに書き込まれます。キーが空の場合、メッセージはトピック内の利用可能な各パーティションにラウンドロビン方式で送信されます。 キーがnullでない場合、計算されるパーティション番号はすべてのパーティションのいずれかになります。キーが空の場合、計算されるパーティション番号は利用可能なパーティションのいずれかになります。 もちろん、パーティショナーは次のようにカスタマイズすることもできます。 「MyPartitioner.クラス」: "使用": properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG、MyPartitioner.class.getName()); カスタム パーティショナーも使いやすく、Partitioner インターフェイスを実装するだけで済みます。 迎撃機はここにいますか? Web 開発者はインターセプターをよく知っているでしょう。Kafka にもインターセプター機能があり、これはさらに「プロデューサー インターセプター」と「コンシューマー インターセプター」に分かれています。 プロデューサー インターセプターは、特定のルールに従って要件を満たさないメッセージをフィルタリングしたり、メッセージの内容を変更したりするなど、メッセージ送信前の準備作業を実行できます。また、コールバック ロジックを送信する前に、カスタマイズされた要件を実行するためにも使用できます。 したがって、必要に応じてカスタマイズが行われます。カスタムインターセプターを作成する場合は、ProducerInterceptorインターフェースを実装するだけで済みます。
onSend() メソッドを使用すると、メッセージに対するカスタマイズされた操作が可能になります。一方、onAcknowledgement() メソッドは、メッセージの送信が失敗する前、またはメッセージが確認される前に呼び出され、ユーザー定義のコールバックよりも優先されます。 カスタム インターセプターは次のとおりです: MyProducerInterceptor.class: `onSend()` メソッドでは、送信するメッセージを変更しました。`onAcknowledgement()` メソッドでは、送信成功回数と送信失敗回数をカウントしました。最後に、`close()` メソッドでは、送信成功回数と送信失敗回数を出力しています。 同じ使用方法:
インターセプターは自然にインターセプターチェーンを形成します。複数のカスタムインターセプターを定義し、それらをプロパティファイルで宣言することができます。
「このように、次の迎撃機は前の迎撃機の出力に依存することになります。」 重要なパラメータ すでに述べたパラメータに加えて、他にも重要なパラメータがいくつかあります。 1. ack このパラメータは、プロデューサーがこのメッセージを有効と判断する前に、パーティション内のレプリカがいくつこのメッセージを受信する必要があるかを指定します。
メッセージは正常に書き込まれました。ack値には3種類の文字列値が含まれています。
設定:
2. 最大リクエストサイズ プロデューサークライアントが送信できるメッセージの最大数を制限するために使用されます。デフォルト値は1048576バイト(1MB)です。 3. 再試行 プロデューサーが再試行する回数を設定するために使用されます。デフォルト値は0で、例外が発生しても再試行は行われません。 4. retry.backoff.ms 不必要な頻繁な再試行を回避するために、2 回の再試行間の時間間隔を設定するために使用されます。デフォルト値は 100 です。 5. 接続最大アイドル時間(ms) このパラメータは、制限された接続が閉じられるまでの時間を指定します。デフォルト値は540000(ミリ秒)、つまり9分です。 6.バッファメモリ キャッシュされたメッセージのバッファサイズを設定するために使用されます 7. バッチサイズ 再利用可能なメモリ領域のサイズを設定するために使用されます Kafkaのコンシューマーグループ 生産があれば消費もある、そうでしょう?プロデューサーに対応するのはコンシューマーです。アプリケーションはKafkaConsumerを使ってトピックをサブスクライブし、それらのトピックからメッセージをプルできます。 個人とグループ? 各コンシューマーには対応するコンシューマーグループがあります。コンシューマーは、Kafka 内のトピックをサブスクライブし、それらのトピックからメッセージをプルする役割を担います。メッセージがトピックにパブリッシュされると、そのメッセージは、サブスクライブしている各コンシューマーグループ内の 1 つのコンシューマーにのみ配信されます。 コンシューマー グループにコンシューマーが 1 つだけの場合、状況は次のようになります。 コンシューマー グループに 2 人のコンシューマーがいる場合、状況は次のようになります。 上記の割り当てからわかるように、消費者数の増加に伴い、全体的な購買力は水平方向に拡張可能です。消費者数を増減することで、全体的な購買力を向上させる(または低下させる)ことができます。しかし、ゾーン数が固定されている場合、消費者数を盲目的に増やしても、購買力を継続的に向上させることはできません。消費者が多すぎると、消費者数がゾーン数を超え、どのゾーンにも割り当てられない消費者が生まれます。 上記の割り当てロジックは、デフォルトのパーティション割り当て戦略に基づいています。コンシューマーとサブスクライブされたトピック間のパーティション割り当て戦略は、コンシューマークライアントのpartition.assignment.strategyを設定することで設定できます。 配送方法 Kafka には 2 つのメッセージ配信モードがあります。 ポイントツーポイントモード キューベースのシステムでは、メッセージプロデューサーがキューにメッセージを送信し、メッセージコンシューマーがキューからメッセージを受信します。 パブリッシュ/サブスクライブモデル(Pub/Sub) トピックベースのモデルでは、トピックはメッセージ配信の媒体と考えることができます。メッセージパブリッシャーはトピックにメッセージをパブリッシュし、メッセージサブスクライバーはそのトピックからのメッセージをサブスクライブします。トピックにより、サブスクライバーとパブリッシャーは互いに独立して動作し、直接的な接触なしにメッセージを配信できます。パブリッシュ/サブスクライブパターンは、1対多のメッセージブロードキャストで使用されます。 クライアント側開発 消費プロセスには通常、次の手順が含まれます。
ご覧のとおり、コンシューマー パラメータを構成するときに、いくつかのよく知られたパラメータが表示されます。
`client.id`: 誤入力を防ぐため、`ConsumerConfig.CLIENT_ID_CONFIG` で表すことができます。これは、`KafkaConsumer` に対応するクライアントIDを設定するために使用されます。デフォルト値は " " です。 テーマサブスクリプション コンシューマーがメッセージを消費するには、対応するトピックをサブスクライブすることが重要です。上記の例では、`consumer.subscribe(Arrays.asList(topic));` を使用してトピックをサブスクライブしており、コンシューマーが1つ以上のトピックをサブスクライブできることを示しています。`subscribe()` メソッドのオーバーロードを見てみましょう。
トピックをサブスクライブするプロセス中に次の状況が発生した場合:
最終的には、topic1 ではなく topic2 のみがサブスクライブされ、topic1 と topic2 の組み合わせはサブスクライブされなくなります。 オーバーロードされた subscribe() メソッドは正規表現もサポートします。
この構成では、誰かが新しいトピックを作成し、そのトピック名が正規表現と一致する場合、コンシューマーは新しく追加されたトピックからのメッセージを消費できます。 subscribe() メソッドは、トピックと正規表現をパラメータとして渡すだけでなく、対応する再バランス リスナーを設定するために使用される ConsumerRebalanceListener パラメータを他の 2 つのメソッドに渡すこともサポートしています。 subscribe() メソッドを使用してトピックをサブスクライブするだけでなく、コンシューマーは assign() メソッドを使用して特定のトピックの特定のセクションを直接サブスクライブすることもできます。
TopicPartition オブジェクトは次のように定義されます。 コンストラクターには、入力として「サブスクライブされたトピック」と「パーティション番号」が必要です。次のように使用されます。
このようにして、kafka-demo のパーティション 0 をサブスクライブできます。 トピックにいくつのパーティションがあるか事前にわからない場合はどうすればよいでしょうか?KafkaConsumer の `partitionsFor()` メソッドを使用すると、指定したトピックのメタデータ情報を照会できます。`partitionsFor()` メソッドは次のように定義されています。
PartitionInfo オブジェクトは次のように定義されます。
サブスクリプションは悪意を持ってバンドルされているわけではありません。サブスクライブとアンサブスクライブは自由に行えます。トピックのサブスクライブを解除するには、`KafkaConsumer` の `unsubscribe()` メソッドを使用します。このメソッドは、`subscribe(Collection)`、`subscribe(Pattern)`、`assign(Collection)` メソッドを使用して実装されたサブスクリプションのサブスクライブを解除できます。
`subscribe(Collection)` または `assign(Collection)` の collection パラメータに空のコレクションを設定すると、`unsubscribe()` メソッドと同じ効果が得られます。以下の例の3行のコードは、いずれも同じ結果になります。
消費パターン 一般的に、メッセージの消費パターンには「プッシュ」と「プル」の2種類があります。Kafka の消費は「プル」パターンに基づいています。 プッシュ モード: サーバーはメッセージをコンシューマーにプロアクティブにプッシュします。 プル モード: コンシューマーがサーバーへのプル リクエストを積極的に開始します。 Kafka のメッセージ消費は継続的なポーリングプロセスです。コンシューマー側で行う必要があるのは、poll() メソッドを繰り返し呼び出すことだけです。一部のパーティションに消費可能なメッセージがない場合、そのパーティションのメッセージ取得結果は空になります。また、サブスクライブされているすべてのパーティションに消費可能なメッセージがない場合、poll() メソッドは空のメッセージセットを返します。
poll() メソッドにはタイムアウトパラメータを渡すことで、poll() メソッドのブロック時間を制御できます。コンシューマーのバッファに利用可能なデータがない場合、poll() メソッドはブロックされます。 poll() メソッドを使用して取得されるメッセージは、次のように定義される ConsumerRecord オブジェクトです。 メッセージを消費するときに、ConsumerRecord 内の関心のあるフィールドに対して特定のビジネス ロジック処理を直接実行できます。 消費者インターセプター プロデューサーインターセプターの使用については既に説明しましたが、当然のことながら、コンシューマーにも独自のインターセプターがあります。コンシューマーインターセプターは主に、メッセージをコンシュームする際、またはコンシュームオフセットをコミットする際に、カスタマイズされた操作を実行します。 プロデューサーは `ProducerInterceptor` インターフェースを実装することでインターセプターを定義し、コンシューマーは `ConsumerInterceptor` インターフェースを実装することでインターセプターを定義します。`ConsumerInterceptor` インターフェースは以下のように定義されます。
カスタム インターセプターを作成した後も、同じ方法を使用しました。
重要なパラメータ すでに述べたパラメータに加えて、他にも重要なパラメータがいくつかあります。 1. フェッチする最小バイト数 このパラメータは、コンシューマーが単一のプルリクエスト(poll() メソッドの呼び出し)で Kafka からプルできるデータの最小量を設定します。デフォルト値は 1B です。返されるデータサイズがこのパラメータの値よりも小さい場合、コンシューマーはデータサイズが設定されたサイズに達するまで待機する必要があります。 2. フェッチ最大バイト数 このパラメータは、コンシューマーが単一のプル リクエストで Kafka からプルできるデータの最大量を設定します。デフォルト値は 52,428,800 バイト (50 MB) です。 3. フェッチ最大待機時間(ms) このパラメータは Kafka の待機時間を指定します。デフォルト値は 500 ミリ秒です。 4. 最大パーティションフェッチバイト このパラメータは、各パーティションからコンシューマーに返されるデータの最大量を設定します。デフォルト値は 1,048,576 バイト (1 MB) です。 5. 最大ポーリングレコード数 このパラメータは、コンシューマーが 1 回のリクエストで取得できるメッセージの最大数を設定します。デフォルト値は 500 です。 6. リクエストタイムアウト(ミリ秒) このパラメータは、コンシューマーがリクエスト応答を待機する最大時間を構成します。デフォルト値は 30000 ミリ秒です。 Kafkaトピック管理 プロデューサー側とコンシューマー側の前のセクションで、「トピック」の概念について既に説明しました。「トピック」はKafkaの中核を成すものです。 メッセージの分類であるトピックは、さらに1つ以上のパーティションに細分化できます。パーティションは、メッセージの二次的な分類とも言えます。パーティション化は、Kafka にスケーラビリティと水平スケーリング機能を提供するだけでなく、マルチレプリカメカニズムによるデータ冗長性を提供することでデータの信頼性を向上させます。 1.テーマを作成する ブローカー側には、auto.create.topics.enable(デフォルト値はtrue)という設定パラメータがあります。このパラメータが「true」の場合、プロデューサーがまだ作成されていないトピックにメッセージを送信しようとすると、パーティション数num.partitions(デフォルト値は1)とレプリケーション係数default.replication.factor(デフォルト値は1)で自動的にトピックが作成されます。 「スクリプトを使用して作成」:
「TopicCommand を使用してトピックを作成する」: Maven 依存関係をエクスポートします。
上述示例中,创建了一个分区数为4,副本因子为2 的主题 2. 查看主题
通过list指令可以查看当前所有可用的主题:
通过describe指令可以查看单个主题信息,如果不适用--topic 指定主题,则会展示出所有主题的详细信息。--topic还支持指定多个主题:
3.修改主题 当一个主题被创建之后,我们可以对其做一定的修改,比如修改分区个数、修改配置等,借助于alter指令来实现:
修改分区的时候我们需要注意的是: 当主题kafka-demo 的分区数为1 时,不管消息的key 为何值,消息都会发往这一个分区,当分区数增加到3 时,就会根据消息的key 来计算分区号,原本发往分区0 的消息现在就有可能发往分区1 或分区2。因此建议一开始就要设置好分区数量。 目前Kafka 只支持增加分区数而不支持减少分区数,当我们要把主题kafka-demo 的分区数修改为1 时,就会报出InvalidPartitionException 异常。 4. 删除主题 如果确定不再使用一个主题,那么最好的方式就是将其删除,这样可以释放一些资源,比如磁盘、文件句柄等。这个时候我们就可以借助delete 指令来删除主题:
需要注意的是我们必须将broker中的delete.topic.enable参数配置为true 才能够删除主题,这个参数的默认值就是true,如果配置为false,那么删除主题的操作将会被忽略。 如果要删除的主题是Kafka 的内部主题,那么删除时就会报错。例如:__consumer_offsets和__transaction_state 常见参数
上面大致就是Kafka 的入门内容啦,今天的知识就介绍到这里啦,内容虽然不是很深入,但是字数也不少,能完整看完的小伙伴,小菜给你点个赞哦! |