Kafka をゼロから使い始めるのに役立つ非常に詳細な説明です。

Kafka をゼロから使い始めるのに役立つ非常に詳細な説明です。

Kafkaを理解する

Kafka の紹介

Kafka は分散ストリーミング プラットフォームです。 Kafka 公式サイト: http://kafka.apache.org/

1) ストリーミング プラットフォームには、次の 3 つの主要な機能があります。

  • メッセージ キューやエンタープライズ メッセージング システムと同様に、レコードのストリームを公開およびサブスクライブします。
  • フォールト トレラントで永続的な方法でレコードのストリームを「保存」します。
  • ログ記録が発生したらストリームを処理します。

2) Kafka は通常、次の 2 つのカテゴリのアプリケーションで使用されます。

  • システムやアプリケーション間でデータを確実に取得するリアルタイムストリーミングデータパイプラインを構築する
  • データストリームを変換または反応するリアルタイムストリーミングアプリケーションを構築する

3) まず、いくつかの概念:

  • Kafka は、複数のデータセンターにまたがる 1 つ以上のサーバー上でクラスターとして実行されます。
  • Kafka クラスターは、トピックと呼ばれるカテゴリにレコードのストリームを保存します。
  • 各レコードは**「キー、値、タイムスタンプ」** で構成されます。

4) Kafka には 4 つのコア API があります。

  • Producer API を使用すると、アプリケーションはレコードのストリームを 1 つ以上の Kafka トピックに公開できます。
  • Consumer API を使用すると、アプリケーションは 1 つ以上のトピックをサブスクライブし、それらによって生成されたレコードのストリームを処理できます。
  • Streams API を使用すると、アプリケーションはストリーム プロセッサとして機能し、1 つ以上のトピックからの入力ストリームを消費し、1 つ以上の出力トピックへの出力ストリームを生成して、入力ストリームを出力ストリームに効率的に変換できます。
  • コネクタ API を使用すると、既存のアプリケーションまたはデータ システムに接続してプロデューサーまたはコンシューマーを再利用できる Kafka トピックを構築および実行できます。たとえば、リレーショナル データベースへのコネクタは、テーブルへのすべての変更をキャプチャする場合があります。

写真

Kafka では、クライアントとサーバー間の通信は、シンプルで高性能、言語に依存しない TCP プロトコルを介して行われます。このプロトコルはバージョン管理されており、古いバージョンとの下位互換性が維持されています。 Kafka は Java クライアントを提供しますが、クライアントは多くの言語で利用できます。

1.2 トピックとパーティション

まず、Kafka がレコードのストリームに対して提供するコア抽象化であるトピックを詳しく見ていきます。

トピックはメッセージの一種と考えることができます。各トピックは複数のパーティションに分割されます。各パーティションは、ストレージ レベルでの追加ログ ファイルです。

トピックは、レコードが公開されるカテゴリまたはフィード名です。 Kafka トピックは常にマルチサブスクライバーです。つまり、トピックには、書き込まれたデータをサブスクライブする 0 個、1 個、または多数のコンシューマーが存在する可能性があります。

各トピックについて、Kafka クラスターは次のようなパーティション化されたログを維持します。

写真

各パーティションは、構造化されたコミット ログに継続的に追加される、順序付けられた不変のレコードのシーケンスです。パーティション内の各レコードには、オフセットと呼ばれる連続した ID 番号が割り当てられ、パーティション内の各レコードを一意に識別します。

Kafka クラスターは、消費されたかどうかに関係なく、設定可能な保持期間を使用して、公開されたすべてのレコードを保持します。たとえば、保持ポリシーが 2 日に設定されている場合、レコードは公開後 2 日間使用可能になり、その後はスペースを解放するために破棄されます。 Kafka のパフォーマンスはデータ サイズに関して実質的に一定であるため、長期間にわたってデータを保存しても問題はありません。

写真

実際、コンシューマーごとに保持されるメタデータは、そのコンシューマーのオフセットまたはログ内の位置のみです。このオフセットはコンシューマーによって制御されます。通常、コンシューマーはレコードを読み取るときにオフセットを直線的に進めますが、実際には、位置はコンシューマーによって制御されるため、任意の順序でレコードを消費できます。たとえば、コンシューマーは古いオフセットにリセットして過去のデータを再処理したり、最新のレコードにジャンプして「今」から消費を開始したりできます。

この機能の組み合わせにより、Kafka コンシューマーは非常に安価になり、クラスターや他のコンシューマーに大きな影響を与えることなく、出入りできるようになります。たとえば、コマンドライン ツールを使用して、既存のコンシューマーが消費する内容を変更せずに、任意のトピックのコンテンツを「テール」することができます。

ログ内のパーティションにはいくつかの目的があります。まず、ログを単一のサーバーに収まるサイズを超えて拡張できるようになります。個々のパーティションは、それをホストするサーバーに収まる必要がありますが、トピックには任意の量のデータを処理できるように、複数のパーティションが存在する場合があります。 2 番目に、それらは並列処理の単位として機能します。これについては後で詳しく説明します。

