Kafka は、強力な分散機能とパフォーマンス機能により、データ パイプラインの重要な部分として急速に普及した分散パブリッシュ/サブスクライブ システムです。メッセージング、メトリック収集、ストリーム処理、ログ集約など、さまざまなことを実行します。 Kafka のもう 1 つの効果的な使用法は、データを Hadoop にインポートすることです。 Kafka を使用する主な理由は、データ プロデューサーとコンシューマーが分離され、複数の独立したプロデューサー (異なる開発チームによって記述される可能性あり) を使用できるようになることです。同様に、独立したコンシューマーも複数存在します (異なるチームによって記述されている可能性もあります)。さらに、コンシューマーはリアルタイム/同期またはバッチ/オフライン/非同期にすることができます。後者の特性は、RabbitMQ などの他の pub-sub ツールと比較すると大きな違いを生みます。 Kafka を使用するには、理解しておく必要がある概念がいくつかあります。
ブローカー — ブローカーは、トピックとパーティションを管理し、プロデューサーとコンシューマーのリクエストに対応する Kafka プロセスです。 Kafka はトピックの「完全な」順序付けを保証するのではなく、トピックを構成するパーティションが順序付けられていることのみを保証します。コンシューマー アプリケーションは、必要に応じて「グローバル」トピックの順序付けを強制できます。 フォールト トレランスをサポートするために、トピックを複製することができます。つまり、各パーティションは、異なるホスト上に構成可能な数のレプリカを持つことができます。これにより、耐障害性が向上し、単一のサーバーが停止しても、データやプロデューサーとコンシューマーの可用性に壊滅的な影響が及ぶことはありません。 ここでは、Kafka バージョン 0.8 と Camus バージョン 0.8.X が使用されています。 練習: Camus を使用して Kafka から HDFS に Avro データをコピーする この手法は、すでに他の目的で Kafka にデータをストリーミングしていて、そのデータを HDFS に格納したい場合に役立ちます。 質問 データを HDFS にインポートするためのデータ配信メカニズムとして Kafka を使用したいと考えています。 解決 Kafka のデータは、LinkedIn が開発したソリューションである Camus を使用して HDFS に複製されます。 話し合う Camus は LinkedIn が開発したオープンソース プロジェクトです。 LinkedIn では Kafka が広く導入されており、Camus は Kafka から HDFS にデータをコピーするために使用されます。 Camus は、Kafka で JSON と Avro の 2 つのデータ形式をすぐにサポートします。この手法では、Camus を介して Avro データを使用します。 Camus の Avro の組み込みサポートでは、Kafka パブリッシャーが独自の方法で Avro データを記述する必要があるため、この手法では、Kafka で標準のシリアル化データを使用することを前提としています。 この手法を機能させるには、3 つの部分が必要です。まず、Avro データを Kafka に書き込み、次に Camus が Avro データを逆シリアル化できるようにする簡単なクラスを記述し、最後に Camus ジョブを実行してデータのインポートを実行します。 Avro レコードを Kafka に書き込むには、次のコードで、必要な Kafka プロパティを構成して Kafka プロデューサーを設定し、ファイルからいくつかの Avro レコードをロードして、それらを Kafka に書き出す必要があります。 次のコマンドを使用して、test という名前の Kafka トピックにサンプル データをロードできます。 Kafka コンソール コンシューマーを使用すると、データが Kafka に書き込まれたことを確認できます。これにより、バイナリ Avro データがコンソールにダンプされます。 それができたら、Camus でこれらの Avro レコードを読み取れるように Camus コードをいくつか記述します。 実践: カミュとスキーマレジストリの書き方 まず、カミュの 3 つの概念を理解する必要があります。
前述のように、Camus は Avro データをサポートしていますが、Kafka プロデューサーが Camus の KafkaAvroMessageEncoder クラスを使用してデータを書き込む必要があります。このクラスは、Avro シリアル化されたバイナリ データに独自のデータを追加します。これは、Camus のデコーダーが、そのクラスによって書き込まれたことを検証できるようにするためだと考えられます。 この例では、シリアル化に Avro シリアル化が使用されているため、独自のデコーダーを作成する必要があります。幸いなことに、これは簡単です: Kafka では特定の Avro レコードを書き込んだのに、Camus ではそのレコードを特定の Avro レコードではなく汎用 Avro レコードとして読み取ることに気づいたかもしれません。これは、CamusWrapper クラスが汎用 Avro レコードのみをサポートしているためです。それ以外の場合、生成されたコードは、付属するすべてのセキュリティ機能とともに使用できるため、特定の Avro レコードの使用がはるかに簡単になります。 CamusWrapper オブジェクトは Kafka からデータを抽出します。このクラスが存在する理由は、タイムスタンプ、サーバー名、サービス詳細などのメタデータをエンベロープに貼り付けることができるようにするためです。使用されるデータには、各レコードに意味のあるタイムスタンプを関連付けることを強くお勧めします (通常、これはレコードが作成または生成された時刻になります)。次に、タイムスタンプをパラメーターとして受け入れる CamusWrapper コンストラクターを使用できます。
タイムスタンプが設定されていない場合、Camus はラッパーの作成時に新しいタイムスタンプを作成します。このタイムスタンプとその他のメタデータは、出力レコードの HDFS の場所を決定するときに Camus で使用されます。 次に、Camus Avro エンコーダーが HDFS に書き込まれる Avro レコードのスキーマの詳細を認識できるように、スキーマ レジストリを作成する必要があります。スキーマを登録するときに、Avro レコードを取得する Kafka トピックの名前も指定します。 ラン・カミュ Camus は、Kafka データをインポートする Hadoop クラスター上で MapReduce ジョブとして実行されます。 Camus に提供する必要のあるプロパティが多数あります。これは、コマンド ラインまたはプロパティ ファイルを使用して実行できます。次の手法を使用してプロパティ ファイルを使用します。 プロパティからわかるように、Camus にどのトピックをインポートするかを明示的に指示する必要はありません。 Camus は Kafka と自動的に通信して、トピック (およびパーティション) と現在の開始オフセットと終了オフセットを検出します。 インポートされたトピックを正確に制御したい場合は、kafka.whitelist.topics と kafka.blacklist.topics を使用して、それぞれホワイトリスト (トピックを制限) とブラックリスト (トピックを除外) をリストできます。区切り文字としてカンマを使用して複数のトピックを指定できます。次の例に示すように、正規表現もサポートされており、トピック「topic1」または「abc」で始まりその後に 1 つ以上の数字が続く任意のトピックと一致します。値とまったく同じ構文を使用してブラックリストを指定できます。
プロパティがすべて設定されたら、Camus ジョブを実行できます。 これにより、Avro データが HDFS に保存されることになります。 HDFS に何が含まれているか見てみましょう。 最初のファイルにはインポートされたデータが含まれ、他のファイルは Camus によって管理されます。 HDFS 内のデータ ファイルは、AvroDump ユーティリティを使用して表示できます。 では、Camus ジョブが実行されると、具体的に何が起こるのでしょうか? Camus インポート プロセスは、図 5.16 に示すように、MapReduce ジョブとして実行されます。 MapReduce の Camus タスクが成功すると、Camus OutputCommitter (タスクの完了時にカスタム作業を実行できるようにする MapReduce 構造) がタスクのデータ ファイルをアトミックに宛先ディレクトリに移動します。 OutputCommitter は、タスクによって処理されるすべてのパーティションのオフセット ファイルも作成します。同じジョブ内の他のタスクが失敗する可能性がありますが、これは成功したタスクの状態には影響しません。成功したタスクのデータとオフセット出力はそのまま残っているので、後続の Camus 実行では、最後に成功した状態から処理が再開されます。 次に、Camus がデータをインポートする場所と動作を制御する方法を見てみましょう。 データの分割 先ほど、Camus が Kafka にある Avro データをインポートすることを確認しました。図 5.17 に示す HDFS パス構造を詳しく見て、場所を特定するために何ができるかを見てみましょう。 パスの日付/時刻は、CamusWrapper から抽出されたタイムスタンプによって決定されます。 MessageDecoder の Kafka レコードからタイムスタンプを抽出し、CamusWrapper に渡すことが可能です。これにより、データは、デフォルトの Kafka レコードが MapReduce で読み取られた時刻ではなく、意味のある日付でパーティション分割されるようになります。 Camus はプラグ可能なパーティショナーをサポートしており、図 5.18 に示すパスの一部を制御できます。 Camus Partitioner インターフェースには、実装する必要がある 2 つのメソッドが用意されています。 たとえば、カスタム パーティショナーは Hive パーティショニング用のパスを作成できます。 要約する Camus は、HDFS 内の Kafka からデータを取得するための完全なソリューションを提供し、問題が発生した場合の状態の維持とエラー処理を担当します。 Azkaban または Oozie と統合することで、簡単に自動化し、メッセージ時間に基づいて HDFS データを整理することでシンプルなデータ管理を実行できます。 ETL に関しては、Flume と比較してその機能が比類のないものであることは特筆に値します。 Kafka には、データを HDFS に取り込むためのメカニズムがバンドルされています。 MapReduce ジョブで Kafka からデータを抽出するために使用できる KafkaETLInputFormat 入力形式クラスがあります。インポートを実行するには MapReduce ジョブを記述する必要がありますが、データの中間ストレージとして HDFS を使用する代わりに、データを MapReduce フロー内で直接使用できるという利点があります。次に、Hadoop にあるデータをファイルシステムなどの他のシステムに転送する方法について説明します。 |
>>: NetEase Cloudのリアルタイムオーディオフレームワークの背後にあるアルゴリズムの最適化により、製品エクスペリエンスが全面的に向上
[[273022]] 1. 電流制限の役割API インターフェースは呼び出し側の動作を制御できないた...
[[439872]]メタバースの普及により、仮想空間への入り口であるVR(バーチャルリアリティ)が再...
2年間の熾烈な競争を経て、共同購入業界の競争は重要な時期を迎えています。今年最初の8か月間の主要共同...
新年が近づいてきました。A5スタッフ全員に新年のご多幸をお祈りいたします。この間、私は「マーケティン...
name.com の最新プロモーション: .com または .net ドメイン名を 0.99 ドルで...
クリーブランド クリニックの CIO である Matthew Kull 氏は、医療とマーケティングと...
1 Google、悪事を働くな、情報は流れる周知のとおり、Google の目標は「世界の情報を統合す...
以前、私はSutuフォーラムに参加した際、「万達には蓄積すべき経験がなく、採用すべき人材もいない。万...
テクノロジーは常に急速に発展しており、私たちの日常生活にますます統合されつつあります。インターネット...
ウェブサイトはどうすればオンライン マーケティングをうまく行うことができるでしょうか? 今日の最大の...
asvhost は 2008 年に設立され、オーストラリア人によって運営されている可能性があります。...
「インターネットホームデコレーション元年」という言葉を聞いたことがありますか? 「インターネットホー...
レンガ職人はどうですか?シドニーのレンガ職人はどうですか?オーストラリアのシドニーデータセンターのB...
11月3日、杭州で開催された2022年雲奇大会において、アリババDAMOアカデミーとCCFオープンソ...
マルチクラウド アーキテクチャは現在、ほとんどの組織が採用しているクラウド コンピューティング戦略の...