Kafka から Hadoop にデータを素早くインポートするにはどうすればよいでしょうか?

Kafka から Hadoop にデータを素早くインポートするにはどうすればよいでしょうか?

Kafka は、強力な分散機能とパフォーマンス機能により、データ パイプラインの重要な部分として急速に普及した分散パブリッシュ/サブスクライブ システムです。メッセージング、メトリック収集、ストリーム処理、ログ集約など、さまざまなことを実行します。 Kafka のもう 1 つの効果的な使用法は、データを Hadoop にインポートすることです。 Kafka を使用する主な理由は、データ プロデューサーとコンシューマーが分離され、複数の独立したプロデューサー (異なる開発チームによって記述される可能性あり) を使用できるようになることです。同様に、独立したコンシューマーも複数存在します (異なるチームによって記述されている可能性もあります)。さらに、コンシューマーはリアルタイム/同期またはバッチ/オフライン/非同期にすることができます。後者の特性は、RabbitMQ などの他の pub-sub ツールと比較すると大きな違いを生みます。

Kafka を使用するには、理解しておく必要がある概念がいくつかあります。

  • トピック - トピックは関連メッセージのサブスクリプション ソースです。
  • パーティション - 各トピックは 1 つ以上のパーティションで構成されます。パーティションは、ログ ファイルによってバックアップされる順序付けられたメッセージ キューです。
  • プロデューサーとコンシューマー - プロデューサーとコンシューマーは、パーティションにメッセージを書き込み、パーティションからメッセージを読み取ります。

ブローカー — ブローカーは、トピックとパーティションを管理し、プロデューサーとコンシューマーのリクエストに対応する Kafka プロセスです。

Kafka はトピックの「完全な」順序付けを保証するのではなく、トピックを構成するパーティションが順序付けられていることのみを保証します。コンシューマー アプリケーションは、必要に応じて「グローバル」トピックの順序付けを強制できます。

図5.14はKafkaの概念モデルを示している。

図5.15は、Kafkaで分散パーティションをデプロイする方法の例を示しています。

フォールト トレランスをサポートするために、トピックを複製することができます。つまり、各パーティションは、異なるホスト上に構成可能な数のレプリカを持つことができます。これにより、耐障害性が向上し、単一のサーバーが停止しても、データやプロデューサーとコンシューマーの可用性に壊滅的な影響が及ぶことはありません。

ここでは、Kafka バージョン 0.8 と Camus バージョン 0.8.X が使用されています。

練習: Camus を使用して Kafka から HDFS に Avro データをコピーする

この手法は、すでに他の目的で Kafka にデータをストリーミングしていて、そのデータを HDFS に格納したい場合に役立ちます。

質問

データを HDFS にインポートするためのデータ配信メカニズムとして Kafka を使用したいと考えています。

解決

Kafka のデータは、LinkedIn が開発したソリューションである Camus を使用して HDFS に複製されます。

話し合う

Camus は LinkedIn が開発したオープンソース プロジェクトです。 LinkedIn では Kafka が広く導入されており、Camus は Kafka から HDFS にデータをコピーするために使用されます。

Camus は、Kafka で JSON と Avro の 2 つのデータ形式をすぐにサポートします。この手法では、Camus を介して Avro データを使用します。 Camus の Avro の組み込みサポートでは、Kafka パブリッシャーが独自の方法で Avro データを記述する必要があるため、この手法では、Kafka で標準のシリアル化データを使用することを前提としています。

この手法を機能させるには、3 つの部分が必要です。まず、Avro データを Kafka に書き込み、次に Camus が Avro データを逆シリアル化できるようにする簡単なクラスを記述し、最後に Camus ジョブを実行してデータのインポートを実行します。

Avro レコードを Kafka に書き込むには、次のコードで、必要な Kafka プロパティを構成して Kafka プロデューサーを設定し、ファイルからいくつかの Avro レコードをロードして、それらを Kafka に書き出す必要があります。

次のコマンドを使用して、test という名前の Kafka トピックにサンプル データをロードできます。

Kafka コンソール コンシューマーを使用すると、データが Kafka に書き込まれたことを確認できます。これにより、バイナリ Avro データがコンソールにダンプされます。

それができたら、Camus でこれらの Avro レコードを読み取れるように Camus コードをいくつか記述します。

実践: カミュとスキーマレジストリの書き方

まず、カミュの 3 つの概念を理解する必要があります。

  • デコーダー — デコーダーの役割は、Kafka から抽出された生データを Camus 形式に変換することです。
  • エンコーダー — エンコーダーは、デコードされたデータを HDFS に保存される形式にシリアル化します。
  • スキーマ レジストリ - エンコードされる Avro データに関するスキーマ情報を提供します。