1.3 配布

トピックの複数のパーティションは、Kafka クラスター内の複数のサーバーに分散されます。各サーバー (Kafka インスタンス) は、パーティション内のメッセージの読み取りと書き込みを担当します。さらに、Kafka ではバックアップするパーティション (レプリカ) の数も設定でき、各パーティションは複数のマシンにバックアップされるため、可用性が向上します。

複製されたソリューションに基づくと、複数のバックアップをスケジュールする必要があることを意味します。各パーティションには「リーダー」となるサーバーが存在します。リーダーはすべての読み取りおよび書き込み操作を担当します。リーダーが失敗した場合、他のフォロワーが引き継ぎます(新しいリーダーになります)。フォロワーはリーダーに単調に従い、メッセージを同期するだけです...リーダーとしてのサーバーがすべての要求圧力を負担していることがわかります。したがって、クラスター全体を考慮すると、パーティションの数は「リーダー」の数を意味します。 Kafka は、全体的な安定したパフォーマンスを確保するために、各インスタンスに「リーダー」を均等に分散します。

1.4 生産者と消費者

1.4.1 プロデューサー

プロデューサーは指定されたトピックにデータを公開します。同時に、プロデューサーはメッセージがどのパーティションに属するかを決定することもできます。たとえば、「ラウンドロビン」方式やその他のアルゴリズムに基づいて行われます。

1.4.2 消費者

  • 基本的に、Kafka はトピックのみをサポートします。各消費者は消費者グループに属します。逆に、各グループには複数のコンシューマーが存在する場合もあります。トピックに送信されたメッセージは、このトピックをサブスクライブする各グループ内の 1 つのコンシューマーによってのみ消費されます。
  • すべてのコンシューマー インスタンスが同じコンシューマー グループを持つ場合、レコードはコンシューマー インスタンス間で効果的に負荷分散されます。
  • すべてのコンシューマー インスタンスに異なるコンシューマー グループがある場合、各レコードはすべてのコンシューマー プロセスにブロードキャストされます。

写真

分析: 2 つのサーバー Kafka クラスター。4 つのパーティション (P0 ~ P3) をホストし、2 つのコンシューマー グループが含まれています。コンシューマー グループ A には 2 つのコンシューマー インスタンスがあり、グループ B には 4 つのコンシューマー インスタンスがあります。

Kafka で消費が実装される方法は、ログ内のパーティションをコンシューマー インスタンス間で分割し、各インスタンスが任意の時点で割り当ての「公平なシェア」の排他的コンシューマーになるようにすることです。グループのメンバーシップを維持するプロセスは、Kafka プロトコルによって動的に処理されます。新しいインスタンスがグループに参加すると、グループの他のメンバーから一部のパーティションを引き継ぎます。インスタンスが停止した場合、そのパーティションは残りのインスタンスに分散されます。

Kafka は、トピック内の異なるパーティションのレコード間の集約順序ではなく、パーティション内のレコードの集約順序のみを提供します。ほとんどのアプリケーションでは、パーティションによるソートとキーによるデータのパーティション分割機能を組み合わせるだけで十分です。ただし、レコードの完全な順序付けが必要な場合は、パーティションが 1 つだけのトピックを使用してこれを実現できますが、この場合、コンシューマー グループごとにコンシューマー プロセスが 1 つだけになります。

1.5 Kafkaが保証する消費者

  • パーティションに送信されたメッセージは、受信順にログに追加されます。つまり、レコード M1 がレコード M2 と同じプロデューサーによって送信され、M1 が最初に送信された場合、M1 のオフセットは M2 よりも低くなり、ログ内では前に表示されます。
  • コンシューマー インスタンスは、ログに保存されている順序でレコードを表示します。コンシューマーの場合、メッセージを消費する順序は、ログ内のメッセージの順序と一致します。
  • トピックの「レプリケーション係数」が N の場合、N-1 個の Kafka インスタンスが失敗することが許可され、ログに送信されたレコードを失うことなく、最大 N-1 個のサーバー障害を許容します。

1.6 メッセージングシステムとしての Kafka

Kafka のストリームの概念は、従来のエンタープライズ メッセージング システムとどのように異なりますか?

(1)従来のメッセージングシステム

従来、メッセージング モデルにはキューイングとパブリッシュ/サブスクライブの 2 つがあります。キューでは、コンシューマーのプールがサーバーから読み取り、各レコードがいずれかのコンシューマーに送信されます。パブリッシュ/サブスクライブでは、レコードはすべてのコンシューマーにブロードキャストされます。これら 2 つのモデルにはそれぞれ長所と短所があります。キューイングの利点は、データ処理を複数のコンシューマー インスタンスに分割できるため、処理を拡張できることです。残念ながら、キューはマルチサブスクライバーではないため、プロセスがデータを読み取るとすぐに消えてしまいます。パブリッシュ/サブスクライブでは、複数のプロセスにデータをブロードキャストできますが、すべてのメッセージがすべてのサブスクライバーに送信されるため、スケーラブルな処理はできません。

