|
Kafkaは、エンタープライズおよびインターネットプロジェクトでますます利用されています。この記事では、Kafkaの基礎から始め、その壮大なビジョンを探求していきます。
Pexelsからの画像 Kafka とは何ですか? Kafka は、次の 3 つの主要機能を備えた分散ストリーミング プラットフォームです。
Kafka のアプリケーション:
Kafka はストリーミングデータパイプラインを構築し、システム間またはアプリケーション間でデータを確実に取得できます。これにより、ストリーミングアプリケーションはデータの送信と応答が可能になります。 メッセージングシステムとしてのKafka メッセージング システムとしての Kafka には、次の 3 つの基本コンポーネントがあります。
大規模システムでは、多くのサブシステムとやり取りし、メッセージを渡す必要があります。このようなシステムには、ソースシステム(メッセージの送信者)と宛先システム(メッセージの受信者)が存在します。 このようなメッセージング システムでデータを送信するには、適切なデータ パイプラインが必要です。 このようなデータのやり取りは非常に複雑に見えます。メッセージングシステムを使用すれば、システムははるかにシンプルでクリーンになります。 Kafka は、1 つ以上のデータ センター内のサーバー上でクラスターとして実行されます。
コアAPI Kafka には次の 4 つのコア API があります。
Kafka の基本概念 非常にスケーラブルでフォールトトレラントなメッセージングシステムであるKafkaには、多くの基本的な概念があります。Kafka特有の概念について学びましょう。 トピック トピックはトピックと呼ばれます。Kafkaでは、カテゴリ属性はメッセージを異なるクラスに分類するために使用され、このクラスはトピックと呼ばれます。 トピックはメッセージの割り当てラベルのような論理的な概念です。トピックは、データベース内のテーブルやファイルシステム内のフォルダに似ています。 パーティション パーティション(partition)とは、トピック内のメッセージを1つ以上のパーティションに分割することを指します。これは物理的な概念であり、システム内の1つ以上のディレクトリに対応します。各パーティションはコミットログです。メッセージは追加専用形式でパーティションに書き込まれ、順次読み込まれます。 注: トピックには多数のパーティションが含まれているため、トピック全体で順序は保証されませんが、単一のパーティション内では順序が保証されます。メッセージは各パーティションの末尾に強制的に追加されます。Kafka はパーティション分割によってデータの冗長性とスケーラビリティを実現します。 パーティションは異なるサーバーに分散できるため、トピックを複数のサーバーにまたがって配置し、単一のサーバーよりも強力なパフォーマンスを実現できます。 セグメント セグメントはセグメントとして翻訳され、パーティションをさらに複数のセグメントに分割し、各セグメント ファイルは同じサイズになります。 ブローカ Kafka クラスターは 1 つ以上のサーバーで構成され、各サーバーはブローカーと呼ばれます。ブローカーはプロデューサーからメッセージを受信し、メッセージのオフセットを設定し、メッセージをディスクにコミットします。 ブローカーは、パーティションの読み取り要求に応答し、パーティションがディスクにコミットされたことを示すメッセージを返すことで、コンシューマーにサービスを提供します。 ブローカーはクラスタのコンポーネントです。各クラスタには、クラスタコントローラ(リーダー)としても機能するブローカーが1つ存在し、クラスタのアクティブなメンバーによって選出されます。 クラスター内の各メンバーは、潜在的にリーダーとして機能することができます。リーダーは、ブローカーへのパーティションの割り当てやブローカーの監視などの管理タスクを担当します。 クラスターでは、パーティションはリーダーに属しますが、パーティションは複数のブローカー (リーダー以外) に割り当てることができ、その場合、パーティションのレプリケーションが発生します。 このレプリケーション メカニズムは、パーティションにメッセージの冗長性を提供します。ブローカーに障害が発生した場合、他のアクティブなユーザーが新しいリーダーを選択して引き継ぎます。 プロデューサー プロデューサー、つまりメッセージ パブリッシャーは、特定のトピックから対応するパーティションにメッセージを公開します。 デフォルトでは、プロデューサーは特定のメッセージがどのパーティションに書き込まれるかに関係なく、トピック内のすべてのパーティションにメッセージを均等に分散します。ただし、場合によっては、プロデューサーが指定されたパーティションに直接メッセージを書き込むこともあります。 消費者 コンシューマーはメッセージのユーザーです。コンシューマーは複数のトピックからメッセージを消費できますが、特定のトピックについては、同じパーティションからのメッセージのみを消費します。 Kafka の基本的な概念を理解した後、Kafka クラスターを構築して Kafka をさらに詳しく調べます。 設置環境の確保 Java環境をインストールする Kafka をインストールする前に、Linux システムに Java 環境がインストールされていることを確認してください。Java のバージョンを確認するには、`java -version` コマンドを使用してください。JDK 1.8 を推奨します。 Java 環境がインストールされていない場合は、この記事に従ってインストールしてください。
Zookeeper環境をインストールする Kafka はメタデータを保存し、一貫性を確保するために Zookeeper を使用するため、Kafka をインストールする前に Zookeeper をインストールする必要があります。Kafka ディストリビューションには Zookeeper が付属しており、スクリプトを使用して直接起動できますが、Zookeeper のインストールは難しくありません。 Zookeeper スタンドアロンセットアップ 単一の Zookeeper インスタンスの設定は比較的簡単です。公式 Web サイトから Zookeeper の安定バージョンをダウンロードするだけです。
ここではバージョン3.4.10を使用しています。ダウンロード後、Linuxシステムの/usr/localディレクトリにZookeeperフォルダを作成してください。 次に、xftp ツール (公式 Web サイト https://www.netsarang.com/zh/xshell/ で xftp ツールと xshell ツールの両方の無料のホーム バージョンを申請できます) を使用して、ダウンロードした Zookeeper の圧縮パッケージを /usr/local/zookeeper ディレクトリに配置します。 tar.gz パッケージをダウンロードした場合は、単に `tar -zxvf zookeeper-3.4.10.tar.gz` を使用して抽出できます。 zipパッケージをダウンロードした場合は、Linuxに解凍ツールがインストールされているかどうかも確認する必要があります。インストールされていない場合は、「yum install unzip」コマンドでzip解凍ツールをインストールし、「unzip zookeeper-3.4.10.zip」コマンドで解凍してください。 解凍後、/usr/local/zookeeper/zookeeper-3.4.10 に cd してデータ フォルダーを作成し、conf フォルダーに入って、mv zoo_sample.cfg zoo.cfg を使用して名前を変更します。 次に、vi で zoo.cfg を開き、dataDir=/usr/local/zookeeper/zookeeper-3.4.10/data に変更して保存します。 binディレクトリに移動し、コマンド「./zkServer.sh start」を入力してサービスを開始します。以下の出力が表示されれば、セットアップは成功です。 サービスを停止するには、次のコマンドを入力します: `./zkServer.sh stop` ステータス情報を表示するには、`./zkServer.sh status` を使用できます。 Zookeeper クラスターのセットアップ ①準備条件 3台のサーバーが必要です。ここではCentOS7を使用し、3台の仮想マシンをインストールしました。各仮想マシンに1GBのメモリを割り当て、各マシンの/usr/local/以下にZookeeperフォルダを作成しました。 Zookeeperの圧縮ファイルを移動して解凍すると、zookeeper-3.4.10フォルダが作成されます。このフォルダに入り、dataとlogという2つの新しいフォルダを作成します。 注: 前のセクションで説明した単一マシンのセットアップでデータフォルダが既に作成されているため、再度作成する必要はありません。ログフォルダのみ作成してください。これらの2つのフォルダは、新たに追加する他の2つのサービス用に作成する必要があります。 ② クラスターを構成する 新しいファイルを作成したら、conf/zoo.cfgファイルを編集する必要があります。3つのファイルの内容は次のとおりです。
server.1 の 1 はサーバー識別子を表します。サーバー番号を示す任意の数字を指定することもできます。この識別子は、以下で設定する myid の識別子と一致する必要があります。 192.168.1.7:12888:13888 はクラスタ内の IP アドレスです。最初のポートはマスターとスレーブ間の通信インターフェースを表し、デフォルトでは 2888 です。 2番目のポートはリーダー選出用のポートです。これは、クラスターがリーダー選出のために起動するとき、または前のリーダーが失敗した後に新しいリーダーが選出されるときに使用されます。デフォルトのポートは3888です。 それでは、上記の設定ファイルについて説明しましょう。
Zookeeperサーバーが5ハートビート(tickTime)以上経過してもクライアントからの応答を受信しない場合、クライアント接続が失敗したことを示します。合計時間は5 * 2000 = 10秒です。
③ myidファイルを作成する 設定ファイルの内容を理解した上で、各クラスタノードに `myid` を作成しましょう。前述の通り、この `myid` は `server.1` の `1` です。同様に、クラスタ内の各サービスにも識別子を指定する必要があります。これは `echo` コマンドを使用して作成します。
④ サービスを起動してテストします。 設定が完了しました。各ZKサービスを起動してテストしてください。私のWindowsコンピューターでのテスト結果は次のとおりです。 サービスを開始します (各マシンで実行する必要があります)。
サービスのステータスを確認するには、コマンド `./zkServer.sh status` を使用します。 192.168.1.7 --- フォロワー: 192.168.1.8 --- リーダー: 192.168.1.9 --- フォロワー: Zookeeperクラスタは通常、リーダー1台とフォロワー複数台で構成されます。リーダーは通常、クライアントからの読み取りおよび書き込み要求に応答し、フォロワーはリーダーとデータを同期します。リーダーに障害が発生した場合、フォロワーの中から新しいリーダーが選出されます。 Kafka クラスターのセットアップ 準備条件 準備条件は次のとおりです。
/usr/local の下に Kafka という名前の新しいフォルダーを作成し、ダウンロードした tar.gz パッケージを /usr/local/kafka ディレクトリに移動し、tar -zxvf を使用して解凍します。 解凍後、kafka_2.12-2.3.0 ディレクトリに移動し、log という名前の新しいフォルダーを作成して、config ディレクトリに移動します。 多くのプロパティ構成ファイルがあることがわかりますが、ここでは server.properties ファイルのみに注目する必要があります。 Kafka を起動するには 2 つの方法があります。
② 設定項目を変更する 各サービスの設定項目、特にserver.propertiesを変更する必要があります。以下の内容を更新または追加する必要があります。
設定項目の意味:
③ Kafka クラスターを起動してテストします。 サービスを開始するには、/usr/local/kafka/kafka_2.12-2.3.0/bin ディレクトリに移動します。
サービスが実行されているかどうかを確認します。
Kafkaはすでに実行されています。トピックを作成して、正常に作成されたことを確認してください。
上記の説明に関して:
テーマが正常に作成されたかどうかを確認します。
サービスを開始するだけで、クラスターを起動して実行できるようになります。 単一のマシン上にパブリッシャーを作成します。
サーバー上にサブスクライバーを作成します。
注意: ここで `--zookeeper` を使用すると、「zookeeper は認識されたオプションではありません」というエラーが発生する可能性があります。これは Kafka のバージョンが高すぎるためであり、代わりに `--bootstrap-server` ディレクティブを使用する必要があります。 テスト結果は次のとおりです。 リリース: 消費: ④ その他のコマンド トピックを表示:
トピックのステータスを確認します:
リーダーは、特定のパーティション上のすべての読み取りおよび書き込み操作を担当し、各ノードはランダムに選択されてリーダーになります。 レプリカは、リーダーであるか現在アクティブであるかに関係なく、このパーティションのログを複製するノードのリストです。 Isr は同期レプリカのコレクションです。これは、現在アクティブでリーダーに追従するレプリカリストのサブセットです。これで、Kafka クラスターのセットアップは完了です。 ⑤ マルチノードデータ受信の検証 ここ数日間、同じIPアドレスを使用してきました。今度は、他のクラスターのノードを使用して、サービスが受信できるかどうかを確認してみましょう。 他の 2 つのノードでも使用します。
次に、ブローカーを使用してメッセージを送信し、テストの結果、3 つのノードすべてがメッセージを受信できることが示されました。 構成の詳細 Kafka のセットアップ時に、server.properties の設定の意味について簡単に説明しました。ここでは、パラメータの設定と概念について詳しく説明します。 標準構成 これらのパラメータは、Kafka の最も基本的な構成です。 broker.id: 各ブローカーには、broker.id で表される識別子が必要です。デフォルト値は 0 ですが、任意の整数に設定できます。クラスターでは、各ノードの broker.id が一意であることが不可欠です。 port: 構成サンプルを使用して Kafka を起動すると、ポート 9092 でリッスンします。ポート構成パラメータを他の使用可能なポートに変更できます。 zookeeper.connect: ブローカー メタデータを保存するために使用されるアドレスは、zookeeper.connect を通じて指定されます。 localhost:2181: これは、プログラムがローカル マシンのポート 2181 で実行されていることを示します。この構成パラメーターは、ホスト名:ポート/パス パラメーターのコンマ区切りのリストです。 各部分の意味は次のとおりです。
log.dirs: Kafka はメッセージをディスクに保存します。これらのログフラグメントを保存するディレクトリは、log.dirs で指定します。これは、カンマで区切られたローカルファイルシステムのパスのセットです。 複数のパスを指定した場合、ブローカーは「最も使用頻度の低い」原則に従って、同じパーティションからのログフラグメントを同じパスに保存します。 ブローカーは、ディスク容量が最も少ないパスではなく、パーティション数が最も少ないパスにパーティションを追加することに注意してください。 `num.recovery.threads.per.data.dir`: Kafka は、次の 3 つの場合に、設定可能なスレッド プールを使用してログ フラグメントを処理します。
デフォルトでは、各ログディレクトリは1つのスレッドのみを使用します。これらのスレッドはサーバーの起動とシャットダウン時にのみ使用されるため、多数のスレッドを設定して並列処理を実現することも可能です。 特に、多数のパーティションを持つサーバーの場合、リカバリ中に適切に実行される操作を使用すると、クラッシュが発生した場合に数時間を節約できます。 このパラメータを設定するときは、設定された番号が log.dirs で指定された単一のログ ディレクトリに対応することに注意することが重要です。 つまり、num.recovery.threads.per.data.dir が 8 に設定され、log.dir が 3 つのパスを指定する場合、合計 24 個のスレッドが必要になります。 `auto.create.topics.enable`: デフォルトでは、Kafka は次の 3 つの状況でトピックを作成します。
delete.topic.enable: トピックを削除する場合は、トピック管理ツールを使用できます。 デフォルトでは、トピックの削除は許可されていません。delete.topic.enable のデフォルト値は false であるため、トピックを任意に削除することはできません。 これは本番環境では適切な保護対策ですが、開発環境やテスト環境ではテーマの削除が許可されています。そのため、テーマを削除する場合は、delete.topic.enable を true に設定する必要があります。 テーマのデフォルト設定 Kafka は、新しく作成されたトピックに対して多くのデフォルト設定パラメータを提供しています。これらのパラメータを一緒に見ていきましょう。 num.partitions: num.partitions パラメータは、新しく作成されたトピックに含めるパーティションの数を指定します。 自動テーマ作成機能が有効になっている場合(デフォルトで有効になっています)、テーマパーティションの数は、このパラメータで指定された値になります。このパラメータのデフォルト値は1です。テーマパーティションの数を増やすことはできますが、減らすことはできませんのでご注意ください。 default.replication.factor: このパラメーターは非常にシンプルで、Kafka が保存するメッセージのレプリカの数を表します。 1つのレプリカに障害が発生した場合でも、別のレプリカがサービスを継続できます。default.replication.factor のデフォルト値は1です。このパラメータは、テーマの自動作成機能を有効にした後に有効になります。 `log.retention.ms`: Kafka は通常、時間に基づいてデータの保持期間を決定します。デフォルトの時間は `log.retention.hours` パラメータで設定され、168 時間、つまり 1 週間です。 これらに加えて、log.retention.minutes と log.retention.ms という2つのパラメータがあります。これら3つのパラメータは、メッセージを削除するまでの保存期間を決定するという同じ機能を持ちます。log.retention.ms の使用をお勧めします。 log.retention.bytes: メッセージを保持するもう1つの方法は、メッセージの有効期限が切れているかどうかを判断することです。この値はパラメータlog.retention.bytesで指定され、各パーティションに適用されます。 つまり、8 つのパーティションを持つトピックがあり、log.retention.bytes が 1 GB に設定されている場合、このトピックは最大 8 GB のデータを保持できます。 したがって、トピック内のパーティションの数が増えると、トピック全体で保持できるデータの量も増加します。 log.segment.bytes: 上記のログは、個々のメッセージではなく、ログ セグメントに適用されます。 メッセージがブローカーに到着すると、パーティションの現在のログセグメントに追加されます。ログセグメントのサイズがlog.segment.bytesで指定された上限(デフォルトは1GB)に達すると、現在のログセグメントは閉じられ、新しいログセグメントが開かれます。 ログセグメントが閉じられると、有効期限の待機状態になります。このパラメータの値が小さいほど、新しいファイルが閉じられ、割り当てられる頻度が高くなり、ディスク書き込みの全体的な効率が低下します。 log.segment.ms: 前述の通り、ログセグメントは閉じられた後、有効期限が切れるまで待機する必要があります。log.segment.msパラメータは、ログセグメントが閉じられた状態を維持する時間を指定します。 log.segment.ms と log.retention.bytes の間には排他性の問題はありません。ログセグメントは、サイズ制限または時間制限のいずれかが満たされた時点で閉じられます。 message.max.bytes: ブローカーは、message.max.bytesパラメータを設定することで、単一メッセージのサイズを制限します。デフォルト値は1,000,000(1MB)です。 プロデューサーがこのサイズを超えるメッセージを送信しようとすると、メッセージは受信されないだけでなく、ブローカーによってエラー メッセージも返されます。 他のバイト関連の設定パラメータと同様に、このパラメータは圧縮されたメッセージのサイズを表します。つまり、圧縮されたメッセージがmessage.max.bytesより小さければ、実際のメッセージサイズはこの値より大きくても構いません。 この値はパフォーマンスに大きな影響を与えます。値が高いほど、ネットワーク接続とリクエストを処理するスレッドの処理時間が長くなります。また、ディスク書き込みブロックのサイズも大きくなり、I/Oスループットに影響します。 記事参照:
[編集者のおすすめ]
|