Kafka と Druid を使用した Spark ストリーミングの理解

Kafka と Druid を使用した Spark ストリーミングの理解

[[326057]]

データ エンジニアとして、Spark Streaming、Kafka、Apache Druid などのビッグ データ テクノロジーに取り組んでいます。それぞれに独自のチュートリアルと RTFM ページがあります。ただし、これらのテクノロジーを大規模に組み合わせる場合は、より複雑な本番環境のユースケースをカバーするソリューションが必要になるでしょう。このブログ記事では、Spark Streaming、Kafka、Apache Druid を組み合わせてリアルタイム分析ダッシュボードを構築し、正確なデータ表現を保証することで得られた知識を共有します。

始める前に…リアルタイム分析について少しお話します

リアルタイム分析はビッグデータ テクノロジーの新しいトレンドであり、多くの場合、ビジネスに大きな影響を与えます。最新のデータを分析すると、洞察はより正確になります。たとえば、データ アナリスト、BI、アカウント マネージャー チームにリアルタイムの分析ダッシュボードを提供すると、これらのチームは迅速な意思決定を行うことができます。大規模なリアルタイム分析のための一般的なアーキテクチャは、Spark Streaming と Kafka に基づいています。どちらのテクノロジーも非常にスケーラブルです。これらはクラスター上で実行され、多数のコンピューターに負荷を分散します。 Spark ジョブの出力は、特定のユースケースとアーキテクチャに応じて、さまざまな宛先に送信できます。私たちの目標は、リアルタイムのイベントを表示する視覚的なツールを提供することです。この目的のために、Apache Druid データベースを選択しました。

Apache Druid でのデータ可視化

Druid は、高性能なリアルタイム分析データベースです。その利点の 1 つは、Kafka トピックからリアルタイム データを取得し、その上に Pivot モジュールを使用して強力な視覚化を構築できることです。視覚化機能により、さまざまなアドホックな「スライス アンド ダイス」クエリを実行し、視覚的な結果をすばやく取得できます。これは、特定のスポーツが特定の国でどのように機能するかなど、さまざまなユースケースを分析するのに役立ちます。データは 1 ~ 2 分の遅延でリアルタイムに取得されます。

建築

そこで、Kafka イベントと Apache Druid をベースにしたリアルタイム分析システムを構築することにしました。 Kafka トピックではすでにアクティビティが行われています。しかし、それらを直接 Druid に取り込むことはできません。各イベントにさらに多くの次元を追加する必要があります。 Druid で便利に表示できるように、各イベントにさらに多くのデータを追加する必要があります。規模に関しては、1 分間に数十万件のイベントを処理するため、これらの数値をサポートできるテクノロジーを使用する必要があります。私たちは、Spark Streaming ジョブを使用して生の Kafka イベントを充実させることにしました。


図1. リアルタイム分析アーキテクチャ

Spark Streaming ジョブは永久に実行されますか?あまり。

Spark Streaming ジョブの考え方は、常に実行されることです。この作業は決して止まるべきではない。 Kafka トピックからイベントを継続的に読み取り、処理し、出力を別の Kafka トピックに書き込みます。しかし、これは楽観的な見方です。現実の世界では、物事はもっと複雑です。 Spark クラスターでドライバー障害が発生しています。その場合、ジョブは再開されます。場合によっては、Spark アプリケーションの新しいバージョンが本番環境にデプロイされることがあります。この場合はどうなるのでしょうか?再開されたジョブはどのようにして Kafka トピックを読み取り、イベントを処理するのでしょうか?詳細に入る前に、次の図は、Spark Streaming ジョブを再開したときに Druid に表示される内容を示しています。


図2. ジョブ再開時のデータ損失

間違いなくデータ損失です!

私たちはどんな問題を解決しようとしているのでしょうか?

私たちは、1 つの Kafka トピックからイベントを読み取り、別の Kafka トピックにイベントを書き込む Spark Streaming アプリケーションに取り組んでいます。これらのイベントは後で Druid で表示されます。私たちの目標は、Spark Streaming アプリケーションの再起動中にスムーズなデータ視覚化を実現することです。つまり、Spark Streaming ジョブの再起動中にイベントが失われたり重複したりしないようにする必要があります。

すべては補償の問題だ

ジョブを再開したときにデータが失われる理由を理解するには、Kafka アーキテクチャのいくつかの用語を理解する必要があります。 Kafka の公式ドキュメントはここにあります。簡単に言うと、Kafka のイベントはトピックに保存されます。各トピックはパーティションに分割されます。パーティション内の各レコードには、レコードの順序を定義する連番であるオフセットがあります。アプリケーションがこのトピックを使用すると、さまざまな方法でオフセットを処理できます。デフォルトの動作では、常に最新のオフセットから読み取ります。もう 1 つのオプションは、オフセットをコミットすることです。これにより、オフセットが保持され、ジョブが再起動されたときにコミットされたオフセットを読み取り、そこから続行できるようになります。ソリューションのステップを確認し、各ステップでの Kafka オフセット管理についてより深く理解しましょう。