Kafka の消費者グループの概念は、これら 2 つの概念を一般化します。キューと同様に、コンシューマ グループを使用すると、一連のプロセス (コンシューマ グループのメンバー) 間で処理を分割できます。パブリッシュ/サブスクライブと同様に、Kafka では複数のコンシューマー グループにメッセージをブロードキャストできます。

(2)カフカの利点

Kafka モデルの利点は、すべてのトピックがこれらの両方の特性 (処理をスケーリングでき、マルチサブスクライバーでもある) を備えているため、どちらか一方を選択する必要がないことです。

Kafka は従来のメッセージング システムよりも強力な順序保証を備えています。

従来のキューはサーバー上でレコードを順番に保持し、複数のコンシューマーがキューから消費する場合、サーバーはレコードを格納された順序で配布します。ただし、サーバーはレコードを順番にディスパッチしますが、レコードはコンシューマーに非同期的に配信されるため、異なるコンシューマーでは順序どおりに表示されない場合があります。これは実質的に、並列消費がある場合にレコードの順序が失われることを意味します。メッセージング システムは通常、1 つのプロセスのみがキューから消費できるようにする「排他的コンシューマー」の概念によってこの問題を解決しますが、もちろん、これは処理に並列性がないことを意味します。

Kafka の方が優れています。トピック内に並列処理の概念 (パーティション) を持たせることで、Kafka はコンシューマー プロセスのプール全体にわたって順序の保証と負荷分散を提供できます。これは、トピック内のパーティションをコンシューマー グループ内のコンシューマーに割り当て、各パーティションがグループ内の 1 つのコンシューマーによってのみ消費されるようにすることで実現されます。こうすることで、コンシューマーがそのパーティションの唯一のリーダーとなり、データを順番に使用することが保証されます。パーティションが多数あるため、多くのコンシューマー インスタンス間で負荷を分散できます。ただし、コンシューマー グループ内のパーティション数よりも多くのコンシューマー インスタンスは存在できないことに注意してください。

1.7 ストレージシステムとしての Kafka

  • メッセージの消費とは別にメッセージの公開を許可するメッセージ キューは、実際には送信中のメッセージのストレージ システムとして機能します。 Kafka が他と異なる点は、非常に優れたストレージ システムであることです。
  • Kafka に書き込まれたデータはディスクに書き込まれ、フォールト トレランスのために複製されます。 Kafka では、プロデューサーが確認応答を待機できるため、書き込みは完全に複製されるまで完了とは見なされず、書き込み先のサーバーに障害が発生しても書き込みが存続することが保証されます。
  • Kafka のディスク上の構造はスケールを非常にうまく利用しており、サーバー上に 50 KB の永続データがあっても 50 TB の永続データがあっても、Kafka は同じように動作します。
  • Kafka はストレージを重視し、クライアントが読み取り位置を制御できるようにするため、高パフォーマンスで低レイテンシのコミット ログのストレージ、レプリケーション、および伝播用に設計された特殊な分散ファイル システムと考えることができます。

1.8 ストリーム処理のための Kafka

  • データ ストリームを読み取り、書き込み、保存するだけでは不十分です。目標は、ストリームのリアルタイム処理を実現することです。
  • Kafka では、ストリーム プロセッサとは、入力トピックから連続的なデータ ストリームを取得し、この入力に対して何らかの処理を実行し、出力トピックに連続的なデータ ストリームを生成するものを指します。
  • たとえば、小売アプリケーションは、売上と出荷の入力ストリームを受け取り、このデータに基づいて計算された再注文と価格調整のストリームを出力する場合があります。
  • プロデューサー API とコンシューマー API を使用して、簡単な処理を直接実行できます。ただし、より複雑な変換のために、Kafka は完全に統合された Streams API を提供します。これにより、ストリームの集計を計算したり、ストリームを結合したりするなど、重要な処理を実行するアプリケーションを構築できます。
  • このツールは、順序どおりでないデータの処理、コード変更時の入力の再処理、ステートフルな計算の実行など、このようなアプリケーションが直面する課題に対処するのに役立ちます。
  • Streams API は、Kafka によって提供されるコア プリミティブに基づいて構築されています。入力には Producer API と Consumer API を使用し、ステートフル ストレージには Kafka を使用し、フォールト トレランスにはストリーム プロセッサ インスタンス間で同じグループ メカニズムを使用します。

2. Kafkaの使用シナリオ

2.1 メッセージング

