DUICUO

Apache Kafka: 「シームレスなシステム」のための非同期メッセージングサポートの提供

電子商取引プラットフォームが、遅延なく膨大なトラフィックを処理できる仕組みを不思議に思ったことはありませんか?あるいは、OTTプラットフォームが何百万人ものユーザーに同時にコンテンツを配信できる仕組みを不思議に思ったことはありませんか?その鍵は、分散型アーキテクチャにあります。

分散アーキテクチャで設計されたシステムは、複数の機能コンポーネントで構成されます。これらのコンポーネントは通常、複数のマシンに分散され、ネットワークを介して非同期的にメッセージを交換することで連携します。非同期メッセージの存在こそが、コンポーネント間でスケーラブルかつノンブロッキングな通信を実現し、システム全体のスムーズな動作を可能にするのです。

非同期メッセージ

非同期メッセージの一般的な特性は次のとおりです。

  • メッセージのプロデューサーとコンシューマーは互いの存在を認識していません。相手が自分の存在を認識していることを知らずに、システムに参加したり離脱したりします。
  • メッセージ ブローカーは、プロデューサーとコンシューマーの間の仲介役として機能します。
  • プロデューサーは各メッセージを「トピック」に関連付けます。トピックは単純な文字列です。
  • プロデューサーは複数のトピックでメッセージを送信することができ、異なるプロデューサーが同じトピックでメッセージを送信することもできます。
  • コンシューマーは、エージェントからの 1 つ以上のトピックのメッセージをサブスクライブします。
  • プロデューサーは、コンシューマーではなくブローカーにのみメッセージを送信します。
  • エージェントは、トピックをサブスクライブしているすべてのコンシューマーにメッセージを送信します。
  • エージェントは、そのトピックに登録されているすべてのコンシューマーにメッセージを配信します。
  • プロデューサーはコンシューマーからのレスポンスを期待しません。つまり、プロデューサーとコンシューマーは互いにブロックしません。

市場には多くのメッセージブローカーがありますが、Apache Kafka は最も人気のあるものの 1 つです。

アパッチカフカ

Apache Kafka は、ストリーミング処理をサポートするオープンソースの分散メッセージングシステムであり、Apache Software Foundation によって開発されました。アーキテクチャ的には、Apache ZooKeeper サービスを介して相互に連携する複数のブローカーのクラスターで構成されています。これらのブローカーは、メッセージの受信、永続化、送信時にクラスター全体に負荷を分散します。

パーティション

Kafka は「パーティション」と呼ばれるバケットにメッセージを書き込みます。特定のパーティションは、1 つのトピックからのメッセージのみを保持します。例えば、Kafka は​heartbeats​というトピックからのメッセージを​heartbeats-0​というパーティションに書き込む場合があります(単一パーティションのトピックの場合)。このプロセスはプロデューサーとは独立しています。

図1: 非同期メッセージ

ただし、Kafka クラスターが提供する並列処理機能を活用するために、管理者は通常、特定のトピックに対して複数のパーティションを作成します。例えば、管理者がトピック​heartbeats​に対して3つのパーティションを作成したとします。Kafka はそれぞれ​heartbeats-0​​heartbeats-1​​heartbeats-2​という名前を付けます。Kafka は、メッセージをこれらの3つのパーティションに均等に分散させます。

プロデューサーが各メッセージをメッセージキーに関連付けるというシナリオも考えられます。例えば、両方のコンポーネントが​heartbeats​トピックでメッセージを送信する場合、一方のコンポーネントはメッセージキーとして​C1​を使用し、もう一方のコンポーネントは​C2​使用する可能性があります。この場合、Kafka は、トピック内で同じメッセージキーを持つメッセージは常に同じパーティションに書き込まれるようにします。ただし、パーティション内では、メッセージのメッセージキーが同じではない場合があります。下の図 2 は、異なるパーティションにメッセージが分散される例を示しています。

