Apache Kafka と Spark Streaming を統合する 2 つの方法とその長所と短所

Apache Kafka と Spark Streaming を統合する 2 つの方法とその長所と短所

[51CTO.com クイック翻訳] Kafka と Spark Streaming の統合

Apache Kafka と Spark Streaming を統合する実際のプロセスでは、通常、Spark Streaming を構成して Kafka からデータを受信する 2 つの方法を選択できます。最初のものはレシーバーと Kafka の高レベル API を使用します。 2 番目の新しい方法では受信機を使用しません。これら 2 つのアプローチは、パフォーマンス特性とセマンティクスの保存に関して異なるプログラミング モデルを持っています。

これら 2 つのアプローチを詳しく見てみましょう。

1. 受信者ベースのアプローチ

このメソッドは、受信機を使用してデータを受信します。レシーバーは、Kafka の高レベル コンシューマー API を使用して実装されます。また、受信したデータはSparkの各エグゼキュータに保存されます。その後、データは Spark Streaming によって起動されたジョブによって処理されます。

ただし、このアプローチのデフォルト構成では、障害が発生した場合にデータが失われる可能性があります。したがって、データ損失をゼロにするために、Spark Streaming で先行書き込みログを追加で有効にする必要があります。受信したすべての Kafka データを分散ファイル システムの書き込み前ログに同期的に保存し、障害発生時にすべてのデータを回復できるようにします。

次に、Kafka-Spark ストリーミング アプリケーションでこのレシーバーベースのアプローチを使用する方法について説明します。

1. リンク

次に、Kafka ストリーミング アプリケーションを次の成果物にリンクします。 Scala および Java アプリケーションの場合、SBT (Simple Build Tool) および Maven (ビルド ツール) のさまざまなプロジェクト定義を使用します。

  1. グループID = org.apache.spark
  2. アーティファクト ID = spark-streaming-kafka-0-8_2.11
  3. バージョン = 2.2.0

Python アプリケーションの場合、独自のアプリケーションをデプロイするときに、上記のライブラリとそのさまざまな依存関係を追加する必要があります。

2. プログラミング

次に、ストリーミング アプリケーション コードで、KafkaUtils をインポートして DStream 入力を作成します。

  1. org.apache.spark.streaming.kafka._ をインポートします。
  2. val kafkaStream = KafkaUtils.createStream(ストリーミングコンテキスト、
  3. [ZK クォーラム]、[コンシューマーグループID]、[トピックごとに消費するKafka パーティション])

同様に、createStream のさまざまなバリエーションを使用することで、さまざまなキー/値クラスとそれに対応するデコード クラスを開発できます。

3. 展開

通常、どの Spark アプリケーションでも、spark-submit を使用してアプリケーションを公開できます。もちろん、Scala、Java、Python の各アプリケーションごとに詳細は若干異なります。

これらのうち、Python アプリケーションには SBT および Maven プロジェクト管理がないため、–packages spark-streaming-kafka-0-8_2.11 とその依存関係を使用して、それらを spark-submit に直接追加できます。

  1. ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 ...  

さらに、Maven リポジトリから Maven アーティファクト spark-streaming-Kafka-0-8-assembly に対応する JAR パッケージをダウンロードし、-jars を使用して spark-submit に追加することもできます。

2. 直接方式(受信機なし)

受信機ベースの方法に続いて、受信機を必要としない「直接」方式という新しいクラスが誕生しました。このアプローチにより、エンドツーエンドの保証が強化されます。レシーバーを使用してデータを受信する代わりに、各トピック + パーティションの最新のオフセットを定期的に Kafka に照会します。また、各バッチで処理されるさまざまなオフセット範囲も定義します。特に、データを処理するジョブが開始されると、シンプルなコンシューマー API を使用して、Kafka で事前定義されたオフセット範囲が読み取られます。このプロセスは、ファイル システムからさまざまなファイルを読み取るプロセスに似ていることがわかります。

注: Spark は、Scala および Java API のバージョン 1.3 と Python API のバージョン 1.4 でこの機能を導入しました。

以下では、ストリーミング アプリケーションでこのアプローチを使用する方法について説明し、コンシューマー API の詳細を確認するためのリンクを提供します。

