技術的な乾物共有: HBase のデータから Kafka への移行の実践

技術的な乾物共有: HBase のデータから Kafka への移行の実践

1. 概要

実際のアプリケーション シナリオでは、データは HBase クラスターに保存されますが、何らかの特別な理由により、データを HBase から Kafka に移行する必要があります。通常の状況では、ソース データは Kafka に送信され、その後コンシューマーがデータを処理して HBase に書き込みます。しかし、逆のプロセスを実行する場合、HBase データを Kafka に移行するにはどうすればよいでしょうか?今日は具体的な実装プロセスについてお話しします。

2. コンテンツ

一般的なビジネス シナリオは次のとおりです。データ ソースがデータを生成し、Kafka に入り、その後コンシューマー (Flink、Spark、Kafka API など) によって処理されて HBase に入ります。これは典型的なリアルタイム処理フローです。フローチャートは次のとおりです。

上記のリアルタイム処理フローは、結局のところ、データフローが順次処理されるため、データの処理が比較的容易です。しかし、このプロセスを逆にすると、いくつかの問題が発生します。

2.1 膨大なデータ

HBase の分散性とクラスターの水平拡張により、HBase 内のデータは数百億、数千億、あるいはそれ以上になることがよくあります。このレベルのデータは、このタイプの逆データフローのシナリオで、データ取得の問題という非常に厄介な問題を引き起こします。この膨大な量のデータを HBase から取得するにはどうすればよいでしょうか?

2.2 データパーティションなし

HBase は高速かつ簡単にデータを取得したり一覧表示したりできることがわかっています。ただし、Hive のようなデータ ウェアハウスのパーティショニングの概念がなく、一定期間内のデータを提供できません。最新の週のデータを抽出する場合は、テーブル全体をスキャンし、タイムスタンプをフィルタリングしてその週のデータを取得する必要がある場合があります。数が少ない場合は大きな問題にならないかもしれませんが、データ量が多い場合、HBase のテーブル全体をスキャンするのは困難です。

3. 解決策

このような逆データフローをどのように処理するか。実際、これを実現するには、HBase の Get と List の機能を使用できます。 HBase は RowKey を通じてプライマリ インデックスを構築するため、RowKey レベルでのデータ取得速度は非常に高速です。実装プロセスの詳細は次のとおりです。

データフローは上の図に示されています。以下では、各プロセスの実装の詳細と注意事項を分析します。

3.1 行キーの抽出

HBase には Rowkey 取得用のプライマリ インデックスがあることがわかっているので、この機能を使用して拡張できます。 HBase テーブルから大量のデータ内の Rowkey を抽出し、設定した抽出および保存ルールに従って、抽出した Rowkey を HDFS に保存できます。

ここで注意が必要な問題の 1 つは、HBase Rowkey の抽出です。大規模なデータレベルの Rowkey 抽出の場合は、MapReduce を使用して実装することをお勧めします。これは、HBase が提供する TableMapReduceUtil クラスのおかげで実現されます。 MapReduce タスクを通じて、HBase 内の Rowkey はマップ フェーズで指定された時間範囲に従ってフィルタリングされ、reduce フェーズで Rowkey が複数のファイルに分割され、最終的に HDFS に保存されます。

ここで質問がある学生もいるかもしれません。 Rowkey の抽出には MapReduce が使用されているので、列クラスターの下の列データを直接スキャンして処理してみませんか?ここでは、MapReduce タスクを開始すると、HBase データをスキャンするときに Rowkey のみをフィルター処理し (FirstKeyOnlyFilter を使用)、列クラスター データは処理しません。これでかなり速くなります。 HBase RegionServer への負荷も大幅に軽減されます。

  • 行列行001情報:名前行001情報:年齢行001情報:性別行001情報:sn

ここに例があります。上記の表のデータの場合、Rowkey (row001) のみを抽出する必要があります。ただし、実際のビジネス データでは、HBase テーブルには、多くの特徴的な属性 (名前、性別、年齢、ID カードなど) を持つデータが記述される場合があります。一部のビジネス データでは、1 つの列クラスターの下に 12 を超える特性が存在する場合がありますが、Rowkey は 1 つだけであり、必要な Rowkey はこの 1 つだけです。その場合、FirstKeyOnlyFilter を使用して実装するのが適切です。

  1. /**
  2. * フィルター 各行最初KVを返します
  3. * <p>
  4. * このフィルターを使用するとカウント操作をより効率的に実行できます。
  5. */

