DUICUO

Kafka は毎日何千億ものログをどうやって処理するのでしょうか?

以前、Kafkaの原理に関する実用的な情報を多数ご紹介しました。本日は、360がKafkaを用いて数千億ものデータポイントを処理する実践例を詳しく見ていきましょう。

[[286310]]

Pexelsからの画像

この記事は主に以下の内容を共有します。

  • メッセージキューの選択
  • 360におけるKafkaの商業化の現状
  • Kafka クライアント フレームワーク
  • 高いデータ可用性
  • 負荷分散
  • 認証、認可、ACLスキーム
  • クォータメカニズム
  • IDC間のデータ同期
  • アラームの監視
  • オンラインの問題と解決策

メッセージキューの選択

当時は主に以下の点が考慮されていました。

  • コミュニティ活動
  • クライアントサポート
  • スループット

いくつかのシステムを比較した結果、Kafkaが私たちの要件に最も合致すると感じています。また、Pulsarという新しいオープンソースシステムも試してみてもよいかもしれません。

Kafka の主な設計特徴は次のとおりです。

Kafkaは高いパフォーマンスとスループットを誇ります。SendfileとPagecacheによるZero Copyメカニズムを実装し、シーケンシャルな読み書き特性により、一般的なディスクでも高いスループットを実現できるため、比較的コスト効率に優れています。

Kafka はレプリカと ISR メカニズムを使用して、高いデータ可用性を確保します。

Kafka クラスターには 2 つの管理ロールがあります。

  • コントローラーは主にクラスターの管理を担当します。
  • コーディネーターは主にビジネスレベルのプロセスを管理します。

両方の役割は Kafka のブローカーによって担われるため、フェイルオーバーは簡単になり、置き換えるブローカーを選択するだけで済みます。

この観点から見ると、Kafka は分散型の設計哲学を持っていますが、コントローラー自体も Hadoop の Namenode に匹敵するボトルネックになっています。

ほとんどの人は、分散システムは CP または AP のいずれかを使用して実装できると述べている CAP 定理をよく知っています。

Kafka は柔軟性を提供し、さまざまな企業がそれぞれのビジネス特性に基づいて、コンテンツ作成 (CP) またはアプリケーション処理 (AP) に重点を置いたトピック レベルを構成できます。

ビジネスユニット間の独立した繰り返しの消費をサポートし、再生を可能にします。

これは Kafka の簡略化されたアーキテクチャであり、主に次のように分かれています。

  • 生産側
  • ブローカー側
  • 消費者側

ログには 3 つのレベルがあります。

  • 第1レベルのトピック
  • 2 番目のレベルはパーティションです (各パーティションは並列度を表します)。
  • 3 番目のレベルはレプリカです (レプリカはパーティションのレプリカの数を表します)。

360におけるKafkaの商業化の現状

クラスターは現在、数千億のデータ ポイントを処理し、100 台を超えるギガビット マシンを備え、単一トピックの最大ピークは 600,000 QPS、ピーク クラスター速度は約 500 万 QPS です。

物理マシンは、24コア、10Gbネットワークカード、128GBのRAM、4TB×12台のHDDで構成されています。なお、10Gbネットワークカードと4TB×12台の通常のハードドライブを使用していますが、テストではディスクスループットとネットワークスループットがほぼ同等でした。

さらに、データ量が大きいことを考慮すると、SSD は特に大きくなく、比較的高価です。

ディスク構成には JBOD を使用していますが、RAID10 も良い選択肢です (ただし、ディスク コストは 2 倍になります)。

現在のKafkaバージョンは1.1.1ですが、バージョン0.11以降をデプロイすることをお勧めします。このバージョンではプロトコルに多くの最適化が施されており、後続の2.xバージョンと互換性があります。

これらは、Kafka のアップストリームおよびダウンストリームに関連するコンポーネントです。本番環境では、主に各種 Kafka クライアント、リアルタイムサービス、Flume、Logstash が含まれます。

消費者側は、リアルタイム、オフライン(ETL)、モニタリングの3つの部分に分かれています。リアルタイム処理には、Spark、Flink、Stormといった主流のフレームワークが存在します。オフライン処理については、FlinkをベースにしたHamalという統合ランディングフレームワークを開発しました。Kafkaから一度データをコンシュームすれば、その後複数のダウンストリームシステム(HDFS、HBase、Redisなど)にランディングできるため、重複したコンシュームを回避できます。