前述のように、Camus は Avro データをサポートしていますが、Kafka プロデューサーが Camus の KafkaAvroMessageEncoder クラスを使用してデータを書き込む必要があります。このクラスは、Avro シリアル化されたバイナリ データに独自のデータを追加します。これは、Camus のデコーダーが、そのクラスによって書き込まれたことを検証できるようにするためだと考えられます。

この例では、シリアル化に Avro シリアル化が使用されているため、独自のデコーダーを作成する必要があります。幸いなことに、これは簡単です:

Kafka では特定の Avro レコードを書き込んだのに、Camus ではそのレコードを特定の Avro レコードではなく汎用 Avro レコードとして読み取ることに気づいたかもしれません。これは、CamusWrapper クラスが汎用 Avro レコードのみをサポートしているためです。それ以外の場合、生成されたコードは、付属するすべてのセキュリティ機能とともに使用できるため、特定の Avro レコードの使用がはるかに簡単になります。

CamusWrapper オブジェクトは Kafka からデータを抽出します。このクラスが存在する理由は、タイムスタンプ、サーバー名、サービス詳細などのメタデータをエンベロープに貼り付けることができるようにするためです。使用されるデータには、各レコードに意味のあるタイムスタンプを関連付けることを強くお勧めします (通常、これはレコードが作成または生成された時刻になります)。次に、タイムスタンプをパラメーターとして受け入れる CamusWrapper コンストラクターを使用できます。

  1. public CamusWrapper(R レコード、長いタイムスタンプ) {...}

タイムスタンプが設定されていない場合、Camus はラッパーの作成時に新しいタイムスタンプを作成します。このタイムスタンプとその他のメタデータは、出力レコードの HDFS の場所を決定するときに Camus で使用されます。

次に、Camus Avro エンコーダーが HDFS に書き込まれる Avro レコードのスキーマの詳細を認識できるように、スキーマ レジストリを作成する必要があります。スキーマを登録するときに、Avro レコードを取得する Kafka トピックの名前も指定します。

ラン・カミュ

Camus は、Kafka データをインポートする Hadoop クラスター上で MapReduce ジョブとして実行されます。 Camus に提供する必要のあるプロパティが多数あります。これは、コマンド ラインまたはプロパティ ファイルを使用して実行できます。次の手法を使用してプロパティ ファイルを使用します。

プロパティからわかるように、Camus にどのトピックをインポートするかを明示的に指示する必要はありません。 Camus は Kafka と自動的に通信して、トピック (およびパーティション) と現在の開始オフセットと終了オフセットを検出します。

インポートされたトピックを正確に制御したい場合は、kafka.whitelist.topics と kafka.blacklist.topics を使用して、それぞれホワイトリスト (トピックを制限) とブラックリスト (トピックを除外) をリストできます。区切り文字としてカンマを使用して複数のトピックを指定できます。次の例に示すように、正規表現もサポートされており、トピック「topic1」または「abc」で始まりその後に 1 つ以上の数字が続く任意のトピックと一致します。値とまったく同じ構文を使用してブラックリストを指定できます。

  1. kafka.whitelist.topics=トピック1、abc[0-9]+

プロパティがすべて設定されたら、Camus ジョブを実行できます。

これにより、Avro データが HDFS に保存されることになります。 HDFS に何が含まれているか見てみましょう。

最初のファイルにはインポートされたデータが含まれ、他のファイルは Camus によって管理されます。

HDFS 内のデータ ファイルは、AvroDump ユーティリティを使用して表示できます。

では、Camus ジョブが実行されると、具体的に何が起こるのでしょうか? Camus インポート プロセスは、図 5.16 に示すように、MapReduce ジョブとして実行されます。

MapReduce の Camus タスクが成功すると、Camus OutputCommitter (タスクの完了時にカスタム作業を実行できるようにする MapReduce 構造) がタスクのデータ ファイルをアトミックに宛先ディレクトリに移動します。 OutputCommitter は、タスクによって処理されるすべてのパーティションのオフセット ファイルも作成します。同じジョブ内の他のタスクが失敗する可能性がありますが、これは成功したタスクの状態には影響しません。成功したタスクのデータとオフセット出力はそのまま残っているので、後続の Camus 実行では、最後に成功した状態から処理が再開されます。

次に、Camus がデータをインポートする場所と動作を制御する方法を見てみましょう。

データの分割

先ほど、Camus が Kafka にある Avro データをインポートすることを確認しました。図 5.17 に示す HDFS パス構造を詳しく見て、場所を特定するために何ができるかを見てみましょう。

