DUICUO

長い間分散システムに取り組んできたのに、Kafka が何なのかすら知らないのですか?

Kafkaは、エンタープライズおよびインターネットプロジェクトでますます利用されています。この記事では、Kafkaの基礎から始め、その壮大なビジョンを探求していきます。

[[280600]]

Pexelsからの画像

Kafka とは何ですか?

Kafka は、次の 3 つの主要機能を備えた分散ストリーミング プラットフォームです。

  • メッセージ キューやエンタープライズ メッセージング システムに似たレコード ストリームを公開するにはサブスクライブします。
  • レコード ストリームをフォールト トレラントな方法で保存します。
  • リアルタイム録画ストリーム。

Kafka のアプリケーション:

  • メッセージング システムとして。
  • ストレージシステムとして。
  • ストリームプロセッサとして。

Kafka はストリーミングデータパイプラインを構築し、システム間またはアプリケーション間でデータを確実に取得できます。これにより、ストリーミングアプリケーションはデータの送信と応答が可能になります。

メッセージングシステムとしてのKafka

メッセージング システムとしての Kafka には、次の 3 つの基本コンポーネントがあります。

  • プロデューサー: メッセージを公開するクライアント。
  • ブローカー: プロデューサーからのメッセージを受信して​​保存するクライアント。
  • コンシューマー: コンシューマーはブローカーからのメッセージを読み取ります。

大規模システムでは、多くのサブシステムとやり取りし、メッセージを渡す必要があります。このようなシステムには、ソースシステム(メッセージの送信者)と宛先システム(メッセージの受信者)が存在します。

このようなメッセージング システムでデータを送信するには、適切なデータ パイプラインが必要です。

このようなデータのやり取りは非常に複雑に見えます。メッセージングシステムを使用すれば、システムははるかにシンプルでクリーンになります。

Kafka は、1 つ以上のデータ センター内のサーバー上でクラスターとして実行されます。

  • Kafka クラスター内のメッセージ レコードを保存するディレクトリは Topics と呼ばれます。
  • 各メッセージ レコードには、キー、値、タイムスタンプの 3 つの要素が含まれます。

コアAPI

Kafka には次の 4 つのコア API があります。

  • Producer API を使用すると、アプリケーションは 1 つ以上のトピックにメッセージ レコードを送信できます。
  • Consumer API を使用すると、アプリケーションは 1 つ以上のトピックをサブスクライブし、それらに対して生成されたレコードのストリームを処理できます。
  • Streams API を使用すると、アプリケーションはストリーム プロセッサとして機能し、1 つ以上のトピックからの入力ストリームを消費してそれらの出力ストリームを生成し、入力ストリームを効果的に出力ストリームに変換できます。
  • コネクタAPIを使用すると、Kafkaトピックを既存のアプリケーションやデータシステムに接続するプロデューサーとコンシューマーを構築・実行できます。例えば、リレーショナルデータベース用のコネクタは、テーブルへのすべての変更をキャプチャできます。

Kafka の基本概念

非常にスケーラブルでフォールトトレラントなメッセージングシステムであるKafkaには、多くの基本的な概念があります。Kafka特有の概念について学びましょう。

トピック

トピックはトピックと呼ばれます。Kafkaでは、カテゴリ属性はメッセージを異なるクラスに分類するために使用され、このクラスはトピックと呼ばれます。

トピックはメッセージの割り当てラベルのような論理的な概念です。トピックは、データベース内のテーブルやファイルシステム内のフォルダに似ています。

パーティション

パーティション(partition)とは、トピック内のメッセージを1つ以上のパーティションに分割することを指します。これは物理的な概念であり、システム内の1つ以上のディレクトリに対応します。各パーティションはコミットログです。メッセージは追加専用形式でパーティションに書き込まれ、順次読み込まれます。

注: トピックには多数のパーティションが含まれているため、トピック全体で順序は保証されませんが、単一のパーティション内では順序が保証されます。メッセージは各パーティションの末尾に強制的に追加されます。Kafka はパーティション分割によってデータの冗長性とスケーラビリティを実現します。

パーティションは異なるサーバーに分散できるため、トピックを複数のサーバーにまたがって配置し、単一のサーバーよりも強力なパフォーマンスを実現できます。

セグメント