もう一つの要件は監視です。以前はES/InfluxDB関連のログをKafkaにプッシュし、Grafanaで表示していましたが、現在はPrometheusに切り替えています。

Kafka クライアント フレームワーク

なぜこのフレームワークを構築するのでしょうか? 以前は、多くのビジネス部門が、ベア API を使用して Kafka クライアント ロジックを独自に実装していました。

しかし、多くの問題が存在します。一部の例外は完全にはキャッチされません。このフレームワークの目的は、すべての詳細を隠し、十分にシンプルなインターフェースを公開することです。

これにより、ビジネスエラーの可能性が低減されます。ネットワークやクラスタの異常といった極端な状況においても可用性を確保する必要があります。ネットワークまたはクラスタが利用できない場合、データはまずローカルに保存され、復元時にローカルディスクからKafkaに復元されます。

私たちは 2 つのフレームワークを実装しました。

  • LogProducer は少なくとも 1 回はサポートします。
  • LogConsumer は、「少なくとも1回」と「正確に1回」の両方のセマンティクスをサポートします。「正確に1回」のセマンティクスでは、ビジネスロジックに Rollback インターフェースを実装する必要があります。

LogProducer フレームワークの基本的な考え方は、メモリ内キューを介してログを Kafka に送信することです。Kafka またはネットワークが利用できない場合、ログはローカルディスクに書き込まれます。同時に、スレッドが Kafka またはネットワークの可用性をリアルタイムで監視します。ネットワークが回復すると、ログはディスクから読み込まれ、Kafka に送信されます。

また、再起動中に失われるログの数を減らすために使用される、メモリの代わりに共有メモリ戦略もサポートしています。

LogConsumer フレームワークの実装では、実際には消費ロジックは単純ですが、処理ロジックは非常に複雑であるため、ブロッキング キューを介してコンシューマー スレッドとワーカー スレッドを分離します。

これにより、コンシューマー スレッドとワーカー スレッドに異なる構成が可能になり、ブロッキング キューを介したバックプレッシャー メカニズムも有効になります。

たとえば、ワーカーがワークロードを処理できない場合、ブロッキング キューがいっぱいになり、コンシューマー スレッドにプッシュバックされて、データの消費が停止します。

一方、ワーカー スレッド インターフェイスにインターフェイスを提供し、ユーザーがグローバル オフセット マップに送信できるようにします。

上図に示すように、3つの複合インターフェースを提供しています。ビジネスサイドのロールバックロジックがビジネス処理とコミットに実装されている場合、セマンティクスは「正確に1回」となりますが、デフォルトでは「少なくとも1回」となります。

高いデータ可用性

以前、Kafka がレプリカ + ISR メカニズムを提供して高いデータ可用性を確保する方法について説明しましたが、それだけでは十分ではないと感じたため、Rack Aware もサポートする必要があります。

例えば、Replica=3 の場合、3 つのレプリカが異なる物理ラックに配置されます。これにより、最大 2 つの物理ラックが同時に故障しても、データの可用性を維持できます。Rack Aware ソリューションは、後ほど詳しく説明する負荷分散ソリューションと併せて実装されます。

Kafka は Rack Aware を公式にサポートしており、これはブローカー側で broker.rack パラメータを設定することで実現できることは注目に値します。

ただし、制限があります。各ラックには必ず同じ数のブローカーを割り当てる必要があります。そうしないと、レプリカ割り当ての偏りが生じます。実際には、IDCには多数のラックがあり、割り当てられた物理マシンの分布は非常にランダムになる可能性があります。

考えられる解決策の 1 つは、仮想ラック グループの概念を使用することです。たとえば、3 つの仮想ラック グループを維持し、取得した物理マシンをこれらの 3 つのグループに追加し、各ラック グループに割り当てられた物理マシンの数に一貫性があることを確認します。

もちろん、異なるラック グループ内の物理マシンが同じ物理ラックを持つ状況は発生しません。

負荷分散

