Kafka と Druid を使用した Spark ストリーミングの理解

Kafka と Druid を使用した Spark ストリーミングの理解

[[326057]]

データ エンジニアとして、Spark Streaming、Kafka、Apache Druid などのビッグ データ テクノロジーに取り組んでいます。それぞれに独自のチュートリアルと RTFM ページがあります。ただし、これらのテクノロジーを大規模に組み合わせる場合は、より複雑な本番環境のユースケースをカバーするソリューションが必要になるでしょう。このブログ記事では、Spark Streaming、Kafka、Apache Druid を組み合わせてリアルタイム分析ダッシュボードを構築し、正確なデータ表現を保証することで得られた知識を共有します。

始める前に…リアルタイム分析について少しお話します

リアルタイム分析はビッグデータ テクノロジーの新しいトレンドであり、多くの場合、ビジネスに大きな影響を与えます。最新のデータを分析すると、洞察はより正確になります。たとえば、データ アナリスト、BI、アカウント マネージャー チームにリアルタイムの分析ダッシュボードを提供すると、これらのチームは迅速な意思決定を行うことができます。大規模なリアルタイム分析のための一般的なアーキテクチャは、Spark Streaming と Kafka に基づいています。どちらのテクノロジーも非常にスケーラブルです。これらはクラスター上で実行され、多数のコンピューターに負荷を分散します。 Spark ジョブの出力は、特定のユースケースとアーキテクチャに応じて、さまざまな宛先に送信できます。私たちの目標は、リアルタイムのイベントを表示する視覚的なツールを提供することです。この目的のために、Apache Druid データベースを選択しました。

Apache Druid でのデータ可視化

Druid は、高性能なリアルタイム分析データベースです。その利点の 1 つは、Kafka トピックからリアルタイム データを取得し、その上に Pivot モジュールを使用して強力な視覚化を構築できることです。視覚化機能により、さまざまなアドホックな「スライス アンド ダイス」クエリを実行し、視覚的な結果をすばやく取得できます。これは、特定のスポーツが特定の国でどのように機能するかなど、さまざまなユースケースを分析するのに役立ちます。データは 1 ~ 2 分の遅延でリアルタイムに取得されます。

建築

そこで、Kafka イベントと Apache Druid をベースにしたリアルタイム分析システムを構築することにしました。 Kafka トピックではすでにアクティビティが行われています。しかし、それらを直接 Druid に取り込むことはできません。各イベントにさらに多くの次元を追加する必要があります。 Druid で便利に表示できるように、各イベントにさらに多くのデータを追加する必要があります。規模に関しては、1 分間に数十万件のイベントを処理するため、これらの数値をサポートできるテクノロジーを使用する必要があります。私たちは、Spark Streaming ジョブを使用して生の Kafka イベントを充実させることにしました。


図1. リアルタイム分析アーキテクチャ

Spark Streaming ジョブは永久に実行されますか?あまり。

Spark Streaming ジョブの考え方は、常に実行されることです。この作業は決して止まるべきではない。 Kafka トピックからイベントを継続的に読み取り、処理し、出力を別の Kafka トピックに書き込みます。しかし、これは楽観的な見方です。現実の世界では、物事はもっと複雑です。 Spark クラスターでドライバー障害が発生しています。その場合、ジョブは再開されます。場合によっては、Spark アプリケーションの新しいバージョンが本番環境にデプロイされることがあります。この場合はどうなるのでしょうか?再開されたジョブはどのようにして Kafka トピックを読み取り、イベントを処理するのでしょうか?詳細に入る前に、次の図は、Spark Streaming ジョブを再開したときに Druid に表示される内容を示しています。


図2. ジョブ再開時のデータ損失

間違いなくデータ損失です!

私たちはどんな問題を解決しようとしているのでしょうか?

私たちは、1 つの Kafka トピックからイベントを読み取り、別の Kafka トピックにイベントを書き込む Spark Streaming アプリケーションに取り組んでいます。これらのイベントは後で Druid で表示されます。私たちの目標は、Spark Streaming アプリケーションの再起動中にスムーズなデータ視覚化を実現することです。つまり、Spark Streaming ジョブの再起動中にイベントが失われたり重複したりしないようにする必要があります。

すべては補償の問題だ

ジョブを再開したときにデータが失われる理由を理解するには、Kafka アーキテクチャのいくつかの用語を理解する必要があります。 Kafka の公式ドキュメントはここにあります。簡単に言うと、Kafka のイベントはトピックに保存されます。各トピックはパーティションに分割されます。パーティション内の各レコードには、レコードの順序を定義する連番であるオフセットがあります。アプリケーションがこのトピックを使用すると、さまざまな方法でオフセットを処理できます。デフォルトの動作では、常に最新のオフセットから読み取ります。もう 1 つのオプションは、オフセットをコミットすることです。これにより、オフセットが保持され、ジョブが再起動されたときにコミットされたオフセットを読み取り、そこから続行できるようになります。ソリューションのステップを確認し、各ステップでの Kafka オフセット管理についてより深く理解しましょう。