1. リンク

もちろん、このアプローチは Scala および Java アプリケーションでのみサポートされており、STB プロジェクトと Maven プロジェクトをリンクするには次の成果物が使用されます。

  1. グループID = org.apache.spark
  2. アーティファクト ID = spark-streaming-kafka-0-8_2.11
  3. バージョン = 2.2.0

2. プログラミング

次に、ストリーミング アプリケーション コードで、KafkaUtils をインポートして DStream 入力を作成します。

  1. org.apache.spark.streaming.kafka._ をインポートします。
  2. val directKafkaStream = KafkaUtils.createDirectStream[
  3. [キークラス]、[ 値クラス]、[キーデコーダークラス]、[ 値デコーダークラス] ](
  4. streamingContext、[ Kafkaパラメータマップ]、[設定 消費するトピックの数])

デフォルトで各 Kafka パーティションの最新のオフセットから消費を開始できるように、Kafka のパラメータで metadata.broker.list または bootstrap.servers を指定する必要があります。もちろん、Kafka のパラメータで auto.offset.reset を最小に設定すると、最小のオフセットから消費が開始されます。

さらに、KafkaUtils.createDirectStream のさまざまなバリエーションを使用することで、任意のオフセットから消費を開始できます。もちろん、次のように各バッチで Kafka オフセットを消費することもできます。

  1. //現在オフセット範囲への参照を保持し、下流で使用できるようにします
  2. var offsetRanges = Array.empty[オフセット範囲]
  3. 直接KafkaStream.transform { rdd =>
  4. オフセット範囲 = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  5. rdd
  6. }.map {
  7. ...
  8. }.foreachRDD { rdd =>
  9. (o <- offsetRanges)の場合{
  10. println(s "${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}" )
  11. }
  12. ...
  13. }