ステップ1 - オフセットを自動的にコミットする

デフォルトの動作では、常に最新のオフセットから読み取ります。ジョブが再開されるとトピックに新しいイベントがあるため、これは機能しません。ジョブが Latest から読み取る場合、図 2 に示すように、再起動中に追加されたすべてのメッセージが失われます。Spark Streaming には「enable.auto.commit」パラメータがあります。デフォルトでは、その値は false です。図 3 は、値を true に変更し、Spark アプリケーションを実行して再起動した後の動作を示しています。


図3. ジョブ再開のデータピーク

Kafka の自動コミット機能を使用すると、新しい効果があることがわかります。 「データ損失」はありませんが、重複したイベントが発生しています。実際に「爆発的な」出来事は起きなかった。実際に起こるのは、自動コミット メカニズムがオフセットを「随時」コミットすることです。出力トピックにはコミットされていないメッセージが多数あります。再起動後、ジョブは最新のコミットされたオフセットからのメッセージを消費し、これらのイベントの一部を再度処理します。そのため、出力には多数のイベントが表示されます。

明らかに、これらの重複を視覚化に組み込むと、このデータのビジネス消費者に誤解を招き、意思決定やシステムへの信頼に影響を与える可能性があります。

ステップ2: Kafkaオフセットを手動でコミットする

したがって、Kafka の自動コミット機能に依存することはできません。 Kafka の補正は自分で行う必要があります。これを実行するには、Spark Streaming が Kafka トピックからデータをどのように消費するかを見てみましょう。 Spark Streaming は、離散ストリームまたは DStream と呼ばれるアーキテクチャを使用します。 DStream は、Spark の主な抽象化の 1 つである RDD (Resilient Distributed Datasets) の連続したシーケンスによって表されます。ほとんどの Spark Streaming ジョブは次のようになります。

  1. dstream.foreachRDD { rdd => rdd.foreach { レコード => プロセス(レコード) } }

この場合、レコードを処理するということは、レコードを出力 Kafka トピックに書き込むことを意味します。したがって、Kafka オフセットをコミットするには、次の操作を行う必要があります。

  1. dstream.foreachRDD { rdd => val offsetRanges =
  2. rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreach { レコード
  3. => プロセス(記録)}
  4. stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) }

これは単純なアプローチですが、その前に全体像を見てみましょう。オフセットを正しく処理すると仮定します。つまり、オフセットは各 RDD 処理後に保存されます。ジョブを停止するとどうなりますか?ジョブは RDD の処理の途中で停止します。マイクロバッチの処理された部分は出力 Kafka トピックに書き込まれますが、コミットされません。ジョブが再度実行されると、一部のメッセージが再度処理され、重複メッセージの急増が Druid に表示されます (以前と同様)。


図4. ジョブ再開時のデータスパイク

通常のシャットダウン

RDD 処理中にジョブが強制終了されないようにする方法があることがわかりました。これは「正常なシャットダウン」と呼ばれます。 Spark アプリケーションを正常に終了する方法を説明するブログ投稿がいくつかありますが、そのほとんどは古いバージョンの Spark に関連しており、多くの制限があります。私たちは、あらゆる規模で機能し、特定の Spark バージョンや OS に縛られない「安全な」ソリューションを探していました。正常なシャットダウンを有効にするには、次のパラメータを使用して Spark コンテキストを作成する必要があります: spark.streaming.stopGracefullyOnShutdown=true。これは、JVM がシャットダウンしたときに (すぐにではなく) StreamingContext を正常に閉じるように Spark に指示します。さらに、新しいバージョンを展開するときなど、意図的に動作を停止するメカニズムも必要です。ジョブのシャットダウンを示す HDFS ファイルがあるかどうかを単純にチェックすることで、このメカニズムの最初のバージョンを実装しました。ファイルが HDFS に表示されると、次のパラメータを使用してストリーミング コンテキストが停止されます: ssc.stop(stopSparkContext=true, stopGracefully=true)

この場合、Spark アプリケーションは受信したすべてのデータの処理が完了した後にのみ正常に停止します。まさにこれこそが私たちが必要としているものです。

ステップ3: Kafka commitAsync