ステップ1 - オフセットを自動的にコミットする

デフォルトの動作では、常に最新のオフセットから読み取ります。ジョブが再開されるとトピックに新しいイベントがあるため、これは機能しません。ジョブが Latest から読み取る場合、図 2 に示すように、再起動中に追加されたすべてのメッセージが失われます。Spark Streaming には「enable.auto.commit」パラメータがあります。デフォルトでは、その値は false です。図 3 は、値を true に変更し、Spark アプリケーションを実行して再起動した後の動作を示しています。


図3. ジョブ再開のデータピーク

Kafka の自動コミット機能を使用すると、新しい効果があることがわかります。 「データ損失」はありませんが、重複したイベントが発生しています。実際に「爆発的な」出来事は起きなかった。実際に起こるのは、自動コミット メカニズムがオフセットを「随時」コミットすることです。出力トピックにはコミットされていないメッセージが多数あります。再起動後、ジョブは最新のコミットされたオフセットからのメッセージを消費し、これらのイベントの一部を再度処理します。そのため、出力には多数のイベントが表示されます。

明らかに、これらの重複を視覚化に組み込むと、このデータのビジネス消費者に誤解を招き、意思決定やシステムへの信頼に影響を与える可能性があります。

ステップ2: Kafkaオフセットを手動でコミットする

したがって、Kafka の自動コミット機能に依存することはできません。 Kafka の補正は自分で行う必要があります。これを実行するには、Spark Streaming が Kafka トピックからデータをどのように消費するかを見てみましょう。 Spark Streaming は、離散ストリームまたは DStream と呼ばれるアーキテクチャを使用します。 DStream は、Spark の主な抽象化の 1 つである RDD (Resilient Distributed Datasets) の連続したシーケンスによって表されます。ほとんどの Spark Streaming ジョブは次のようになります。

  1. dstream.foreachRDD { rdd => rdd.foreach { レコード => プロセス(レコード) } }

この場合、レコードを処理するということは、レコードを出力 Kafka トピックに書き込むことを意味します。したがって、Kafka オフセットをコミットするには、次の操作を行う必要があります。

  1. dstream.foreachRDD { rdd => val offsetRanges =
  2. rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreach { レコード
  3. => プロセス(記録)}
  4. stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) }

これは単純なアプローチですが、その前に全体像を見てみましょう。オフセットを正しく処理すると仮定します。つまり、オフセットは各 RDD 処理後に保存されます。ジョブを停止するとどうなりますか?ジョブは RDD の処理の途中で停止します。マイクロバッチの処理された部分は出力 Kafka トピックに書き込まれますが、コミットされません。ジョブが再度実行されると、一部のメッセージが再度処理され、重複メッセージの急増が Druid に表示されます (以前と同様)。


図4. ジョブ再開時のデータスパイク

通常のシャットダウン

RDD 処理中にジョブが強制終了されないようにする方法があることがわかりました。これは「正常なシャットダウン」と呼ばれます。 Spark アプリケーションを正常に終了する方法を説明するブログ投稿がいくつかありますが、そのほとんどは古いバージョンの Spark に関連しており、多くの制限があります。私たちは、あらゆる規模で機能し、特定の Spark バージョンや OS に縛られない「安全な」ソリューションを探していました。正常なシャットダウンを有効にするには、次のパラメータを使用して Spark コンテキストを作成する必要があります: spark.streaming.stopGracefullyOnShutdown=true。これは、JVM がシャットダウンしたときに (すぐにではなく) StreamingContext を正常に閉じるように Spark に指示します。さらに、新しいバージョンを展開するときなど、意図的に動作を停止するメカニズムも必要です。ジョブのシャットダウンを示す HDFS ファイルがあるかどうかを単純にチェックすることで、このメカニズムの最初のバージョンを実装しました。ファイルが HDFS に表示されると、次のパラメータを使用してストリーミング コンテキストが停止されます: ssc.stop(stopSparkContext=true, stopGracefully=true)

この場合、Spark アプリケーションは受信したすべてのデータの処理が完了した後にのみ正常に停止します。まさにこれこそが私たちが必要としているものです。

ステップ3: Kafka commitAsync