Kafka は、従来のメッセージ ブローカーの代替として使用できます。メッセージ ブローカーは、さまざまな理由 (データ プロデューサーからの処理の分離、未処理のメッセージのバッファリングなど) で使用されます。 Kafka は、ほとんどのメッセージング システムよりも優れたスループット、組み込みのパーティショニング、レプリケーション、フォールト トレランスを備えているため、大規模なメッセージ処理アプリケーションに最適なソリューションです。

経験則として、メッセージングの使用は通常は比較的低いですが、エンドツーエンドのレイテンシを低く抑える必要があり、Kafka が提供する強力な耐久性の保証に依存することがよくあります。

この分野では、Kafka は ActiveMQ や RabbitMQ などの従来のメッセージング システムに匹敵します。

2.2 ウェブサイトアクティビティの追跡

Kafka の元々の使用例は、ユーザー アクティビティ追跡パイプラインをリアルタイムのパブリッシュ/サブスクライブ フィードのセットとして再構築できるようにすることでした。つまり、サイト アクティビティ (ページ ビュー、検索、またはユーザーが実行するその他のアクション) は、アクティビティ タイプごとに 1 つのトピックで中央トピックに公開されます。これらのフィードは、リアルタイム処理、リアルタイム監視、オフライン処理およびレポート作成のための Hadoop またはオフライン データ ウェアハウス システムへのロードなど、さまざまなユース ケースの注文に使用できます。

ユーザーのページビューごとに多くのアクティビティ メッセージが生成されるため、アクティビティ トラッキングは通常非常に高くなります。

2.3 メトリクス

Kafka は運用監視データによく使用されます。これには、分散アプリケーションからの統計を集約して、運用データの集中フィードを生成することが含まれます。

2.4 ログの集約

多くの人がログ集約ソリューションの代わりとして Kafka を使用しています。ログ集約では通常、サーバーから物理ログ ファイルが収集され、処理のために中央の場所 (ファイル サーバーまたは HDFS など) に配置されます。 Kafka はファイルの詳細を抽象化し、ログまたはイベント データをメッセージのストリームとしてよりきれいに表示します。これにより、処理のレイテンシが低減され、複数のデータ ソースと分散データ消費のサポートが容易になります。 Scribe や Flume などのログ中心のシステムと比較して、Kafka は、レプリケーションによる強力な耐久性保証と、エンドツーエンドのレイテンシの低減により、同様に優れたパフォーマンスを提供します。

2.5 ストリーム処理

多くの Kafka ユーザーは、複数のステージで構成されるパイプラインとしてデータを処理します。パイプラインでは、生の入力データが Kafka トピックから消費され、その後、さらなる消費または後続の処理のために、集約、拡充、またはその他の方法で新しいトピックに変換されます。

たとえば、ニュース記事を推奨するための処理パイプラインは、RSS フィードから記事のコンテンツをスクレイピングし、「記事」トピックに公開する場合があります。さらに処理を進めると、このコンテンツが正規化または重複排除され、クリーンアップされた記事コンテンツが新しいトピックに公開される可能性があります。最終処理段階では、このコンテンツをユーザーに推奨しようとする可能性があります。このような処理パイプラインは、さまざまなトピックに基づいてリアルタイム データ ストリームのグラフを作成します。 0.10.0.0 以降では、Apache Kafka で Kafka Streams という軽量でありながら強力なストリーム処理ライブラリを使用して、上記のようなデータ処理を実行できます。 Kafka Streams に加えて、他のオープンソース ストリーム処理ツールには Apache Storm や Apache Samza などがあります。

2.6 イベントソーシング

イベント ソーシングは、状態の変化が時間順のレコードのシーケンスとして記録されるアプリケーション設計のスタイルです。 Kafka は非常に大きなログ データの保存をサポートしているため、このスタイルで構築されたアプリケーションにとって優れたバックエンドになります。

2.7 ログの送信

Kafka は分散システムの外部コミット ログとして使用できます。ログはノード間でデータを複製するのに役立ち、障害が発生したノードがデータを回復するための再同期メカニズムとして機能します。 Kafka のログ圧縮機能は、この使用法をサポートするのに役立ちます。この使用法では、Kafka は Apache BookKeeper プロジェクトに似ています。

3. Kafkaのインストール

3.1 ダウンロードとインストール

公式 Web サイト http://kafka.apache.org/downloads.html にアクセスして、必要なバージョンをダウンロードします。

注: Kafka コンソール スクリプトは Unix ベースと Windows ベースのプラットフォームで異なるため、Windows プラットフォームでは bin/ ではなく bin\windows\ を使用し、スクリプト拡張子を .bat に変更します。

  1. [root@along ~]# wget http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz  
  2. [root@along ~]# tar -C /data/ -xvf kafka_2.11-2.1.0.tgz  
  3. [root@along ~]# cd /data/kafka_2.11-2.1.0/

3.2 Zookeeperの設定と起動

kafka を正常に実行するには、zookeeper を構成する必要があります。そうしないと、kafka クラスターもクライアントの Survivor と Consumer も正常に動作しません。そのため、Zookeeper サービスを設定して起動する必要があります。