図2: 異なるパーティション間のメッセージの分散

リーダーと同期レプリカ

Kafka は、複数のブローカーで構成されるクラスター内で複数のパーティションを管理します。パーティションの管理を担当するブローカーは「リーダー」と呼ばれます。リーダーのみが、そのパーティション上でメッセージを送受信できます。

しかし、パーティションのリーダーに障害が発生した場合はどうなるでしょうか?ビジネス継続性を確保するため、各リーダー(エージェント)は自身のパーティションを他のエージェントに複製します。これらの他のエージェントは、そのパーティションの同期レプリカ(ISR)と呼ばれます。パーティションのリーダーに障害が発生すると、ZooKeeper は選出を開始し、選択された同期レプリカを新しいリーダーに任命します。新しいリーダーは、そのパーティションのメッセージ送受信タスクを引き継ぎます。管理者は、パーティションが維持する必要がある同期レプリカのサイズを指定できます。

図3: プロデューサーコマンドラインツール

メッセージの永続性

エージェントは各パーティションを指定されたディスクファイルにマッピングすることで、永続性を実現します。デフォルトでは、メッセージはディスク上に1週間保持されます。メッセージがパーティションに書き込まれると、その内容と順序を変更することはできません。管理者は、メッセージの保持期間や圧縮アルゴリズムなどのポリシーを設定できます。

図4: 消費者向けコマンドラインツール

消費者ニュース

他の多くのメッセージングシステムとは異なり、Kafka はコンシューマーにメッセージをプロアクティブに送信しません。代わりに、コンシューマーはトピックをリッスンし、能動的にメッセージを読み取ります。コンシューマーはトピックの複数のパーティションからメッセージを読み取ることができます。また、複数のコンシューマーが同じパーティションからメッセージを読み取ることもできます。Kafka は、同じコンシューマーが同じメッセージを繰り返し読み取らないことを保証します。

Kafkaでは、各コンシューマーにはグループIDが付与されます。同じグループIDを持つコンシューマーはコンシューマーグループを形成します。通常、N個のトピックパーティションからメッセージを読み取るには、管理者はN個のコンシューマーを含むコンシューマーグループを作成します。これにより、グループ内の各コンシューマーは、指定されたパーティションからメッセージを読み取ることができます。グループ内のコンシューマーの数が利用可能なパーティションの数を超える場合、余分なコンシューマーはアイドル状態になります。

Kafka は、いかなる状況下でも、グループ内のコンシューマーの数に関わらず、同じメッセージがそのコンシューマーグループによって一度だけ読み取られることを保証します。このアーキテクチャは、一貫性、高パフォーマンス、高スケーラビリティ、ほぼリアルタイムの配信とメッセージの永続性、そしてメッセージ損失ゼロを実現します。

Kafkaのインストールと実行

理論上は Kafka クラスターは任意の数のブローカーで構成できますが、実稼働環境では通常、ほとんどのクラスターは 3 つまたは 5 つのブローカーで構成されます。

ここでは、実稼働環境に十分な単一エージェント クラスターをセットアップします。

ブラウザで https://kafka.apache.org/downloads にアクセスし、Kafka の最新バージョンをダウンロードしてください。または、Linux ターミナルで以下のコマンドを実行してダウンロードすることもできます。

 https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.12-2.8.0.tgz を取得します

必要であれば、ダウンロードしたアーカイブファイル​kafka_2.12-2.8.0.tgz​別のディレクトリに移動することもできます。このアーカイブを解凍すると、 ​kafka_2.12-2.8.0​というディレクトリが作成されます。これは、後ほど設定する​KAFKA_HOME​です。

​KAFKA_HOME/config​ディレクトリの​server.properties​ファイルを開き、次の構成行のコメントを解除します。

 リスナー= プレーンテキスト: // : 9092