これは、最初の KV データを返すために使用される FirstKeyOnlyFilter の機能の説明です。実際に役人が数えるのに使っています。ここで少し改善して、FirstKeyOnlyFilter を使用して Rowkey を抽出します。

3.2 行キー生成

抽出された Rowkey を生成するにはどうすればいいですか? Reduce の数は、実際の桁数に基づいて決定できます。 Rowkey ファイルを生成するときは、実際のデータ量に基づいて Reduce の数を計算することをお勧めします。使いやすさを優先して、HDFS ファイルを 1 つだけ使用しないようにしてください。後でメンテナンスが難しくなります。たとえば、HBase テーブルが 100 GB の場合、それを 100 個のファイルに分割できます。

3.3 データ処理

ステップ 1 では、抽出ルールと保存ルールに従って、MapReduce を介して HBase からデータが抽出され、RowKey が HDFS に保存されます。次に、MapReduce タスクを通じて HDFS 上の Rowkey ファイルを読み取り、List を通じて HBase からデータを取得します。分解の詳細は次のとおりです。

Map フェーズでは、HDFS から Rowkey データ ファイルを読み取り、バッチ Get を通じて HBase からデータを取得し、データを組み立てて Reduce フェーズに送信します。 Reduce フェーズでは、Map フェーズからのデータが取得され、Kafka に書き込まれます。 Kafka プロデューサー コールバック関数は、Kafka に書き込まれるデータのステータス情報を取得するために使用され、ステータス情報はデータが正常に書き込まれたかどうかを判断するために使用されます。成功した場合は、成功した進行状況の統計を容易にするために、成功した Rowkey を HDFS に記録します。失敗した場合は、失敗した進行状況の統計を容易にするために、失敗した Rowkey を HDFS に記録します。

3.4 失敗と再試行

MapReduce タスクを通じて Kafka にデータを書き込むときに、失敗が発生する可能性があります。障害が発生した場合は、HDFS に Rowkey を記録するだけで済みます。タスクが完了すると、プログラムは HDFS 上に失敗した Rowkey ファイルがあるかどうかを確認します。存在する場合は、手順 3 を再度開始します。つまり、HDFS 上の失敗した Rowkey ファイルを読み取り、HBase にデータをリストし、データを処理し、最後に Kafka に書き込み、HDFS 上の失敗した Rowkey が処理されるまでこれを繰り返します。

4. 実装コード

ここで実装されているコードの量は複雑ではありません。以下は、これに基づいて変更できる疑似コードです (たとえば、Rowkey を抽出し、MapReduce を使用して Rowkey を読み取り、HBase テーブルをバッチで取得してから、Kafka に書き込むなど)。サンプルコードは次のとおりです。

  1. パブリッククラスMRROW2HDFS {
  2. 公共 静的void main(String[] args)は例外をスローします{
  3. 構成 config = HBaseConfiguration。作成する(); // HBase 構成情報
  4. ジョブ job = Job.getInstance(config, "MRROW2HDFS" );
  5. ジョブにJarByClassを設定します(MRROW2HDFS.class);
  6. ROWReducer クラスをジョブに設定します。
  7. 文字列 hbaseTableName = "hbase_tbl_name" ;
  8. スキャン scan = new Scan();
  9. スキャンキャッシュの設定(1000);
  10. スキャン.setCacheBlocks( false );
  11. scan.setFilter(新しい FirstKeyOnlyFilter());
  12. TableMapReduceUtil.initTableMapperJob(hbaseTableName、スキャン、ROWMapper.class、Text.class、Text.class、ジョブ);
  13. FileOutputFormat.setOutputPath(ジョブ、新しいパス( "/tmp/rowkey.list" )); // ストレージの行キーの HDFs パスを入力します
  14. System.exit(job.waitForCompletion( true ) ? 0 : 1);
  15. }
  16. 公共 静的クラス ROWMapper は TableMapper<Text, Text> を拡張します {
  17. @オーバーライド
  18. protected void map(ImmutableBytesWritableキー, 結果値,
  19. Mapper<ImmutableBytesWritable, Result, Text, Text>.Context コンテキスト)
  20. IOException、InterruptedException をスローします {
  21. for (セル cell : value.rawCells()) {
  22. //日付範囲をフィルターする
  23. // コンテキストを書き込みます(...);
  24. }
  25. }
  26. }
  27.   
  28. 公共 静的クラス ROWReducer は Reducer<Text,Text,Text,Text> を拡張します{
  29. プライベートテキスト結果 = 新しいテキスト();
  30.   
  31. @オーバーライド
  32. protected void Reduce(Text key , Iterable<Text> values ,Context context) は IOException、InterruptedException をスローします {
  33. for (テキスト値:​​){
  34. 結果を設定します(val);
  35. context.write(キー, 結果 );
  36. }
  37. }
  38. }
  39. }