Zookeeper ベースの Kafka 監視ツール (https://data-flair.training/blogs/zookeeper-in-kafka/) を使用してストリーミング アプリケーションの進行状況を表示する場合は、自分で Zookeeper に更新することもできます。

3. 展開

この側面での展開プロセスは、レシーバーベースのアプローチに似ているため、ここでは説明しません。

直接法の利点

Spark Streaming と Kafka の統合の観点から見ると、2 番目の方法は 1 番目の方法に比べて次の利点があります。

1. 並列処理を簡素化する

複数の入力 Kafka ストリームを作成してマージする必要はありません (https://data-flair.training/blogs/kafka-streams/)。ただし、Spark Streaming は、直接的な方法を使用して複数の Kafka パーティションで使用するために、同じ数の RDD (Resilient Distributed Datasets) パーティションを作成します。これらのパーティションは、Kafka からのデータも並行して読み取ります。したがって、Kafka と RDD パーティションの間には 1 対 1 のマッピングがあり、理解しやすく調整しやすいと言えます。

2. 効率性

データ損失をゼロにするには、最初の方法では、さらなるレプリケーションのために先行書き込みログにデータを保存する必要があります。この方法は、実際にはデータが Kafka と先行書き込みログによって 2 回コピーされるため、効率が低くなります。直接方式では、受信者が存在しないため、事前にログを書き込む必要がないため、この問題は解決されます。十分な Kafka データ保持があれば、さまざまなメッセージを Kafka から回復できます。

3. 正確なセマンティクス

最初のアプローチでは、Kafka の高レベル API を使用して、消費されたオフセットを Zookeeper に保存します。ただし、Kafka からデータを使用するこの従来の方法では、データ損失がゼロになることを保証できますが、障害状況によっては、低い確率でデータが 2 回使用される可能性があります。実際には、この状況は、Spark Streaming によって確実に受信されるデータと Zookeeper によって追跡されるオフセットとの間の不一致から発生します。したがって、2 番目のアプローチでは、Zookeeper は使用せず、単純な Kafka API のみを使用します。 Spark Streaming はチェックポイントを通じてさまざまなオフセットを追跡し、それによって Spark Streaming と Zookeeper 間の不整合を排除します。

障害が発生した場合でも、それらのレコードは Spark Streaming によって一度に効率的かつ正確に受信されることがわかります。これにより、出力操作の冪等性とアトミック性が保証されます。つまり、データを外部データ リポジトリに保存するときに結果とオフセットが保存され、正確なセマンティクスの実現にも役立ちます。

ただし、このアプローチには欠点もあります。Zookeeper 内のさまざまなオフセットが更新されないため、Zookeeper に基づく Kafka 監視ツールは進行状況を表示できません。もちろん、このメソッドによって処理されたオフセットに各バッチでアクセスし、Zookeeper で更新することもできます。

結論は

上記の説明を通じて、Kafka と Spark Streaming を統合する全体的な概念を理解しました。また、Kafka-Spark Streaming の 2 つの異なる構成方法 (レシーバー方式とダイレクト方式) と、ダイレクト方式のいくつかの利点についても説明しました。

原題: Apache Kafka + Spark Streaming Integration、著者: Rinu Gour

[51CTOによる翻訳。パートナーサイトに転載する場合は、元の翻訳者と出典を51CTO.comとして明記してください。

<<:  オラクルは人事および人材管理プロセスを再構築し、人間味のある労働モデルを構築

>>:  9枚の写真でBATのクラウド戦略を分析し、この巨人が「クラウド戦場」をどうリードしているかを見る

推薦する

#香港サーバ# RFC:$109/E3-1240L v2/8gメモリ/1Tハードディスク/3ネットワーク直接接続

こちらが香港にある RFC の独立サーバーです: 香港SunnyVisionデータセンターは、中国本...

再開中です! Baidu VR製品+テクノロジーの二重サポートで企業のオンラインマーケティングのアップグレードを促進

ショートビデオ、セルフメディア、インフルエンサーのためのワンストップサービス新型コロナウイルス肺炎の...

ウェブサイトの実用性の開発動向に関する深い議論

インターネットはほとんどの人に知られているので、オンラインになったときに最初に接触するのはウェブサイ...

cloudshards-ロサンゼルス/ストレージVPS/Gポート

cludShards は、openvz をベースとしたストレージ VPS のプロモーションを再度開始...

foxcloud: 月額 10 ドル、OpenStack クラウド VPS、ロシア\オランダ\米国

foxcloudは2009年に設立されたブランドです。ドメイン名、仮想ホスト、VPS独立サーバー、I...

IoT、エッジコンピューティング、AIプロジェクトが企業にもたらす利益

[[385209]]ビル・ホームズは、象徴的なフェンダー・ストラトキャスターとテレキャスターのギター...

調査により、パブリッククラウド管理ツールが不足していることが判明

アプリケーションをパブリック クラウドに移行することをお考えですか?もしそうなら、それらの資産を追跡...

広州交易会は初めてライブストリーミングを採用し、Dianshi Technologyは商店のライブストリーミングを支援した。

ショートビデオ、セルフメディア、インフルエンサーのためのワンストップサービス商務省は4月10日、国務...

tripodcloud: 1Gbpsの高帯域幅、3つのネットワークへの直接接続+CN2 GIA、大容量SSDハードディスク

Tripodcloud のダブル 11 イベントを見逃した人も多いかもしれませんが、心配しないでくだ...

Alibaba CloudとCAICTが共同でクラウドコンピューティング業界初のデジタル安全生産標準を発表

7月28日、デジタルビジネスのセキュアな生産に焦点を当てた初の国内標準「クラウドコンピューティングベ...

zjiの3回線サーバーの簡単なレビュー、香港アリババクラウドの専用サーバーのレビュー

「Zji Hong Kong物理マシン:1台のマシンに3つの回線、Alibaba専用回線+Huawe...

検索結果のローカライズの違いについての簡単な分析

石林さんが百度で「省通信局」を検索したとき、もともとは「江蘇省通信局」を検索するつもりだったが、地元...

Sentry のオープンソース版と商用 SaaS 版の違いをご存知ですか?

この記事はWeChatの公開アカウント「Hacker Afternoon Tea」から転載したもので...

Alibaba Cloud 江江衛:Yitian + Feitian + CIPU の組み合わせはパフォーマンスが 20% 以上向上し、良好なパフォーマンスを発揮

11月3日、2022年雲奇大会において、アリババクラウドインテリジェンス副社長兼基本製品責任者の江江...