この設定により、Kafka はローカルマシンのポート​9092​でプレーンテキストメッセージを受信できるようになります。また、本番環境では推奨されるセキュアチャネル経由でメッセージを受信するように Kafka を設定することもできます。

クラスター内のブローカーの数に関わらず、Kafka はブローカーの管理と調整に ZooKeeper を必要とします。これは、ブローカーが 1 つのクラスターの場合でも同様です。ZooKeeper は Kafka と一緒にインストールされるため、 ​KAFKA_HOME​ディレクトリから以下のコマンドを使用してコマンドラインで起動できます。

 ./bin/zookeeper-server-start.sh./config/zookeeper.properties

ZooKeeper が実行されたら、次のコマンドを使用して別のターミナルで Kafka を起動できます。

 ./bin/kafka-server-start.sh./config/server.properties

この時点で、単一プロキシ Kafka クラスターが起動して実行されています。

Kafkaを検証する

​topic-1​でメッセージを送受信してみましょう。トピック作成時にパーティション数を指定するには、以下のコマンドを使用します。

 ./bin/kafka-topics .sh - - 作成- - トピックtopic - 1 - - zookeeper localhost : 2181 - - パーティション3 - - レプリケーション- 係数1

上記のコマンドでは、レプリケーション係数も指定しています。この値はクラスター内のエージェント数を超えることはできません。単一エージェントクラスターを使用しているため、レプリケーション係数は1のみに設定できます。

トピックが作成されると、プロデューサーとコンシューマーはトピック上でメッセージを交換できるようになります。Kafkaディストリビューションには、プロデューサーとコンシューマー向けのテスト用コマンドラインツールが含まれています。

3 番目のターミナルを開き、次のコマンドを実行してプロデューサーを起動します。

 ./bin/kafka-console-producer .sh -- ブローカー- リストlocalhost : 9092 -- トピックtopic - 1

上記のコマンドは、簡単なテキストメッセージを入力できるプロンプトを表示します。指定したコマンドオプションにより、プロデューサーは​topic-1​からローカルマシンのポート9092で実行されているKafkaインスタンスにメッセージを送信します。

4 番目のターミナルを開き、次のコマンドを実行してコンシューマーを起動します。

 ./bin/kafka-console-consumer .sh -- bootstrap - server localhost : 9092 -- topic topic - 1 -- from - beginning

上記のコマンドはコンシューマーを起動し、ローカルマシンのポート9092でKafkaサーバーに接続するように指定します。コンシューマーはトピック​topic-1​をサブスクライブし、そこからメッセージを読み取ります。コマンドラインの最後のオプションにより、このコンシューマーはトピックの先頭からすべてのメッセージを読み取ります。

プロデューサーとコンシューマーは同じエージェントに接続され、同じトピックにアクセスしていることがわかりました。そのため、コンシューマーはメッセージを受信後、端末に出力します。

それでは、実際のアプリケーションシナリオで Kafka を使用してみましょう。

場合

ABCというバス輸送会社があり、全国の複数の都市間を運行するバスを所有しているとします。ABCは、運行品質を向上させるために各バスをリアルタイムで追跡したいと考えており、Apache Kafkaをベースとしたソリューションを提案しています。

まず、ABCはすべてのバスに位置追跡装置を装備しました。次に、Kafkaを使用して、数百台のバスから位置情報の更新を受信するオペレーションセンターを構築しました。また、すべてのバスの現在位置をいつでも表示できるダッシュボードも開発しました。図5はこのアーキテクチャを示しています。

図5: Kafkaベースのアーキテクチャ

このアーキテクチャでは、バス上のデバイスがメッセージプロデューサーとして機能します。デバイスは定期的に現在位置をKafkaトピック​abc-bus-location​に送信します。ABCは、異なるバスからのメッセージを処理するために、バスのトリップコードをメッセージキーとして使用することを選択しました。例えば、ベンガルールからハブバリへ向かうバスのトリップコードは​BLRHL003​となり、そのバスからその走行中に送信されるすべてのメッセージは​BLRHL003​メッセージキーとして使用します。