5. まとめ

逆データ処理プロセス全体は複雑ではなく、実装は論理処理が複雑すぎない非常に基本的な MapReduce ロジックです。処理中は、いくつかの詳細を考慮する必要があります。 Rowkey が HDFS 上で生成される場合、行内にスペースが含まれることがあります。 HDFS 上の Rowkey ファイルをリストに読み込むときは、データごとにスペースをフィルタリングするのが最適です。さらに、失敗したタスクの再実行とデータの調整を容易にするために、成功した Rowkey 処理と失敗した Rowkey 処理の記録が保持されます。データ移行の進行状況と完了状況を知ることができます。同時に、Kafka Eagle 監視ツールを使用して、Kafka の書き込みの進行状況を表示することもできます。

<<:  IoTデータ管理には重要なタスクを処理するためのエッジコンピューティングが必要

>>:  ネットワーク + ストレージ + 仮想化: 新しいネットワークを構築するための 3 つの要素

推薦する

ワンストップのクラウドネイティブ FinOps プラットフォーム - KubeFin

KubeFin : マルチクラウドおよびマルチクラスターのコスト分析とコスト最適化をサポートし、クラ...

インフルエンサーがライブ配信で商品を販売するための10のポイント!

インターネットセレブによるライブストリーミングは、強力なインタラクティブ性とリアルタイムのフィードバ...

微信の公開アカウントが是正の焦点となる。テンセントは報告者に25万元の報奨金を支給したと述べた。

業界関係者は、今回の是正措置はWeChatのユーザー活動などの指標に影響を及ぼすとみているが、WeC...

#11.11# RackNerd: 複数の安価な米国 VPS、最低 $12/年、オプションのデータセンター 6 つ

Racknerd は、2018 年の China Double Eleven プロモーションを正式に...

画像の最適化により、ウェブサイトのランキングは依然として伝説的なものとなるでしょうか?

キーワードランキングのみに重点を置き、コンテンツ(画像)の最適化を無視する企業ウェブサイトが増えてお...

servarica-512m メモリ XEN 3.8ドル / 1Gメモリ Windows 月額支払い 7ドル

Servarica は 2010 年に設立されたカナダの会社 (Rica Web Services)...

トラフィック獲得のための4つのチャネルと戦略!

はじめに:トラフィック不足の時代において、これら 4 種類のトラフィック エントリは決して時代遅れに...

Baidu SEO ウェブサイトのトップ 10 の問題と解決策

(1)ウェブサイトリンク理由: ウェブサイトに外部リンクが不足しているか、外部リンクが徐々に減少して...

クラウドネイティブ: ソフトウェア配信の未来

こんにちは、皆さん。私はルガです。今日は、クラウド ネイティブ エコシステムの本質の 1 つである効...

ビッグデータの孤立を打破: 主流の SaaS ベンダーが共同でオープン スタンダードを構築

10月27日、2017 iResearch A10 ビッグデータサミットにおいて「Intellige...

APP チャネルの品質を区別する 5 つの重要なポイント。盲目的な試行錯誤のコストを節約します。

チャネル数の増加に伴い、CP の選択肢はますます増えています。しかし、チャネルの品質は多くの CP ...

ウェブサイト分析ハック: エントリー、エグジット、バウンスレポート (パート 1)

この記事は、Web 分析の第一人者である Eric T. Peterson 氏の著書「Web Sit...

「仮想化」(Intel VT および AMD SVM)に関するある程度の理解

[[282702]] 1. はじめに数日前、BIOS に入り、何気なくパラパラと見て、理解できない機...

新しい携帯電話システムHuawei Oum microの機能のマーケティング

月収10万元の起業の夢を実現するミニプログラム起業支援プランeコマースは、多くの人々がビジネスを開始...

2018年のOpenStackユーザー調査レポートが発表され、EasyStackがトップ3にランクイン

2018 年 11 月 13 日、ドイツのベルリンで毎年恒例の OpenStack Summit が...