これまでの内容を振り返ってみましょう。私たちは、すべての RDD 処理で Kafka オフセットを意図的にコミットし (Kafka commitAsync API を使用)、正常なシャットダウンのために Spark を使用します。どうやら、もう一つ注意点があるようです。 Kafka API のドキュメントと Kafka commitAsync() ソース コードを詳しく調べたところ、commitAsync() は offsetRanges をキューに入れるだけで、実際には次の foreachRDD ループでのみ処理されることがわかりました。 Spark ジョブが正常に停止し、すべての RDD の処理が完了したとしても、最後の RDD のオフセットは実際にはコミットされません。この問題に対処するために、Kafka オフセットを同期的に永続化し、Kafka commitAsync() に依存しないコードを実装しました。次に、RDD ごとに、コミットされたオフセットを HDFS ファイルに保存します。ジョブが再び実行を開始すると、HDFS からオフセット ファイルがロードされ、これらのオフセットから Kafka トピックの使用が開始されます。

さあ、うまくいきました!

望ましい結果が得られたのは、正常なシャットダウンと Kafka オフセットの同期ストレージの組み合わせだけでした。再起動中にデータの損失やデータの急増は発生しません。


図5. Sparkジョブの再起動中にピークデータ損失が発生しない

結論は

Spark Streaming と Kafka 間の統合問題を解決することは、リアルタイム分析ダッシュボードを構築する上で重要なマイルストーンです。 Spark Streaming ジョブの再起動中にイベントが失われたり重複したりすることなく、安定したデータ ストリームを確保するソリューションを見つけました。これで、Druid で視覚化できる信頼できるデータが得られました。そこで、Druid にさらに多くの種類のイベント (Kafka トピック) を追加し、リアルタイム ダッシュボードを構築しました。これらのダッシュボードは、BI、製品、顧客サポートなどのさまざまなチームに分析情報を提供します。私たちの次の目標は、新しい分析機能やアラートなど、Druid の機能をさらに活用することです。

<<:  マルチクラウド環境をより良く管理する方法

>>:  テンセントクラウドとキングディーが戦略的提携を結び、SaaSエコシステムを共同で構築

推薦する

ultravps: 大容量ハードディスク VPS (ZFS)、KVM、2T ハードディスク、60 ユーロ/半年

ultravpsは、ドイツのデータセンターがストレージVPS(大容量ハードドライブVPS)の販売を開...

ウェブマスターネットワークニュース:タオバオの売り手は保険なしで「第3四半期戦争」に別れを告げる。二審は最初の判決を支持するかもしれない

1. 初の電子商取引保険プラットフォームが立ち上げられ、タオバオの販売業者は保険なしで生活できるよう...

海外のモデルは信頼性が低く、国内のクラウドファンディングは変革を迫られる

文/捜狐IT国人中国最大のクラウドファンディングプラットフォームの構築には3年かかりました。現在、D...

データの活用: マルチクラウド統合の課題への対応

ほとんどの企業は、計画に含まれていたかどうかにかかわらず、マルチクラウド環境を管理していることになり...

veesp: 月額 1 ドル、ロシアの VPS、KVM 仮想化、512 RAM/10G ハードディスク、PayPal 対応

2004年に設立されたFishnet Сommunicationsは、RNET、vStoike、Ve...

簡潔な分析:百度の物流ウェブサイトランキング調整

2013年9月10日は教師の日です。午前中、私は習慣的にウェブサイトのランキングをチェックしていまし...

初心者は、ウェブサイトはトレードオフを行うプロセスであることを理解する必要があります。

最近、私のウェブマスターQQグループがなぜか静かになっているのか分かりません。百度の耐え難いKステー...

ブランドパブリックドメイン+プライベートドメインを組み合わせたマーケティング

成長はブランド商人の永遠の焦点です。トラフィックがどこから来るのか、それをどのように管理するのかは、...

register.com ドメイン名を8.99ドルで登録

Register からドメイン名のプロモーションがあるというメールが届きました。確認してみると、全体...

Windows EFSを使用したファイル暗号化

Windows BitLocker と同様に、暗号化ファイル システム (EFS) は Window...

A5 Yuehuaiマーケティング:人生を大切にし、病的なSEOから遠ざかる

「あなた(Baidu)を喜ばせるために、私は休み時間を犠牲にして、毎日午前3時まで手動で外部リンクを...

「3つのノー」のウェブサイトを最適化するにはどうすればいいでしょうか?

「3無」ウェブサイトとは、Baiduの重みがなく、GoogleのPR価値がなく、安定したトラフィック...

1 つの記事で Yandex SEO を理解する: ロシアの対外貿易はここから始まります

画像出典: Tuchong Creative Yandex はロシアで最も広く使用されている検索エン...

これらの8つのステップをうまく実行すれば、あなたは最初の

タイトルからおわかりのように、今日私が皆さんにお伝えしたいのは、いかに早くランク付けし、素早く SE...