Kafka の負荷分散機能は、Confluent の商用版でのみサポートされています。負荷分散とは、本質的にはレプリカを均等に分散させる問題です。

当初、私たちは、下の図に示すように、従来の一貫性ハッシュ法を使用してこの問題を解決したいと考えていました。

その後、従来のワンタイムハッシュではニーズを満たせないことがわかりました。例えば、ノード5を追加しても、ノード2の負荷の一部しか分散できず、グローバルな負荷分散は実行できませんでした。

そのため、図に示すように、仮想ノードに対してワンタイム ハッシュ アルゴリズムに基づくスキームを実装しました。同じ色は同じ物理マシンに対応し、ハッシュ リング上のすべてのノードは仮想ノードです。

ここには4つの物理ノードがあり、node4は新しく追加されたノードです。仮想ノードは物理ノードの負荷を十分に均等に分散できるため、node4をハッシュリングに追加すると、すべての物理マシンの負荷が分散されました。

アルゴリズムの実装は、主に次の 2 つのステップで構成されます。

① 新しいハッシュ サークルを作成します。vnode_str (例: hostname-v0) に対して MD5 ハッシュを実行して仮想ノードの vnode_key を取得し、リング ディクショナリを使用して仮想ノードから物理ノードへのマッピングを保存し、vnode_key を sorted_keys のリストに追加します。

②ハッシュリングにレプリカを分散する:(topic_name + partition_num + replica_num)をキーとして、同じMD5ハッシュアルゴリズムを使用してreplica_keyを取得します。

次に、二分探索を実行して、sorted_keys 内の replica_key の位置を見つけます。最後に、リング辞書を使用して物理マシンノードにマッピングし、レプリカの割り当てが完了します。

このアルゴリズムに基づいて、次の 3 つの問題を解決します。

  • 物理ノードを追加する場合は、データのごく一部を移行するだけで済みます。
  • 異なる構成の物理マシンに重みを設定することで、異機種クラスタの展開をサポートできます。
  • レプリカにラックアウェアを実装するために、ラック情報は物理ノードに保存されます。物理ノードをレプリカに割り当てる際に、割り当てられたラック情報が記録されます。

重複がある場合、次の物理ノードを見つけるために、Position で vnode_key が1ずつ増加します。3つのレプリカの物理ラックが異なることを確認します(例:Replica=3)。

リーダーバランス:これは高速かつ低コストの負荷分散手法です。Kafkaはリーダーを介してのみ読み取りと書き込みサービスを提供するため、リーダーを切り替えることで負荷分散を実現できます。リーダーの切り替えのみではデータの同期は行われないため、コストは比較的小さくなります。

ディスクの再バランス: この機能は、Kafka バージョン 1.1.0 以降でのみサポートされています。Kafka は、バランス調整操作を実行するためのスクリプトと API をいくつか提供しています。これらの操作は、基本的にレプリカプランを生成してから再割り当てを実行します。

認証、認可、ACLスキーム

新しいクラスターの場合、実装が比較的簡単な SASL ベースの SCRAM ソリューションが推奨されます。

古いクラスタの途中に認証・認可メカニズムを実装するのは困難です。各事業部に設定変更を強制する必要があり、切り替えプロセスにも問題が生じやすくなります。

以下では、古いクラスタの問題に対処するために実装したホワイトリストメカニズムについて説明します。まず、古いサービスをホワイトリストに追加します。次に、新しいサービスが作業指示プロセスを通じてトピックとコンシューマのリソース権限を申請し、ホワイトリストに追加できるようにします。不正なトピックとコンシューマのリソース(作業指示プロセスを経ていないリソース)がないか定期的に監視します。

同時に、これらのリソースは削除され、トピックとコンシューマーの読み取りおよび書き込み権限が厳しくなりますが、元のビジネスには影響しません。

クォータメカニズム

クォータは主に、複数の業務オペレーション間のリソース競合問題を解決するために使用されます。クォータには以下の2種類があります。

  • 1 つのアプローチは、ネットワーク帯域幅を制限することです。
  • 1 つの方法は、リクエスト レートを制限する (CPU を制限する) ことです。

