Kafka は、強力な分散機能とパフォーマンス機能により、データ パイプラインの重要な部分として急速に普及した分散パブリッシュ/サブスクライブ システムです。メッセージング、メトリック収集、ストリーム処理、ログ集約など、さまざまなことを実行します。 Kafka のもう 1 つの効果的な使用法は、データを Hadoop にインポートすることです。 Kafka を使用する主な理由は、データ プロデューサーとコンシューマーが分離され、複数の独立したプロデューサー (異なる開発チームによって記述される可能性あり) を使用できるようになることです。同様に、独立したコンシューマーも複数存在します (異なるチームによって記述されている可能性もあります)。さらに、コンシューマーはリアルタイム/同期またはバッチ/オフライン/非同期にすることができます。後者の特性は、RabbitMQ などの他の pub-sub ツールと比較すると大きな違いを生みます。 Kafka を使用するには、理解しておく必要がある概念がいくつかあります。
ブローカー — ブローカーは、トピックとパーティションを管理し、プロデューサーとコンシューマーのリクエストに対応する Kafka プロセスです。 Kafka はトピックの「完全な」順序付けを保証するのではなく、トピックを構成するパーティションが順序付けられていることのみを保証します。コンシューマー アプリケーションは、必要に応じて「グローバル」トピックの順序付けを強制できます。 フォールト トレランスをサポートするために、トピックを複製することができます。つまり、各パーティションは、異なるホスト上に構成可能な数のレプリカを持つことができます。これにより、耐障害性が向上し、単一のサーバーが停止しても、データやプロデューサーとコンシューマーの可用性に壊滅的な影響が及ぶことはありません。 ここでは、Kafka バージョン 0.8 と Camus バージョン 0.8.X が使用されています。 練習: Camus を使用して Kafka から HDFS に Avro データをコピーする この手法は、すでに他の目的で Kafka にデータをストリーミングしていて、そのデータを HDFS に格納したい場合に役立ちます。 質問 データを HDFS にインポートするためのデータ配信メカニズムとして Kafka を使用したいと考えています。 解決 Kafka のデータは、LinkedIn が開発したソリューションである Camus を使用して HDFS に複製されます。 話し合う Camus は LinkedIn が開発したオープンソース プロジェクトです。 LinkedIn では Kafka が広く導入されており、Camus は Kafka から HDFS にデータをコピーするために使用されます。 Camus は、Kafka で JSON と Avro の 2 つのデータ形式をすぐにサポートします。この手法では、Camus を介して Avro データを使用します。 Camus の Avro の組み込みサポートでは、Kafka パブリッシャーが独自の方法で Avro データを記述する必要があるため、この手法では、Kafka で標準のシリアル化データを使用することを前提としています。 この手法を機能させるには、3 つの部分が必要です。まず、Avro データを Kafka に書き込み、次に Camus が Avro データを逆シリアル化できるようにする簡単なクラスを記述し、最後に Camus ジョブを実行してデータのインポートを実行します。 Avro レコードを Kafka に書き込むには、次のコードで、必要な Kafka プロパティを構成して Kafka プロデューサーを設定し、ファイルからいくつかの Avro レコードをロードして、それらを Kafka に書き出す必要があります。 次のコマンドを使用して、test という名前の Kafka トピックにサンプル データをロードできます。 Kafka コンソール コンシューマーを使用すると、データが Kafka に書き込まれたことを確認できます。これにより、バイナリ Avro データがコンソールにダンプされます。 それができたら、Camus でこれらの Avro レコードを読み取れるように Camus コードをいくつか記述します。 実践: カミュとスキーマレジストリの書き方 まず、カミュの 3 つの概念を理解する必要があります。
前述のように、Camus は Avro データをサポートしていますが、Kafka プロデューサーが Camus の KafkaAvroMessageEncoder クラスを使用してデータを書き込む必要があります。このクラスは、Avro シリアル化されたバイナリ データに独自のデータを追加します。これは、Camus のデコーダーが、そのクラスによって書き込まれたことを検証できるようにするためだと考えられます。 この例では、シリアル化に Avro シリアル化が使用されているため、独自のデコーダーを作成する必要があります。幸いなことに、これは簡単です: Kafka では特定の Avro レコードを書き込んだのに、Camus ではそのレコードを特定の Avro レコードではなく汎用 Avro レコードとして読み取ることに気づいたかもしれません。これは、CamusWrapper クラスが汎用 Avro レコードのみをサポートしているためです。それ以外の場合、生成されたコードは、付属するすべてのセキュリティ機能とともに使用できるため、特定の Avro レコードの使用がはるかに簡単になります。 CamusWrapper オブジェクトは Kafka からデータを抽出します。このクラスが存在する理由は、タイムスタンプ、サーバー名、サービス詳細などのメタデータをエンベロープに貼り付けることができるようにするためです。使用されるデータには、各レコードに意味のあるタイムスタンプを関連付けることを強くお勧めします (通常、これはレコードが作成または生成された時刻になります)。次に、タイムスタンプをパラメーターとして受け入れる CamusWrapper コンストラクターを使用できます。
タイムスタンプが設定されていない場合、Camus はラッパーの作成時に新しいタイムスタンプを作成します。このタイムスタンプとその他のメタデータは、出力レコードの HDFS の場所を決定するときに Camus で使用されます。 次に、Camus Avro エンコーダーが HDFS に書き込まれる Avro レコードのスキーマの詳細を認識できるように、スキーマ レジストリを作成する必要があります。スキーマを登録するときに、Avro レコードを取得する Kafka トピックの名前も指定します。 ラン・カミュ Camus は、Kafka データをインポートする Hadoop クラスター上で MapReduce ジョブとして実行されます。 Camus に提供する必要のあるプロパティが多数あります。これは、コマンド ラインまたはプロパティ ファイルを使用して実行できます。次の手法を使用してプロパティ ファイルを使用します。 プロパティからわかるように、Camus にどのトピックをインポートするかを明示的に指示する必要はありません。 Camus は Kafka と自動的に通信して、トピック (およびパーティション) と現在の開始オフセットと終了オフセットを検出します。 インポートされたトピックを正確に制御したい場合は、kafka.whitelist.topics と kafka.blacklist.topics を使用して、それぞれホワイトリスト (トピックを制限) とブラックリスト (トピックを除外) をリストできます。区切り文字としてカンマを使用して複数のトピックを指定できます。次の例に示すように、正規表現もサポートされており、トピック「topic1」または「abc」で始まりその後に 1 つ以上の数字が続く任意のトピックと一致します。値とまったく同じ構文を使用してブラックリストを指定できます。
プロパティがすべて設定されたら、Camus ジョブを実行できます。 これにより、Avro データが HDFS に保存されることになります。 HDFS に何が含まれているか見てみましょう。 最初のファイルにはインポートされたデータが含まれ、他のファイルは Camus によって管理されます。 HDFS 内のデータ ファイルは、AvroDump ユーティリティを使用して表示できます。 では、Camus ジョブが実行されると、具体的に何が起こるのでしょうか? Camus インポート プロセスは、図 5.16 に示すように、MapReduce ジョブとして実行されます。 MapReduce の Camus タスクが成功すると、Camus OutputCommitter (タスクの完了時にカスタム作業を実行できるようにする MapReduce 構造) がタスクのデータ ファイルをアトミックに宛先ディレクトリに移動します。 OutputCommitter は、タスクによって処理されるすべてのパーティションのオフセット ファイルも作成します。同じジョブ内の他のタスクが失敗する可能性がありますが、これは成功したタスクの状態には影響しません。成功したタスクのデータとオフセット出力はそのまま残っているので、後続の Camus 実行では、最後に成功した状態から処理が再開されます。 次に、Camus がデータをインポートする場所と動作を制御する方法を見てみましょう。 データの分割 先ほど、Camus が Kafka にある Avro データをインポートすることを確認しました。図 5.17 に示す HDFS パス構造を詳しく見て、場所を特定するために何ができるかを見てみましょう。 パスの日付/時刻は、CamusWrapper から抽出されたタイムスタンプによって決定されます。 MessageDecoder の Kafka レコードからタイムスタンプを抽出し、CamusWrapper に渡すことが可能です。これにより、データは、デフォルトの Kafka レコードが MapReduce で読み取られた時刻ではなく、意味のある日付でパーティション分割されるようになります。 Camus はプラグ可能なパーティショナーをサポートしており、図 5.18 に示すパスの一部を制御できます。 Camus Partitioner インターフェースには、実装する必要がある 2 つのメソッドが用意されています。 たとえば、カスタム パーティショナーは Hive パーティショニング用のパスを作成できます。 要約する Camus は、HDFS 内の Kafka からデータを取得するための完全なソリューションを提供し、問題が発生した場合の状態の維持とエラー処理を担当します。 Azkaban または Oozie と統合することで、簡単に自動化し、メッセージ時間に基づいて HDFS データを整理することでシンプルなデータ管理を実行できます。 ETL に関しては、Flume と比較してその機能が比類のないものであることは特筆に値します。 Kafka には、データを HDFS に取り込むためのメカニズムがバンドルされています。 MapReduce ジョブで Kafka からデータを抽出するために使用できる KafkaETLInputFormat 入力形式クラスがあります。インポートを実行するには MapReduce ジョブを記述する必要がありますが、データの中間ストレージとして HDFS を使用する代わりに、データを MapReduce フロー内で直接使用できるという利点があります。次に、Hadoop にあるデータをファイルシステムなどの他のシステムに転送する方法について説明します。 |
>>: NetEase Cloudのリアルタイムオーディオフレームワークの背後にあるアルゴリズムの最適化により、製品エクスペリエンスが全面的に向上
みなさんこんにちは、シャオシです。セルフメディアの話題はますます奇妙になっています。一般の人々がセル...
アプリを宣伝するときには、その評価やレビューをどのように改善するかという状況に直面することがよくあり...
2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っています企業がウェ...
cmivpsは以前のモバイルcmi国際ラインを変更したようです。公式も大量のサーバーを追加しました。...
王星が非常に傲慢であることは、インターネット界では秘密ではありません。王興氏が常に強調してきたように...
ウェブサイトの最適化、特にウェブサイトのキーワードランキングの最適化として、検索エンジンで上位 3 ...
中国文化は奥深く広大です。日常のやり取りやコミュニケーションの中で、1 つの単語に複数の意味があるこ...
SEO スキルの話題は、SEO 担当者の間で常に話題になっています。SEO フォーラムやウェブマスタ...
トピックを理解する - 特定のテーマに関するトピックなので、導入部は不可欠です。このような導入部はす...
起源前回の記事では、kube-apiserver へのリストおよび監視リクエストを開始する Info...
皆さんが考えていることとは反対に、エンタープライズ IoT プロジェクトを実行するためにクラウド プ...
SEO はかつてウェブサイト トラフィックの主なソースであり、ここ数年で盛んに実施されてきました。検...
はじめに: 厦門での検索エンジン戦略会議の前に、WinTimes は世界的に有名な SEO 専門家で...
この記事では、クラウド パフォーマンス テストの種類、さまざまな形式、利点、メリット、一般的なツール...
ウェブサイトのスナップショットは常に数か月前のものであり、ウェブサイトは毎日更新されています。何が問...