(1)ZookeeperにはJava環境が必要

  1. [root@along ~]# yum -y install java-1.8.0

(2)ここでのKafkaダウンロードパッケージにはすでにZookeeperサービスが含まれているため、設定ファイルを変更して起動するだけで済みます。

Zookeeper の特定のバージョンをダウンロードする必要がある場合は、Zookeeper の公式 Web サイト http://mirrors.shu.edu.cn/apache/zookeeper/ にアクセスして、指定されたバージョンをダウンロードできます。

  1. [root@along ~]# cd /data/kafka_2.11-2.1.0/  
  2. [root@along kafka_2.11-2.1.0]# grep "^[^#]" config/zookeeper.properties  
  3. dataDir =/tmp/zookeeper #データ保存ディレクトリ 
  4. clientPort = 2181 #zookeeper ポート 
  5. 最大クライアント接続数= 0    

注: Zookeeperの設定は自分で追加したり変更したりできます

3.3 Kafka の設定

(1)設定ファイルを変更する

  1. [root@along kafka_2.11-2.1.0]# grep "^[^#]" config/server.properties  
  2. ブローカーID = 0      
  3. リスナー=プレーンテキスト://localhost:9092  
  4. ネットワークスレッド数= 3      
  5. スレッド数= 8      
  6. ソケット送信バッファバイト= 102400      
  7. ソケット受信バッファバイト= 102400     
  8. ソケットリクエスト最大バイト数= 104857600      
  9. log.dirs = /tmp/kafka-logs  
  10. パーティション数= 1      
  11. データディレクトリあたりの回復スレッド数= 1    
  12. オフセット.トピック.レプリケーション.係数= 1      
  13. トランザクション状態ログレプリケーション係数= 1      
  14. トランザクション状態ログ最小ISR = 1      
  15. ログ保持時間= 168      
  16. ログセグメントバイト= 1073741824      
  17. log.retention.check.interval.ms = 300000      
  18. zookeeper.connect =ローカルホスト:2181  
  19. Zookeeper.connection.timeout.ms = 6000      
  20. グループ.初期リバランス.遅延.ms = 0    

注: 必要に応じて設定ファイルを変更できます。

  1. broker.id: #一意のID  
  2. listeners = PLAINTEXT ://localhost:9092: #kafka サービスのリスニング アドレスとポート 
  3. log.dirs: #ログ保存ディレクトリ
  4. zookeeper.connect: #zookeeper サービスを指定する

(2)環境変数を設定する

  1. [root@along ~]# vim /etc/profile.d/kafka.sh  
  2. エクスポートKAFKA_HOME = "/data/kafka_2.11-2.1.0"      
  3. PATHをエクスポート= "${KAFKA_HOME}/bin:$PATH"      
  4. [root@along ~]# ソース /etc/profile.d/kafka.sh

(3)サービス起動スクリプトを設定する

  1. [root@along ~]# vim /etc/init.d/kafka  
  2. #!/bin/sh  
  3. #  
  4. # chkconfig: 345 99 01  
  5. # 説明: カフカ 
  6. #  
  7. # ファイル: Kafka  
  8. #  
  9. # 説明: Kafka サーバーの起動と停止 
  10. #  
  11. ソース /etc/rc.d/init.d/functions
  12. KAFKA_HOME =/data/kafka_2.11-2.1.0  
  13. KAFKA_USER =ルート     
  14. LOG_DIR =/tmp/kafka-logsをエクスポートします 
  15. [ -e /etc/sysconfig/kafka ] && 。カフカ
  16.   # 私たちがどのように呼ばれたか見てみましょう。  
  17. ケース「$1」
  18.   始める)  
  19. echo -n "Kafka を起動しています:"  
  20. /sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2 > $LOG_DIR/server.err &"
  21. echo "完了しました。"  
  22. 終了 0  
  23. ;;   
  24.   停止)
  25.   echo -n "Kafka を停止しています: "  
  26. /sbin/runuser -s /bin/sh $KAFKA_USER -c "p​​s -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill \-9"
  27.   echo "完了しました。"  
  28. 終了 0  
  29. ;;  
  30. ハードストップ 
  31. echo -n "Kafka を停止しています (ハード): "  
  32. /sbin/runuser -s /bin/sh $KAFKA_USER -c "p​​s -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill -9"
  33.   echo "完了しました。"
  34. 終了 0
  35.   ;;   
  36.   状態)
  37.   c_pid =`ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'`
  38. [ "$c_pid" = "" ] の場合;それから 
  39. 「停止しました」とエコー 
  40. 出口3  
  41. それ以外 
  42. echo "$c_pid を実行中"  
  43. 終了 0  
  44. フィ 
  45. ;;   
  46.   再起動)  
  47. 停止 
  48. 始める 
  49. ;;  
  50.   *)  
  51. echo "使用法: kafka {start|stop|hardstop|status|restart}"
  52.   出口1  
  53. ;;  
  54.  エサック