セグメントはセグメントとして翻訳され、パーティションをさらに複数のセグメントに分割し、各セグメント ファイルは同じサイズになります。

ブローカ

Kafka クラスターは 1 つ以上のサーバーで構成され、各サーバーはブローカーと呼ばれます。ブローカーはプロデューサーからメッセージを受信し、メッセージのオフセットを設定し、メッセージをディスクにコミットします。

ブローカーは、パーティションの読み取り要求に応答し、パーティションがディスクにコミットされたことを示すメッセージを返すことで、コンシューマーにサービスを提供します。

ブローカーはクラスタのコンポーネントです。各クラスタには、クラスタコントローラ(リーダー)としても機能するブローカーが1つ存在し、クラスタのアクティブなメンバーによって選出されます。

クラスター内の各メンバーは、潜在的にリーダーとして機能することができます。リーダーは、ブローカーへのパーティションの割り当てやブローカーの監視などの管理タスクを担当します。

クラスターでは、パーティションはリーダーに属しますが、パーティションは複数のブローカー (リーダー以外) に割り当てることができ、その場合、パーティションのレプリケーションが発生します。

このレプリケーション メカニズムは、パーティションにメッセージの冗長性を提供します。ブローカーに障害が発生した場合、他のアクティブなユーザーが新しいリーダーを選択して引き継ぎます。

プロデューサー

プロデューサー、つまりメッセージ パブリッシャーは、特定のトピックから対応するパーティションにメッセージを公開します。

デフォルトでは、プロデューサーは特定のメッセージがどのパーティションに書き込まれるかに関係なく、トピック内のすべてのパーティションにメッセージを均等に分散します。ただし、場合によっては、プロデューサーが指定されたパーティションに直接メッセージを書き込むこともあります。

消費者

コンシューマーはメッセージのユーザーです。コンシューマーは複数のトピックからメッセージを消費できますが、特定のトピックについては、同じパーティションからのメッセージのみを消費します。

Kafka の基本的な概念を理解した後、Kafka クラスターを構築して Kafka をさらに詳しく調べます。

設置環境の確保

Java環境をインストールする

Kafka をインストールする前に、Linux システムに Java 環境がインストールされていることを確認してください。Java のバージョンを確認するには、`java -version` コマンドを使用してください。JDK 1.8 を推奨します。

Java 環境がインストールされていない場合は、この記事に従ってインストールしてください。

  1. https://www.cnblogs.com/zs-notes/p/8535275.html

Zookeeper環境をインストールする

Kafka はメタデータを保存し、一貫性を確保するために Zookeeper を使用するため、Kafka をインストールする前に Zookeeper をインストールする必要があります。Kafka ディストリビューションには Zookeeper が付属しており、スクリプトを使用して直接起動できますが、Zookeeper のインストールは難しくありません。

Zookeeper スタンドアロンセットアップ

単一の Zookeeper インスタンスの設定は比較的簡単です。公式 Web サイトから Zookeeper の安定バージョンをダウンロードするだけです。

  1. https://www.apache.org/dyn/closer.cgi/zookeeper/

ここではバージョン3.4.10を使用しています。ダウンロード後、Linuxシステムの/usr/localディレクトリにZookeeperフォルダを作成してください。