図5.17 HDFSにエクスポートされたデータを解析するためのCamus出力パス

パスの日付/時刻は、CamusWrapper から抽出されたタイムスタンプによって決定されます。 MessageDecoder の Kafka レコードからタイムスタンプを抽出し、CamusWrapper に渡すことが可能です。これにより、データは、デフォルトの Kafka レコードが MapReduce で読み取られた時刻ではなく、意味のある日付でパーティション分割されるようになります。

Camus はプラグ可能なパーティショナーをサポートしており、図 5.18 に示すパスの一部を制御できます。

図5.18 Camusパーティションパス

Camus Partitioner インターフェースには、実装する必要がある 2 つのメソッドが用意されています。

たとえば、カスタム パーティショナーは Hive パーティショニング用のパスを作成できます。

要約する

Camus は、HDFS 内の Kafka からデータを取得するための完全なソリューションを提供し、問題が発生した場合の状態の維持とエラー処理を担当します。 Azkaban または Oozie と統合することで、簡単に自動化し、メッセージ時間に基づいて HDFS データを整理することでシンプルなデータ管理を実行できます。 ETL に関しては、Flume と比較してその機能が比類のないものであることは特筆に値します。

Kafka には、データを HDFS に取り込むためのメカニズムがバンドルされています。 MapReduce ジョブで Kafka からデータを抽出するために使用できる KafkaETLInputFormat 入力形式クラスがあります。インポートを実行するには MapReduce ジョブを記述する必要がありますが、データの中間ストレージとして HDFS を使用する代わりに、データを MapReduce フロー内で直接使用できるという利点があります。次に、Hadoop にあるデータをファイルシステムなどの他のシステムに転送する方法について説明します。

<<:  クラウドデータ管理を解き放つ4つの鍵

>>:  NetEase Cloudのリアルタイムオーディオフレームワークの背後にあるアルゴリズムの最適化により、製品エクスペリエンスが全面的に向上

推薦する

分散サービス電流制限の実践、私たちはすでにあなたのためにピットを手配しました

[[273022]] 1. 電流制限の役割API インターフェースは呼び出し側の動作を制御できないた...

ファンタジーから現実へ、VRの台頭

[[439872]]メタバースの普及により、仮想空間への入り口であるVR(バーチャルリアリティ)が再...

9大グループ購入サイトの新状況は満杯、滴滴出行と58団は遅れをとる可能性

2年間の熾烈な競争を経て、共同購入業界の競争は重要な時期を迎えています。今年最初の8か月間の主要共同...

マーケティングプランナーのための4つのヒント

新年が近づいてきました。A5スタッフ全員に新年のご多幸をお祈りいたします。この間、私は「マーケティン...

Name.com イベント: com/net に登録して $5 + $0.99 を獲得

name.com の最新プロモーション: .com または .net ドメイン名を 0.99 ドルで...

インダストリアルクラウドがイノベーションの基盤を築く方法

クリーブランド クリニックの CIO である Matthew Kull 氏は、医療とマーケティングと...

Lu Songsong のブログ: Google、Baidu、Google の最も重要な違い

1 Google、悪事を働くな、情報は流れる周知のとおり、Google の目標は「世界の情報を統合す...

ワンダの電子商取引分析によると、B2CとO2Oの両方が困難に直面している

以前、私はSutuフォーラムに参加した際、「万達には蓄積すべき経験がなく、採用すべき人材もいない。万...

ビッグテクノロジー時代のネットワーク変革

テクノロジーは常に急速に発展しており、私たちの日常生活にますます統合されつつあります。インターネット...

ウェブサイトのトラフィックを無駄にせず、セカンダリマーケティングのためのいくつかのソリューションを実行する方法

ウェブサイトはどうすればオンライン マーケティングをうまく行うことができるでしょうか? 今日の最大の...

台無しになったインターネットデコレーション

「インターネットホームデコレーション元年」という言葉を聞いたことがありますか? 「インターネットホー...

[AUSYD_1] オーストラリア シドニー ユニコム AS9929 ラインの VPS レビュー

レンガ職人はどうですか?シドニーのレンガ職人はどうですか?オーストラリアのシドニーデータセンターのB...

アリババDAMOアカデミーは5年間かけてスキルを磨き、AIの適用の難しさに対処するためにAIモデルコミュニティを立ち上げました。

11月3日、杭州で開催された2022年雲奇大会において、アリババDAMOアカデミーとCCFオープンソ...

マルチクラウドアーキテクチャを計画する方法

マルチクラウド アーキテクチャは現在、ほとんどの組織が採用しているクラウド コンピューティング戦略の...