3.4 Kafkaサービスを開始する

(1)バックグラウンドでZookeeperサービスを開始する

  1. [root@along ~]# nohup zookeeper-server-start.sh /data/kafka_2.11-2.1.0/config/zookeeper.properties &

(2)Kafkaサービスを開始する

  1. [root@along ~]# サービスkafka開始 
  2. kafka を起動しています (systemctl 経由): [ OK ]  
  3. [root@along ~]# サービス kafka ステータス 
  4. 86018を実行中 
  5. [root@along ~]# ss -nutl  
  6. Netid 状態 Recv-Q Send-Q ローカルアドレス:ポート ピアアドレス:ポート
  7. TCP LISTEN 0 50 :::9092 :::*
  8. TCP LISTEN 0 50 :::2181 :::*

4. Kafka を使い始める

4.1 トピックを作成する

パーティションが 1 つだけ含まれ、レプリカが 1 つだけある「along」というトピックを作成します。

  1. [root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic along  
  2. トピック「along」を作成しました。

list topic コマンドを実行すると、トピックが表示されます。

  1. [root@along ~]# kafka-topics.sh --list --zookeeper localhost:2181  
  2. 平行

4.2 メッセージを送信する

Kafka には、ファイルまたは標準入力から入力を受け取り、それをメッセージとして Kafka クラスターに送信するコマンドライン クライアントが付属しています。デフォルトでは、各行は個別のメッセージとして送信されます。

プロデューサーを実行し、コンソールにメッセージを入力してサーバーに送信します。

  1. [root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic along  
  2. >これはメッセージです 
  3. >これは別のメッセージです

4.3 コンシューマーを起動する

Kafka には、メッセージを標準出力にダンプするコマンド ライン コンシューマーもあります。

  1. [root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic along --from-beginning  
  2. これはメッセージです 
  3. これは別のメッセージです

すべてのコマンドライン ツールには追加のオプションがあります。引数なしでコマンドを実行すると、引数の使用方法をより詳細に記録した情報が表示されます。

5. マルチブローカー Kafka クラスターの設定

これまでは単一のブローカーで実行してきましたが、それでは面白くありません。 Kafka の場合、単一のブローカーはサイズ 1 のクラスターにすぎないため、いくつかのブローカー インスタンスを起動する以外に大きな変更はありません。しかし、感覚をつかむために、クラスターを 3 つのノード (引き続きローカル マシン上) に拡張してみましょう。

5.1 設定ファイルの準備

  1. [root@along kafka_2.11-2.1.0]# cd /data/kafka_2.11-2.1.0/  
  2. [root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-1.properties  
  3. [root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-2.properties  
  4. [root@along kafka_2.11-2.1.0]# vim config/server-1.properties  
  5. ブローカーID = 1      
  6. リスナー=プレーンテキスト://:9093  
  7. ログディレクトリ= /tmp/kafka-logs-1  
  8. [root@along kafka_2.11-2.1.0]# vim config/server-2.properties  
  9. ブローカーID = 2      
  10. リスナー=プレーンテキスト://:9094  
  11. ログディレクトリ= /tmp/kafka-logs-2

注: broker.id プロパティは、クラスター内の各ノードの一意かつ永続的な名前です。これらを同じマシン上で実行しており、すべてのエージェントが同じポートに登録するか、互いのデータを上書きするようにしたいため、ポートとログ ディレクトリをオーバーライドする必要があります。

5.2 クラスター内でさらに2つのKafkaサービスを有効にする

  1. [root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-1.properties &  
  2. [root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-2.properties &  
  3. [root@along ~]# ss -nutl  
  4. Netid 状態 Recv-Q Send-Q ローカルアドレス:ポート ピアアドレス:ポート
  5. TCP リッスン 0 50 ::ffff:127.0.0.1:9092 :::*
  6. TCP リッスン 0 50 ::ffff:127.0.0.1:9093 :::*
  7. TCP リッスン 0 50 ::ffff:127.0.0.1:9094 :::*

5.3 クラスタでの操作

1) レプリケーション係数が3の新しいトピックmy-replicated-topicを作成します。

  1. [root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic  
  2. トピック「my-replicated-topic」を作成しました。

2) クラスターで「describe topics」コマンドを実行して、どのブローカーが何を実行しているかを確認します。

  1. [root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic 私のレプリケートされたトピック 
  2. トピック:my-replicated-topic パーティション数:1 レプリ​​ケーション係数:3 構成:  
  3. トピック: my-replicated-topic パーティション: 0 リーダー: 2 レプリカ: 2,0,1 Isr: 2,0,1  
  4. #コメント: 最初の行にはすべてのパーティションの概要が示され、追加の各行には 1 つのパーティションに関する情報が示されます。このトピックにはパーティションが 1 つしかないため、行は 1 つだけです。  
  5. #「リーダー」とは、特定のパーティションのすべての読み取りと書き込みを担当するノードです。各ノードは、パーティションのランダムに選択された部分のリーダーになります。  
  6. # 「レプリカ」は、リーダーであるかどうか、または現在アクティブであるかどうかに関係なく、このパーティションのログを複製するノードのリストです。
  7. # 「isr」は「同期された」レプリカのセットです。これは、現在アクティブであり、リーダーによってキャッチアップされたレプリカ リストのサブセットです。
  8. #リーダー: 2 であることに注意してください。私の例では、ノード 2 はトピックの唯一のパーティションのリーダーです。

3) 作成した元のトピックで同じコマンドを実行して、それがどこにあるかを確認できます。

  1. [root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic along  
  2. トピック:along パーティション数:1 レプリ​​ケーション係数:1 構成:  
  3. トピック: along パーティション: 0 リーダー: 0 レプリカ: 0 Isr: 0

4) 新しいトピックにいくつかのメッセージを公開します。

  1. [root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic  
  2. >私のテストメッセージ 1  
  3. >私のテストメッセージ2  
  4. > ^C

5) 次に、次のメッセージを使用しましょう。

  1. [root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic  
  2. 私のテストメッセージ1  
  3. 私のテストメッセージ2

5.4 クラスタのフォールトトレランスのテスト

1) 次にフォールトトレランスをテストします。ブローカー 2 がリーダーとして機能しているので、これを強制終了します。

  1. [root@along ~]# ps aux | grep server-2.properties |awk '{print $2}'  
  2. 106737  
  3. [root@along ~]# キル -9 106737  
  4. [root@along ~]# ss -nutl  
  5. TCP リッスン 0 50 ::ffff:127.0.0.1:9092 :::*
  6. TCP リッスン 0 50 ::ffff:127.0.0.1:9093 :::*

2) リーダーがスレーブ ノードの 1 つに切り替え、ノード 2 は同期されたレプリカ セットに含まれなくなりました。

  1. [root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic 私のレプリケートされたトピック 
  2. トピック:my-replicated-topic パーティション数:1 レプリ​​ケーション係数:3 構成:  
  3. トピック: my-replicated-topic パーティション: 0 リーダー: 0 レプリカ: 2,0,1 Isr: 0,1

3) 書き込みを最初に受け入れたリーダーが失敗した場合でも、これらのメッセージは引き続き使用できます。

  1. [root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic  
  2. 私のテストメッセージ 1  
  3. 私のテストメッセージ2

6. Kafka Connect を使用したデータのインポート/エクスポート

コンソールからデータを書き込んでコンソールに書き戻すのは便利な出発点ですが、他のソースのデータを使用したり、Kafka から他のシステムにデータをエクスポートしたりする必要がある場合もあります。多くのシステムでは、カスタム統合コードを記述する代わりに、Kafka Connect を使用してデータをインポートまたはエクスポートできます。

Kafka Connect は、Kafka にデータをインポートおよびエクスポートするために使用される、Kafka に含まれるツールです。これは、外部システムと対話するためのカスタム ロジックを実装するコネクタを実行する拡張可能なツールです。このクイックスタートでは、ファイルから Kafka トピックにデータをインポートし、Kafka トピックからファイルにデータをエクスポートするシンプルなコネクタを使用して Kafka Connect を実行する方法について説明します。

1) まず、テスト用のシードデータを作成します。

  1. [root@along ~]# echo -e "foo\nbar" > test.txt  
  2. または Windows の場合:  
  3. >エコーfoo > test.txt  
  4. >エコーバー> > test.txt

2) 次に、スタンドアロン モードで実行される 2 つのコネクタを起動します。つまり、それらは単一のローカル専用プロセスで実行されます。パラメータとして 3 つの構成ファイルが提供されます。

1 つ目は常に Kafka Connect プロセスの構成であり、接続する Kafka ブローカーやデータのシリアル化形式などの共通構成が含まれます。

残りの構成ファイルでは、作成するコネクタを指定します。これらのファイルには、一意のコネクタ名、インスタンス化するコネクタ クラス、およびコネクタに必要なその他の構成が含まれます。

  1. [root@along ~]# connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties  
  2. [2019-01-16 16:16:31,884] INFO Kafka Connect スタンドアロン ワーカーが初期化しています... (org.apache.kafka.connect.cli.ConnectStandalone:67)  
  3. [2019-01-16 16:16:31,903] INFO WorkerInfo の値:  
  4. ……  
  5. #注: Kafka に付属するこれらのサンプル構成ファイルは、以前に開始したデフォルトのローカル クラスター構成を使用し、2 つのコネクタを作成します。1 つ目は、入力ファイルから行を読み取って各 Kafka トピックを生成するソース コネクタで、2 つ目は、Kafka トピックからメッセージを読み取り、各メッセージを出力ファイルの行として生成するシンク コネクタです。

3) インポートが成功したかどうかを確認する(別のターミナルを起動する)

起動プロセス中に、コネクタがインスタンス化されていることを示すメッセージを含む、いくつかのログ メッセージが表示されます。

① Kafka Connect プロセスが開始すると、ソース コネクタは test.txt トピックから行を読み取り、トピック connect-test に生成し、シンク コネクタはトピック connect-test からメッセージを読み取り、ファイル test.sink.txt に書き込み始めます。出力ファイルの内容を調べることで、データがパイプライン全体を通過したことを確認できます。

  1. [root@along ~]# cat test.sink.txt  
  2. フー 
  3. バー

② データは Kafka トピック connect-test に保存されるので、コンソール コンシューマーを実行してトピック内のデータを表示することもできます (またはカスタム コンシューマー コードで処理することもできます)。

  1. [root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning  
  2. {"スキーマ":{"タイプ":"文字列","オプション":false},"ペイロード":"foo"}  
  3. {"schema":{"type":"string","optional":false},"payload":"bar"}

4) データを追加して検証を続ける

  1. [root@along ~]# echo 別の行> > test.txt  
  2. [root@along ~]# cat test.sink.txt  
  3. フー 
  4. バー 
  5. 別の行 
  6. [root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning  
  7. {"スキーマ":{"タイプ":"文字列","オプション":false},"ペイロード":"foo"}  
  8. {"schema":{"type":"string","optional":false},"payload":"bar"}  
  9. {"schema":{"type":"string","optional":false},"payload":"別の行"}

<<:  AIを活用したインテリジェントオートメーションは、人間が「スーパーパワー」を獲得するのを助ける

>>:  今回は、一見神秘的なKubernetesを理解するお手伝いをします!

推薦する

天猫と淘宝網の「双十一」取引額は191億に達し、前年比260%増

アリババグループは11月12日早朝、ダブル11プロモーション期間中のアリペイでの総売上高が191億円...

文化博覧会は16年ぶりにオンラインで開催され、テンセントが独占技術サービスプロバイダーとなった。

9月15日、深セン国際文化産業博覧会貿易フェア株式会社と深センテンセントコンピュータシステム株式会社...

Virpus-25% オフ/12.7 USD/年/Xen/15g SSD/2 コア/1.5T トラフィック/シアトル

virpus.com の旋風割引プロモーションが始まりました。SSD ハード ドライブを搭載したすべ...

質の高い友好的なリンク交換スキルについてどれだけ知っていますか

私は長年 SEO 業務に携わっており、検索エンジン最適化についてある程度理解しています。私は主に機械...

edgevm (egihosting) - 512m メモリ KVM/Windows/年間 50 ドル

edgevm.com は、サンノゼとニューヨークの 2 つのデータ センターから特別なプロモーション...

Hawkhost-VPS 8月/ダラスの60%オフプロモーション

Hawkhost は、OpenVZ ベース、バースト メモリ搭載、データ センターはダラスの VPS...

レビュー:「Qvod モデル」の何がすごいのか? 最もグレーなキラーアプリ

現状を受けて、Kuaibo は大胆な自己革命を実行しました。 4月16日夜、Qvodは新浪微博を通じ...

解決しなければならないローカルウェブサイトに存在する問題についての簡単な議論

最近はフレンドリーリンクの作業を手伝ったり、もちろんローカルサイトの状況も見たりしています。交換以外...

中国で初めてドメイン名登録業者による侵害事件が新郷市で判決を受けた。

大河網(宋翔楽記者)インターネット上で国民の名誉が毀損され攻撃された場合、侵害発言を掲載したウェブサ...

同僚はDockerとK8Sを知らなかったため、会社のグループチャットから削除されました

Docker はコンテナと同義であり、K8S はコンテナ オーケストレーションと同義です。これら 2...

WeChatマーケティング活動のための4つのヒント

WeChat マーケティングは多くの企業にとって必要な手段の 1 つになっていますが、うまく実行して...

Amazon Web Services: インテリジェンス、専用設計、統合保護に重点を置き、クラウド ストレージのイノベーションを加速

今日、あらゆるビジネスはデータ主導型であるべきです。データからクラウド サービス、ユーザー エクスペ...

タイ VPS: datatan、タイの無制限トラフィック VPS、月額 33 ドル、4G メモリ/3 コア/60g SSD

タイのサーバー会社である Datatan は、2004 年に設立され、2009 年に正式に登録された...

Zhongdai.comは1ヶ月以内に倒産しました。参入障壁のないP2Pウェブサイトは心配です

劉愛林記者と季家鵬記者が北京から報告した。それから1か月も経たないうちに、Zhongdai.comは...

raksmartはどうですか? Raksmartシンガポールクラウドサーバーレビュー、3つのネットワーク本土最適化ライン!

raksmart シンガポールデータセンターはどうですか? raksmart コンピュータ ルームに...