次に、xftp ツール (公式 Web サイト https://www.netsarang.com/zh/xshell/ で xftp ツールと xshell ツールの両方の無料のホーム バージョンを申請できます) を使用して、ダウンロードした Zookeeper の圧縮パッケージを /usr/local/zookeeper ディレクトリに配置します。

tar.gz パッケージをダウンロードした場合は、単に `tar -zxvf zookeeper-3.4.10.tar.gz` を使用して抽出できます。

zipパッケージをダウンロードした場合は、Linuxに解凍ツールがインストールされているかどうかも確認する必要があります。インストールされていない場合は、「yum install unzip」コマンドでzip解凍ツールをインストールし、「unzip zookeeper-3.4.10.zip」コマンドで解凍してください。

解凍後、/usr/local/zookeeper/zookeeper-3.4.10 に cd してデータ フォルダーを作成し、conf フォルダーに入って、mv zoo_sample.cfg zoo.cfg を使用して名前を変更します。

次に、vi で zoo.cfg を開き、dataDir=/usr/local/zookeeper/zookeeper-3.4.10/data に変更して保存します。

binディレクトリに移動し、コマンド「./zkServer.sh start」を入力してサービスを開始します。以下の出力が表示されれば、セットアップは成功です。

サービスを停止するには、次のコマンドを入力します: `./zkServer.sh stop`

ステータス情報を表示するには、`./zkServer.sh status` を使用できます。

Zookeeper クラスターのセットアップ

①準備条件

3台のサーバーが必要です。ここではCentOS7を使用し、3台の仮想マシンをインストールしました。各仮想マシンに1GBのメモリを割り当て、各マシンの/usr/local/以下にZookeeperフォルダを作成しました。

Zookeeperの圧縮ファイルを移動して解凍すると、zookeeper-3.4.10フォルダが作成されます。このフォルダに入り、dataとlogという2つの新しいフォルダを作成します。

注: 前のセクションで説明した単一マシンのセットアップでデータフォルダが既に作成されているため、再度作成する必要はありません。ログフォルダのみ作成してください。これらの2つのフォルダは、新たに追加する他の2つのサービス用に作成する必要があります。

② クラスターを構成する

新しいファイルを作成したら、conf/zoo.cfgファイルを編集する必要があります。3つのファイルの内容は次のとおりです。

  1. ティックタイム=2000
  2. 初期制限=10
  3. 同期制限=5
  4. データディレクトリ=/usr/ローカル/zookeeper/zookeeper-3.4.10/data
  5. dataLogDir=/usr/ローカル/zookeeper/zookeeper-3.4.10/log
  6. クライアントポート=12181
  7. サーバー1=192.168.1.7:12888:13888
  8. サーバー2=192.168.1.8:12888:13888
  9. サーバー3=192.168.1.9:12888:13888

server.1 の 1 はサーバー識別子を表します。サーバー番号を示す任意の数字を指定することもできます。この識別子は、以下で設定する myid の識別子と一致する必要があります。

192.168.1.7:12888:13888 はクラスタ内の IP アドレスです。最初のポートはマスターとスレーブ間の通信インターフェースを表し、デフォルトでは 2888 です。

2番目のポートはリーダー選出用のポートです。これは、クラスターがリーダー選出のために起動するとき、または前のリーダーが失敗した後に新しいリーダーが選出されるときに使用されます。デフォルトのポートは3888です。

それでは、上記の設定ファイルについて説明しましょう。

  • tickTime: この時間は、Zookeeperサーバー間、またはクライアントとサーバー間のハートビートを維持する間隔です。つまり、tickTimeごとにハートビートが送信されます。
  • initLimit: この設定オプションは、クライアント (ここで言うクライアントとは、Zookeeper サーバーに接続するユーザーのクライアントではなく、リーダーに接続する Zookeeper サーバー クラスター内のフォロワー サーバー) を受け入れて接続を初期化するときに、Zookeeper が許容できるハートビート間隔の最大数を設定するために使用されます。

Zookeeperサーバーが5ハートビート(tickTime)以上経過してもクライアントからの応答を受信しない場合、クライアント接続が失敗したことを示します。合計時間は5 * 2000 = 10秒です。

  • syncLimit: この設定項目は、リーダーとフォロワー間のメッセージ送信、リクエスト、およびレスポンスの持続時間を指定します。使用可能な tickTimes の最大数を指定します。合計持続時間は 5 * 2000 = 10 秒です。
  • dataDir: スナップショット ログのストレージ パス。
  • dataLogDir: トランザクションログの保存パス。このパスが設定されていない場合、トランザクションログはデフォルトでdataDirで指定されたディレクトリに保存されます。これはZooKeeperのパフォーマンスに重大な影響を与えます。ZooKeeperのスループットが高い場合、トランザクションログとスナップショットログが過剰に生成される可能性があります。
  • clientPort: これはクライアントがZookeeperサーバーに接続する際に使用するポートです。Zookeeperはこのポートをリッスンし、クライアントからのアクセス要求を受け入れます。

③ myidファイルを作成する

設定ファイルの内容を理解した上で、各クラスタノードに `myid` を作成しましょう。前述の通り、この `myid` は `server.1` の `1` です。同様に、クラスタ内の各サービスにも識別子を指定する必要があります。これは `echo` コマンドを使用して作成します。

  1. # サーバー.1
  2. echo "1" > /usr/ローカル/zookeeper/zookeeper-3.4.10/data/myid
  3. # サーバー.2
  4. echo "2" > /usr/ローカル/zookeeper/zookeeper-3.4.10/data/myid
  5. # サーバー.3
  6. echo "3" > /usr/ローカル/zookeeper/zookeeper-3.4.10/data/myid

④ サービスを起動してテストします。

設定が完了しました。各ZKサービスを起動してテストしてください。私のWindowsコンピューターでのテスト結果は次のとおりです。

サービスを開始します (各マシンで実行する必要があります)。

  1. cd /usr/ローカル/zookeeper/zookeeper-3.4.10/bin
  2. ./zkServer.sh を起動します

サービスのステータスを確認するには、コマンド `./zkServer.sh status` を使用します。

192.168.1.7 --- フォロワー:

192.168.1.8 --- リーダー:

192.168.1.9 --- フォロワー:

Zookeeperクラスタは通常、リーダー1台とフォロワー複数台で構成されます。リーダーは通常、クライアントからの読み取りおよび書き込み要求に応答し、フォロワーはリーダーとデータを同期します。リーダーに障害が発生した場合、フォロワーの中から新しいリーダーが選出されます。

Kafka クラスターのセットアップ

準備条件

準備条件は次のとおりです。

  • セットアップされたZookeeperクラスター
  • Kafka 圧縮パッケージ
  1. https://www.apache.org/dyn/closer.cgi?path=/kafka/2.3.0/kafka_2.12-2.3.0.tgz

/usr/local の下に Kafka という名前の新しいフォルダーを作成し、ダウンロードした tar.gz パッケージを /usr/local/kafka ディレクトリに移動し、tar -zxvf を使用して解凍します。

解凍後、kafka_2.12-2.3.0 ディレクトリに移動し、log という名前の新しいフォルダーを作成して、config ディレクトリに移動します。

多くのプロパティ構成ファイルがあることがわかりますが、ここでは server.properties ファイルのみに注目する必要があります。

Kafka を起動するには 2 つの方法があります。

  • 1 つの方法は、Kafka に付属する Zookeeper 構成ファイルを使用して Kafka を起動することです (公式 Web サイトの指示に従って起動し、複数のノードを持つ単一のサービスを使用してクラスターをシミュレートできます http://kafka.apache.org/quickstart#quickstart_multibroker)。
  • 1 つの方法は、別の ZK クラスターを使用して開始することですが、ZK クラスターから開始する 2 番目の方法をお勧めします。

② 設定項目を変更する

各サービスの設定項目、特にserver.propertiesを変更する必要があります。以下の内容を更新または追加する必要があります。

  1. broker.id=0 //初期値は0です。各サーバーのbroker.idは、myidと同様に異なる値に設定する必要があります。私の3つのサービスはそれぞれ1、2、3に設定されています。
  2. log.dirs=/usr/ローカル/kafka/kafka_2.12-2.3.0/log
  3.  
  4. # log.retention.hours=168 の下に次の3つの項目を追加します
  5. メッセージ最大バイト =5242880
  6. デフォルト.replication.factor=2
  7. レプリカ.フェッチ.最大.バイト=5242880
  8.  
  9. # ZooKeeperの接続ポートを設定する
  10. 動物園管理人.connect =192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181

設定項目の意味:

  1. broker.id=0 # クラスター内の現在のマシンの一意の識別子。ZooKeeper の myid に似ています。
  2. port=9092 # Kafka が外部サービスを提供する際のデフォルト ポートは 9092 です。
  3. host.name = 192.168.1.7 # このパラメータはデフォルトでオフになっています。バージョン0.8.1には、DNS解決に問題が生じ、失敗率が上昇するバグがあります。
  4. num.network.threads=3 # これはブローカーがネットワーク処理に使用するスレッドの数です。
  5. num.io.threads=8 # これはブローカーが I/O 処理に使用するスレッドの数です。
  6. `log.dirs=/usr/ local /kafka/kafka_2.12-2.3.0/log` # メッセージを保存するディレクトリ。このディレクトリはカンマ区切りの式で設定できます。上記の`num.io.threads`は、指定したディレクトリの数よりも大きくする必要があります。複数のディレクトリが設定されている場合、新しく作成されたトピックは、カンマ区切りのリストで指定されたパーティション数が最も少ないディレクトリにメッセージを保存します。
  7. socket.send.buffer.bytes=102400 # 送信バッファのサイズ。データは一度に送信されるのではなく、まずバッファに保存され、一定サイズに達した後に送信されるため、パフォーマンスが向上します。
  8. socket.receive.buffer.bytes=102400 # Kafka 受信バッファのサイズ。データは一定のサイズに達した後にのみディスクにシリアル化されます。
  9. ` socket.request.max.bytes =104857600` # このパラメータは、Kafka にリクエストまたは送信できる最大バイト数です。この値は Java スタックサイズを超えることはできません。
  10. num.partitions=1 # デフォルトのパーティション数。デフォルトでは 1 つのトピックに 1 つのパーティションがあります。
  11. log.retention.hours=168 # デフォルトの最大メッセージ保存時間、168時間、7日間
  12. message.max.byte =5242880 # 最大メッセージサイズ(5MB)
  13. default .replication.factor=2 # Kafka がメッセージを保存するレプリカの数。1 つのレプリカに障害が発生しても、別のレプリカが引き続きサービスを提供できます。
  14. replica.fetch.max.bytes = 5242880 #取得する最大バイト数
  15. log.segment.bytes=1073741824 # このパラメータは、Kafka メッセージがファイルに追加されるためです。この値を超えると、Kafka は新しいファイルを作成します。
  16. ` log.retention.check.interval.ms =300000` # 300000 ミリ秒ごとに、上記で設定したログの有効期限 (log.retention.hours=168) を確認し、期限切れのメッセージがないかディレクトリをチェックして、見つかった場合は削除します。
  17. log.cleaner.enable= false # ログ圧縮を有効にするかどうか。通常は有効にする必要はありませんが、有効にするとパフォーマンスが向上する場合があります。
  18. zookeeper.connect =192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 # ZooKeeperの接続ポートを設定します

③ Kafka クラスターを起動してテストします。

サービスを開始するには、/usr/local/kafka/kafka_2.12-2.3.0/bin ディレクトリに移動します。

  1. # バックグラウンドプロセスを開始する
  2. ./kafka-server-start.sh -daemon ../config/server.properties

サービスが実行されているかどうかを確認します。

  1. # コマンド jps を実行する
  2. 6201 クォーラムピアメイン
  3. 7035 JP
  4. 6972 カフカ

Kafkaはすでに実行されています。トピックを作成して、正常に作成されたことを確認してください。

  1. # cd .. 1つ前のレベルに戻って、 /usr/local/kafka /kafka_2.12-2.3.0ディレクトリに戻ります。
  2. bin/kafka-topics.sh --create --zookeeper 192.168.1.7:2181 --replication-factor 2 --partitions 1 --topic cxuan  

上記の説明に関して:

  • レプリケーション係数2: コピーを2つ作成する
  • パーティション1: パーティションを1つ作成
  • トピック: トピックの作成

テーマが正常に作成されたかどうかを確認します。

  1. bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181  

サービスを開始するだけで、クラスターを起動して実行できるようになります。

単一のマシン上にパブリッシャーを作成します。

  1. # ブローカー、パブリッシャーを作成する
  2. ./kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic cxuantopic  

サーバー上にサブスクライバーを作成します。

  1. # コンシューマーを作成する
  2. bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning  

注意: ここで `--zookeeper` を使用すると、「zookeeper は認識されたオプションではありません」というエラーが発生する可能性があります。これは Kafka のバージョンが高すぎるためであり、代わりに `--bootstrap-server` ディレクティブを使用する必要があります。

テスト結果は次のとおりです。

リリース:

消費:

④ その他のコマンド

トピックを表示:

  1. bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181  
  2.  
  3. # 見せる
  4. クアントピック

トピックのステータスを確認します:

  1. bin/kafka-topics.sh --describe --zookeeper 192.168.1.7:2181 --topic cxuantopic  
  2.  
  3. # 以下は詳細な表示内容です。
  4. トピック:cxuantopic パーティション数:1 レプリ​​ケーション係数:2 構成:
  5. トピック: cxuantopic パーティション: 0 リーダー: 1 レプリ​​カ: 1,2 Isr: 1,2
  6.  
  7. # パーティションは 1、レプリケーション係数は 2、トピック cxuantopic にはパーティションが 0 個あります。
  8. # レプリカ: 0,1 これはレプリカ 1 と 2 がコピーされることを意味します。

リーダーは、特定のパーティション上のすべての読み取りおよび書き込み操作を担当し、各ノードはランダムに選択されてリーダーになります。

レプリカは、リーダーであるか現在アクティブであるかに関係なく、このパーティションのログを複製するノードのリストです。

Isr は同期レプリカのコレクションです。これは、現在アクティブでリーダーに追従するレプリカリストのサブセットです。これで、Kafka クラスターのセットアップは完了です。

⑤ マルチノードデータ受信の検証

ここ数日間、同じIPアドレスを使用してきました。今度は、他のクラスターのノードを使用して、サービスが受信できるかどうかを確認してみましょう。

他の 2 つのノードでも使用します。

  1. bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning  

次に、ブローカーを使用してメッセージを送信し、テストの結果、3 つのノードすべてがメッセージを受信できることが示されました。

構成の詳細

Kafka のセットアップ時に、server.properties の設定の意味について簡単に説明しました。ここでは、パラメータの設定と概念について詳しく説明します。

標準構成

これらのパラメータは、Kafka の最も基本的な構成です。

broker.id: 各ブローカーには、broker.id で表される識別子が必要です。デフォルト値は 0 ですが、任意の整数に設定できます。クラスターでは、各ノードの broker.id が一意であることが不可欠です。

port: 構成サンプルを使用して Kafka を起動すると、ポート 9092 でリッスンします。ポート構成パラメータを他の使用可能なポートに変更できます。

zookeeper.connect: ブローカー メタデータを保存するために使用されるアドレスは、zookeeper.connect を通じて指定されます。

localhost:2181: これは、プログラムがローカル マシンのポート 2181 で実行されていることを示します。この構成パラメーターは、ホスト名:ポート/パス パラメーターのコンマ区切りのリストです。

各部分の意味は次のとおりです。

  • ホスト名は、Zookeeper サーバーのサービス名または IP アドレスです。
  • port は Zookeeper が接続するポートです。
  • `/path` は、Kafka クラスターの chroot 環境として使用されるオプションの Zookeeper パスです。指定されていない場合は、デフォルトでルートパスが使用されます。

log.dirs: Kafka はメッセージをディスクに保存します。これらのログフラグメントを保存するディレクトリは、log.dirs で指定します。これは、カンマで区切られたローカルファイルシステムのパスのセットです。

複数のパスを指定した場合、ブローカーは「最も使用頻度の低い」原則に従って、同じパーティションからのログフラグメントを同じパスに保存します。

ブローカーは、ディスク容量が最も少ないパスではなく、パーティション数が最も少ないパスにパーティションを追加することに注意してください。

`num.recovery.threads.per.data.dir`: Kafka は、次の 3 つの場合に、設定可能なスレッド プールを使用してログ フラグメントを処理します。

  • サーバーは正常に起動し、各パーティションのログ セグメントを開くために使用されます。
  • これはサーバーがクラッシュした後に開始され、各パーティションのログのフラグメントを検査して切り捨てるために使用されます。
  • サーバーは正常にシャットダウンし、ログ セグメントが閉じられます。

デフォルトでは、各ログディレクトリは1つのスレッドのみを使用します。これらのスレッドはサーバーの起動とシャットダウン時にのみ使用されるため、多数のスレッドを設定して並列処理を実現することも可能です。

特に、多数のパーティションを持つサーバーの場合、リカバリ中に適切に実行される操作を使用すると、クラッシュが発生した場合に数時間を節約できます。

このパラメータを設定するときは、設定された番号が log.dirs で指定された単一のログ ディレクトリに対応することに注意することが重要です。

つまり、num.recovery.threads.per.data.dir が 8 に設定され、log.dir が 3 つのパスを指定する場合、合計 24 個のスレッドが必要になります。

`auto.create.topics.enable`: デフォルトでは、Kafka は次の 3 つの状況でトピックを作成します。

  • プロデューサーがトピックにメッセージを書き込み始めると
  • コンシューマーがトピックからメッセージを読み取り始めると
  • いずれかのクライアントがトピックにメタデータリクエストを送信すると

delete.topic.enable: トピックを削除する場合は、トピック管理ツールを使用できます。

デフォルトでは、トピックの削除は許可されていません。delete.topic.enable のデフォルト値は false であるため、トピックを任意に削除することはできません。

これは本番環境では適切な保護対策ですが、開発環境やテスト環境ではテーマの削除が許可されています。そのため、テーマを削除する場合は、delete.topic.enable を true に設定する必要があります。

テーマのデフォルト設定

Kafka は、新しく作成されたトピックに対して多くのデフォルト設定パラメータを提供しています。これらのパラメータを一緒に見ていきましょう。

num.partitions: num.partitions パラメータは、新しく作成されたトピックに含めるパーティションの数を指定します。

自動テーマ作成機能が有効になっている場合(デフォルトで有効になっています)、テーマパーティションの数は、このパラメータで指定された値になります。このパラメータのデフォルト値は1です。テーマパーティションの数を増やすことはできますが、減らすことはできませんのでご注意ください。

default.replication.factor: このパラメーターは非常にシンプルで、Kafka が保存するメッセージのレプリカの数を表します。

1つのレプリカに障害が発生した場合でも、別のレプリカがサービスを継続できます。default.replication.factor のデフォルト値は1です。このパラメータは、テーマの自動作成機能を有効にした後に有効になります。

`log.retention.ms`: Kafka は通常、時間に基づいてデータの保持期間を決定します。デフォルトの時間は `log.retention.hours` パラメータで設定され、168 時間、つまり 1 週間です。

これらに加えて、log.retention.minutes と log.retention.ms という2つのパラメータがあります。これら3つのパラメータは、メッセージを削除するまでの保存期間を決定するという同じ機能を持ちます。log.retention.ms の使用をお勧めします。

log.retention.bytes: メッセージを保持するもう1つの方法は、メッセージの有効期限が切れているかどうかを判断することです。この値はパラメータlog.retention.bytesで指定され、各パーティションに適用されます。

つまり、8 つのパーティションを持つトピックがあり、log.retention.bytes が 1 GB に設定されている場合、このトピックは最大 8 GB のデータを保持できます。

したがって、トピック内のパーティションの数が増えると、トピック全体で保持できるデータの量も増加します。

log.segment.bytes: 上記のログは、個々のメッセージではなく、ログ セグメントに適用されます。

メッセージがブローカーに到着すると、パーティションの現在のログセグメントに追加されます。ログセグメントのサイズがlog.segment.bytesで指定された上限(デフォルトは1GB)に達すると、現在のログセグメントは閉じられ、新しいログセグメントが開かれます。

ログセグメントが閉じられると、有効期限の待機状態になります。このパラメータの値が小さいほど、新しいファイルが閉じられ、割り当てられる頻度が高くなり、ディスク書き込みの全体的な効率が低下します。

log.segment.ms: 前述の通り、ログセグメントは閉じられた後、有効期限が切れるまで待機する必要があります。log.segment.msパラメータは、ログセグメントが閉じられた状態を維持する時間を指定します。

log.segment.ms と log.retention.bytes の間には排他性の問題はありません。ログセグメントは、サイズ制限または時間制限のいずれかが満たされた時点で閉じられます。

message.max.bytes: ブローカーは、message.max.bytesパラメータを設定することで、単一メッセージのサイズを制限します。デフォルト値は1,000,000(1MB)です。

プロデューサーがこのサイズを超えるメッセージを送信しようとすると、メッセージは受信されないだけでなく、ブローカーによってエラー メッセージも返されます。

他のバイト関連の設定パラメータと同様に、このパラメータは圧縮されたメッセージのサイズを表します。つまり、圧縮されたメッセージがmessage.max.bytesより小さければ、実際のメッセージサイズはこの値より大きくても構いません。

この値はパフォーマンスに大きな影響を与えます。値が高いほど、ネットワーク接続とリクエストを処理するスレッドの処理時間が長くなります。また、ディスク書き込みブロックのサイズも大きくなり、I/Oスループットに影響します。

記事参照:

  • Kafka [パート1] Kafkaクラスターのセットアップ
  • https://juejin.im/post/5ba792f5e51d450e9e44184d
  • https://blog.csdn.net/k393393/article/details/93099276
  • カフカ:決定版ガイド
  • https://www.learningjournal.guru/courses/kafka/kafka-foundation-training/broker-configurations/

[編集者のおすすめ]

  1. 分散メッセージングシステムの主要な設計上の考慮事項
  2. Kafka ソースコード分析と原理の図解説明: ブローカー側