サービスには、高、中、低の3つの優先度レベルを設定しています。高優先度サービスは制限なし、中優先度サービスは遅延を許容、低優先度サービスは極端なケースでは停止可能です。ツールを使用して、特定の優先度のすべてのサービスを一括で制限することで、高優先度サービスとクラスターのセキュリティを確保できます。

IDC間のデータ同期

まず、なぜIDC間のデータ同期を実行する必要があるのでしょうか?同期が行われる前は、企業にはデータの読み書きにIDCという概念がないため、IDC間の読み書き操作が発生する可能性が高く、複数の企業でConsume(消費)操作とProduce(生産)操作が重複している可能性があります。

その結果、IDC間ネットワークに大きな無駄が生じます。さらに、IDC間ネットワークは不安定で、時折異常が発生し、業務を円滑に進められない可能性があります。

これらの課題に対処するため、IDC間データ同期サービスを統合的に導入しました。まず、企業は自社IDC内でのみ読み書き操作を実行でき、IDC間での読み書き操作は許可されないよう規定しました。IDC間のデータが必要な場合は、当社に申請いただければ、Mirrormakerを通じてコピーが同期されます。

これを行うと、2 つの利点があります。

  • まず、異常事態の影響からビジネスを保護します。
  • 第二に、IDC 間の帯域幅を節約します (同期メカニズムを通じてこのデータが 1 回だけ送信されるようにすることができます)。

また、このサービスを Marathon/Mesos に基づいてパスベースにすることで、サービスの SLA が向上しました。

アラームの監視

当社の監視およびアラート プラットフォームは次のとおりです。

  • チャートの作成には、Jmxエクスポーター、Prometheus、Grafanaを使用します。Jmxエクスポーターは各ブローカーにデプロイされ、Prometheusがデータを取得し、Grafanaが結果を表示します。
  • Kafka Manager を使用して一時的なメトリックを監視します。
  • Burrow に基づいてコンシューマー ラグを監視します。
  • アラートは、Zabbix と同様に 360 によって内部的に実装されたコンポーネントである Wonder に基づいています。

オンラインの問題と解決策


ディスク障害:Smartctlを使用して監視します。まず、ステータスがPassedである必要があります。次に、Current_Pending_Sector属性(197)の値が100を超えていないことを確認します。100を超える場合、ディスクの読み取り/書き込みパフォーマンスに問題がある可能性があります。

`bootstrap.servers` パラメータはパフォーマンスのボトルネックとなります。このパラメータを使用すると、Kafka クライアントに検索サービスを提供するプロキシとして機能する複数のブローカーを設定できます。

クラスターが大きく、クライアント数が多い場合、プロキシロールのブローカーの負荷は非常に高くなります。この問題を解決するために、bootstrap.serversパラメータにVIPを設定しました。

各 VIP は任意の数のブローカーにバインドできるため、クライアント構成を変更することなくプロキシを動的にスケールアップまたはスケールダウンできます。

消費者の再起動後も消費されない:ビジネスからのフィードバックによると、消費が停止しており、再起動しても問題は解決しませんでした。さらに調査したところ、0.11より前のバージョンにバグがあることが判明しました。

  1. https://issues.apache.org/jira/browse/KAFKA-5413

原因は、ログクリーナースレッドがクラッシュし、Compactが停止したことです。__consumer_offsetsトピックのボリュームが非常に大きく、ブローカーの再読み込み時間が特に長く、その間サービスが停止していました。

解決策は 2 つあります。

  • まず、Kafka バージョン 0.11 以上にアップグレードします。
  • 2 番目の解決策は、問題のあるコーディネーターをバイパスして、オフセットを新しいコンシューマー グループに移行することです。

ヤン・スペン

[[286313]]

Yan Suopengは、Qihoo 360のビッグデータアーキテクチャと運用の専門家であり、インフラとビッグデータ開発の分野で10年の経験を有しています。2013年に360の商用化チームに加わり、メッセージミドルウェアの開発・運用に加え、ビッグデータアーキテクチャ、マイクロサービスアーキテクチャ、リアルタイムコンピューティングプラットフォーム、機械学習プラットフォーム、監視システムなどのインフラ構築を担当しました。商用化チームに安定的かつ効率的な基本サービスを提供することに尽力しています。