ダッシュボードアプリケーションはメッセージコンシューマとして機能します。エージェントに同じトピック​abc-bus-location​を登録します。これにより、このトピックはプロデューサー(バス)とコンシューマ(ダッシュボード)間の仮想チャネルとなります。

バスの機器はダッシュボードアプリケーションからの応答を期待していません。実際、バスは互いの存在を認識していません。このアーキテクチャにより、数百台のバスと運行センターの間でノンブロッキング通信が実現されています。

成し遂げる

ABC社が位置情報の更新を維持するために3つのパーティションを作成したいとします。開発環境にはエージェントが1つしかないため、レプリケーション係数は1に設定する必要があります。

したがって、次のコマンドは要件を満たすテーマを作成します。

 ./bin/kafka-topics .sh - - 作成- - トピックabc - バス- 場所- - zookeeper localhost : 2181 - - パーティション3 - - レプリケーション- 係数1

プロデューサーアプリケーションとコンシューマーアプリケーションは、Java、Scala、Python、JavaScriptなど、様々な言語で記述できます。以下のセクションのコード例では、Javaでどのように記述されているかを示し、基本的な理解を深めます。

Javaプロデューサー

以下の​Fleet​クラスは、ABC社の6台のバス上で実行されるKafkaプロデューサーアプリケーションをシミュレートします。指定されたブローカーの​abc-bus-location​トピックに位置情報の更新を送信します。簡潔にするために、トピック名、メッセージキー、メッセージ内容、ブローカーアドレスはすべてコード内にハードコードされていることに注意してください。

 パブリッククラスFleet {
パブリック静的void mainString [] args )は例外をスローします{
文字列ブローカー= "localhost : 9092 " ;
プロパティprops = new Properties ();
props.setProperty ( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG broker ) ;
props.setProperty ( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ,
StringSerializer . class . getName ());
props.setProperty ( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ,
StringSerializer . class . getName ());
プロデューサー< 文字列文字列> producer = new KafkaProducer < 文字列文字列> ( props );
文字列topic = "abc - バス- 場所" ;
Map < String , String > locations = new HashMap <> ();
場所.put ( " BLRHBL001" , " 13.071362,77.461906 " );
場所.put ( "BLRHBL002" , " 14.399654,76.045834 " );
場所.put ( "BLRHBL003" , " 15.183959,75.137622 " ) ;
場所.put ( "BLRHBL004" , " 13.659576,76.944675 " ) ;
場所.put ( "BLRHBL005" , " 12.981337,77.596181 " ) ;
場所.put ( "BLRHBL006" , " 13.024843,77.546983 " );
IntStream . range ( 0 , 10 ). forEach ( i - > {
for ( String trip : locations . keySet ()) {
ProducerRecord < 文字列, 文字列> レコード
= 新しいプロデューサーレコード< 文字列, 文字列> (
トピック旅行場所。get ( 旅行));
producer.send ( レコード) ;
}
});
プロデューサー.フラッシュ( );
プロデューサー.close () ;
}
}
Javaコンシューマー