これまでの内容を振り返ってみましょう。私たちは、すべての RDD 処理で Kafka オフセットを意図的にコミットし (Kafka commitAsync API を使用)、正常なシャットダウンのために Spark を使用します。どうやら、もう一つ注意点があるようです。 Kafka API のドキュメントと Kafka commitAsync() ソース コードを詳しく調べたところ、commitAsync() は offsetRanges をキューに入れるだけで、実際には次の foreachRDD ループでのみ処理されることがわかりました。 Spark ジョブが正常に停止し、すべての RDD の処理が完了したとしても、最後の RDD のオフセットは実際にはコミットされません。この問題に対処するために、Kafka オフセットを同期的に永続化し、Kafka commitAsync() に依存しないコードを実装しました。次に、RDD ごとに、コミットされたオフセットを HDFS ファイルに保存します。ジョブが再び実行を開始すると、HDFS からオフセット ファイルがロードされ、これらのオフセットから Kafka トピックの使用が開始されます。

さあ、うまくいきました!

望ましい結果が得られたのは、正常なシャットダウンと Kafka オフセットの同期ストレージの組み合わせだけでした。再起動中にデータの損失やデータの急増は発生しません。


図5. Sparkジョブの再起動中にピークデータ損失が発生しない

結論は

Spark Streaming と Kafka 間の統合問題を解決することは、リアルタイム分析ダッシュボードを構築する上で重要なマイルストーンです。 Spark Streaming ジョブの再起動中にイベントが失われたり重複したりすることなく、安定したデータ ストリームを確保するソリューションを見つけました。これで、Druid で視覚化できる信頼できるデータが得られました。そこで、Druid にさらに多くの種類のイベント (Kafka トピック) を追加し、リアルタイム ダッシュボードを構築しました。これらのダッシュボードは、BI、製品、顧客サポートなどのさまざまなチームに分析情報を提供します。私たちの次の目標は、新しい分析機能やアラートなど、Druid の機能をさらに活用することです。

<<:  マルチクラウド環境をより良く管理する方法

>>:  テンセントクラウドとキングディーが戦略的提携を結び、SaaSエコシステムを共同で構築

推薦する

企業がウェブサイトを構築する必要があるのはなぜですか?

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っています企業がウェ...

ウェブサイトの最適化について:コンテンツと外部リンクのどちらがより重要ですか?

今日、A5 Webmaster Network で「2013 年の Web サイトの最適化: 外部リ...

レポート: JavaScript が最も人気のあるプログラミング言語に

業界分析会社 RedMonk は本日、最も人気のあるプログラミング言語の最新の四半期ランキングを発表...

CN2 GIA - 特別オファー、ベータ版、今後の「家宝」になるかもしれない

BandwagonHostはロサンゼルスデータセンターに新しいVPS、CN2 GIA超高レベル最適化...

新潮メディア厦門プロモーション会議が成功裏に開催され、南福建ゴールデントライアングルのメディアマーケティングの新たな未来をリードする

月収10万元の起業の夢を実現するミニプログラム起業支援プラン2018年9月28日、新潮メディアグルー...

外部プロモーションはウェブサイトに役立ちますか?

オフサイトプロモーションには、フレンドリーリンクの構築とリンク情報の公開が含まれます。この仕事は非常...

Virpus-Seattle Xen 仮想 VPS が 40% 割引で販売中です。512M のメモリが月額約 1.7 ドルです。

米国西海岸のコンピュータルームの VPS は、一般的に中国の中部、南部、東部、北部の VPS よりも...

コンテンツマーケティングの有効性評価戦略についての簡単な説明

コンテンツ戦略は、ブランドに対する人々の認知度を効果的に高め、人々を潜在顧客や信頼できるブランド支持...

2024年に注目すべきエッジコンピューティングの5つのトレンド

エッジ コンピューティングにより、レイテンシが短縮され、全体的なパフォーマンスが向上します。 Mar...

JVM メモリ管理について話す [非専門家]

[[399153]] JVM メモリレイアウト1 つのタイプは各スレッド専用です。 PC レジスタ:...

Kubernetes Ingress: クラスターへの外部ネットワークアクセスのための柔軟なツール

前提条件 すでに Kubernetes クラスターがあり、それにアクセスできます。 kubectl ...

検索を改善するには、ネットユーザーのニーズを把握する必要がある

この金融危機は、アメリカの大ヒット映画「デイ・アフター・トゥモロー」で描かれた世界的大惨事よりもさら...

製品のマーケティングにWeiboを使用するのは信頼できるでしょうか?

ショートビデオ、セルフメディア、インフルエンサーのためのワンストップサービス製品マーケティングにWe...

ECIS 2020 | 2020 エッジ コンピューティング インダストリー サミットが明日開幕します。重要なゲストや業界フォーラムをちょっと覗いてみましょう!

明日2020年エッジコンピューティング業界サミットが間もなく始まります8件以上の主要リリース、60件...