以下の​Dashboard​クラスは、ABC のオペレーションセンターで実行される Kafka コンシューマアプリケーションを実装しています。このアプリケーションは​abc-bus-location​トピックをリッスンしており、コンシューマグループ ID は​abc-dashboard​です。メッセージを受信すると、バスからの詳細な位置情報を即座に表示します。通常はこの詳細な位置情報を設定しますが、簡潔にするためにコード内にハードコードされています。

 パブリック静的void main文字列[] 引数){
文字列ブローカー= "127 .0.0 .1 : 9092 " ;
文字列groupId = "abc - ダッシュボード" ;
プロパティprops = new Properties ();
props.setProperty ( ConsumerConfig . BOOTSTRAP_SERVERS_CONFIGブローカー);
props.setProperty ( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
StringDeserializer.class.getName ( ) );
props.setProperty ( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
StringDeserializer.class.getName ( ) );
props.setProperty ( ConsumerConfig . GROUP_ID_CONFIG , groupId );
@SuppressWarnings ( "リソース" )
Consumer < 文字列文字列> consumer = new KafkaConsumer < 文字列文字列> ( props );
コンシューマー.subscribe ( Arrays.asList ( "abc - bus - location" ));
while ( true ) {
ConsumerRecords < 文字列, 文字列> レコード
= consumer.poll ( Duration.ofMillis ( 1000 ) );
ConsumerRecord < 文字列文字列> レコード: レコード) {
文字列topic = record.topic () ;
int パーティション= レコード. パーティション();
文字列キー= record.key () ;
文字列= レコード. ();
システム.out.println ( 文字列.format (
「トピック= % sパーティション= % dキー= % s= % s」
トピックパーティションキー));
}
}
}
頼る

このコードをコンパイルして実行するには、JDK 8以降が必要です。以下の​pom.xml​ファイル内のMaven依存関係を確認してください。必要なKafkaクライアントライブラリがダウンロードされ、クラスパスに追加されます。

 <依存関係>
<グループID> org.apache.kafka </グループID>
<artifactId> kafka - クライアント</artifactId>
<バージョン> 2.8.0 </バージョン>
</依存関係>
<依存関係>
<グループID> org.slf4j </グループID>
<artifactId> slf4j - シンプル</artifactId>
<バージョン> 1.7.25 </バージョン>
</依存関係>

展開する

​abc-bus-location​トピックは作成時に3つのパーティションを指定するため、位置情報の更新の読み取りプロセスを高速化するために、3つのコンシューマーを実行する必要があります。そのためには、3つの異なるターミナルでダッシュボードを同時に実行する必要があります。3つのダッシュボードはすべて同じグループIDで登録されているため、当然ながらコンシューマーグループを形成します。Kafkaは各ダッシュボードに特定のパーティション(コンシューム用)を割り当てます。

すべてのダッシュボードインスタンスが実行中になったら、別のターミナルで​Fleet​クラスを起動します。図6、7、8は、ダッシュボードターミナルからのコンソール出力のサンプルです。

図6: 計器盤の端子の1つ

コンソールメッセージを詳しく見ると、1番目、2番目、3番目の端末のコンシューマーがそれぞれ​partition-2​​partition-1​​partition-0​からメッセージを読み取っていることがわかります。さらに、キー​BLRHBL002​​BLRHBL004​​BLRHBL006​を持つメッセージが​partition-2​に書き込まれ、キー​BLRHBL005​を持つメッセージが​partition-1​に書き込まれ、残りのメッセージが​partition-0​に書き込まれていることも確認できます。

図7:計器盤端子(その2)

Kafka を使用する利点は、クラスターが適切に設計されていれば、水平方向に拡張して多数のバスと数百万のメッセージをサポートできることです。

図8:計器盤端子(その3)

ニュースだけではない

Kafka ウェブサイトのデータによると、Fortune 100 企業の 80% 以上が Kafka を使用しています。

Kafka。金融サービスやエンターテインメントなど、多くの垂直産業で導入されています。Kafkaは…

当初はシンプルなメッセージングサービスだったConfluentは、業界最高クラスのストリーム処理機能により、ビッグデータエコシステムの一部へと進化しました。マネージドソリューションを希望する企業にとって、Confluentは...

クラウドベースの Kafka サービスを提供し、サブスクリプション料金のみが必要です。(LCTT 翻訳者注: Confluent は Kafka をベースにした商用企業であり、... を提供しています)

Confluent Kafka は Apache Kafka をベースに構築され、多くのエンタープライズ グレードの機能を追加しており、「より完全な Kafka」と考えられています。