みなさんこんにちは。私はNezhaです。 Kafka は、今日の時代におけるデータ パイプラインのほぼ第一選択肢です。バックエンド開発やビッグデータ開発を行っている方なら、ご存知かもしれません。オープンソースソフトウェア Kafka の応用はますます広まっています。 Kafka の人気と学習ブームを受けて、Nezha は長年の開発経験を共有し、読者が Kafka の関連知識をより簡単に習得できるようにしたいと考えています。 1. Kafka 統合モードを理解する1. Kafka とは何ですか? Apache Kafka は、LinkedIn によって開発された、高スループット、分散型、水平スケーラブルなメッセージング システムです。その目標は、リアルタイムのストリーミング処理と大量データの送信の問題を解決することです。 Kafka の中心的なアイデアは、データをストリームに変換し、パブリッシュ/サブスクライブ方式で送信することです。 上の図は、Kafka のコアコンセプトとデータフローを説明しています。ご覧のとおり、プロデューサーはトピックにメッセージを公開し、コンシューマーはトピックをサブスクライブしてメッセージを処理し、トピックを複数のパーティションに分割してメッセージの並列処理をサポートし、スケーラビリティを向上させることができます。 2. Kafka の主要な概念は次のとおりです。- トピック: トピックはメッセージのカテゴリであり、メッセージ キューの名前と考えることができます。データは主題ごとに分類され、整理されました。複数のプロデューサーが同じトピックにメッセージを公開でき、複数のコンシューマーがトピックをサブスクライブしてその中のメッセージを処理できます。
- プロデューサー: プロデューサーはデータの送信者であり、1 つ以上のトピックにメッセージを公開する役割を担います。トピックにメッセージを追加し、メッセージのキーを指定して、より適切なパーティショニングとルーティングを行うことができます。
- 消費者: 消費者はデータの受信者です。 1 つ以上のトピックをサブスクライブして、これらのトピックに公開されたメッセージを取得します。コンシューマーは異なるコンシューマー グループで動作できるため、複数のコンシューマーがメッセージを並行して処理できます。
- パーティション: 各トピックを 1 つ以上のパーティションに分割して、メッセージの並列処理をサポートし、スケーラビリティを向上させることができます。パーティショニングにより、メッセージを異なるコンシューマー間で分散することができ、各メッセージは特定のコンシューマー グループ内の 1 つのコンシューマーによってのみ処理されます。
2. バッチ処理とストリーム処理が必要なのはなぜですか?バッチ処理とストリーム処理は、さまざまなアプリケーション シナリオで重要な役割を果たしている Kafka の 2 つのコア処理モードです。アプリケーションのコンテキストと違いを理解することで、Kafka の可能性をより有効に活用できるようになります。 バッチ処理は、データをバッチで収集して処理するモードです。レポート生成、オフラインデータ分析、バッチ ETL (抽出、変換、ロード) など、大量の履歴データの処理を必要とするタスクに適しています。 バッチ処理は通常、一定の間隔で実行され、大量のデータを処理して結果を生成します。以下の機能があります: - 高いスループット: バッチ ジョブはリソースを最大限に活用してスループットを最大化できます。
- オフライン処理: バッチ処理は通常、オフライン データに使用され、リアルタイム処理は必要ありません。
- 複雑な計算: バッチ処理では、データ セット全体を処理するため、複雑な計算と分析をサポートできます。
ストリーム処理は、受信データを継続的に処理できるリアルタイム データ処理モデルです。リアルタイム監視、リアルタイム推奨、不正検出など、リアルタイム応答を必要とするアプリケーションに適しています。ストリーム処理により、データがすぐに利用可能になり、次の特性があります。 - 低レイテンシ: ストリーム処理では通常、ミリ秒単位のレイテンシでデータが処理されるため、アプリケーションは迅速に意思決定を行うことができます。
- リアルタイム処理: ストリーム処理は、リアルタイムで生成されたデータを処理するために使用され、データの鮮度に対する要件が高くなります。
- 有限状態: ストリーム処理では、常に変化するデータ ストリームを処理する必要があるため、通常は有限状態のデータを処理します。
Kafka の強みを最大限に活用するには、両方のモードを理解して使用し、特定のニーズに基づいてバッチ処理とストリーム処理を切り替える必要があります。たとえば、ほとんどの実用的なアプリケーションでは、データはストリームの形式で Kafka に入り、ストリーム処理ツールによってリアルタイムで処理されます。同時に、履歴データをバッチタスクとして定期的に処理することもできます。 3. Kafkaトピックパーティション戦略1. デフォルトのパーティション分割戦略Kafka のデフォルトのパーティション分割戦略はラウンドロビンです。つまり、メッセージは順番に各パーティションに割り当てられ、各パーティションが同様の数のメッセージを受信するようになります。このデフォルトの戦略は、同様のデータ量と処理要件を持つパーティションに適しています。この戦略では、Kafka は負荷分散を維持するために各パーティションにメッセージを順番に書き込みます。一般的な使用例では、通常、このデフォルト ポリシーで十分です。 2. カスタムパーティション戦略デフォルトのパーティション分割戦略はほとんどの場合に適していますが、より柔軟なパーティション分割戦略が必要な場合もあります。現時点では、カスタム パーティション分割戦略を使用して、特定のニーズに基づいてメッセージを異なるパーティションにルーティングできます。最も一般的なケースは、メッセージの順序を維持するために、同じキーを持つメッセージが同じパーティションに書き込まれるようにすることです。 カスタム パーティション分割戦略のサンプル コードは次のとおりです。 public class CustomPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); // 根据消息的键来选择分区int partition = Math.abs(key.hashCode()) % numPartitions; return partition; } @Override public void close() { // 关闭资源} @Override public void configure(Map<String, ?> configs) { // 配置信息} } カスタム パーティション戦略により、メッセージのルーティング方法をより柔軟に制御できるようになります。上記の例では、メッセージのキーに基づいてパーティションが選択され、同じキーを持つメッセージが同じパーティションに書き込まれ、順序が維持されます。 3. ベストプラクティス: パーティション分割戦略の選択方法パーティション分割戦略の選択は、特定のニーズとアプリケーション シナリオに基づいて行う必要があります。ベストプラクティスの推奨事項をいくつか示します。 - デフォルトの戦略: アプリケーション シナリオで特定のパーティション制御が必要ない場合は、デフォルトの
Round-Robin パーティション戦略を使用するのが通常、最も簡単で効果的な方法です。
- カスタム戦略: メッセージがキー順に保存されるようにする必要がある場合、またはその他の特定の要件がある場合は、カスタム パーティション分割戦略の使用を検討してください。カスタム パーティション戦略により柔軟性が向上します。
- テストと評価: パーティション分割戦略を選択する前に、テストと評価を行うことをお勧めします。現実的な負荷をシミュレートし、さまざまな戦略のパフォーマンスを測定して、アプリケーションに最適な戦略を見つけることができます。
適切なパーティショニング戦略を選択すると、Kafka のパフォーマンスとメッセージ処理を最適化し、アプリケーションが可能な限り最適な方法でメッセージを処理できるようになります。 4. バッチ処理とストリーム処理の概要1. バッチ処理の概念バッチ処理とは、一定の時間間隔または一定のデータ量でデータを収集、処理、分析するデータ処理方法です。バッチ処理は、データレポートの生成、大規模なデータのクリーニング、オフライン データ分析など、リアルタイムの応答を必要としないタスクに適しています。 バッチ処理では、通常、データは集中管理された場所に保存され、定期的にバッチ処理されます。この処理サイクルは、ビジネス ニーズに基づいて、毎日、毎週、またはその他の時間間隔にすることができます。バッチ処理ではデータセット全体を処理する必要があるため、処理中に多くのリソースが消費されます。 2. ストリーム処理の概念ストリーム処理は、受信データを継続的に処理できるリアルタイムのデータ処理方法です。ストリーム処理は、リアルタイム監視、リアルタイム推奨、不正検出など、リアルタイム応答を必要とするアプリケーションに適しています。 ストリーム処理では、バッチが蓄積されるのを待たずにデータがすぐに処理されます。これにより、ストリーム処理はリアルタイム アプリケーションの要件を満たす低遅延のデータ処理を提供できるようになります。ストリーム処理は、イベント ストリームを処理したり、リアルタイムのパフォーマンスを必要とするセンサー データやその他のデータ ソースを監視したりするためによく使用されます。 3. バッチ処理とストリーム処理の違いバッチ処理とストリーム処理の違いは次のとおりです。 - 適時性: バッチ処理は定期的ですが、ストリーム処理はリアルタイムです。
- リソース要件: バッチ処理には通常多くのリソースが必要ですが、ストリーム処理にはリアルタイムのリソースが必要です。
- アプリケーションシナリオ: バッチ処理はオフラインデータ処理に適しており、ストリーム処理はリアルタイムアプリケーションに適しています。
- データ処理方法: バッチ処理はデータセットを単位として処理し、ストリーム処理はデータストリームを単位として処理します。
Kafka の利点を最大限に活用するには、両方の処理モードを理解して使用し、特定のニーズに基づいてバッチ処理とストリーム処理を切り替える必要があります。これにより、アプリはさまざまな種類のデータを可能な限り最適な方法で処理できるようになります。 5. Kafka でのバッチ処理1. バッチ処理アプリケーションシナリオバッチ処理は、多くのアプリケーション シナリオ、特に大量の履歴データの処理を必要とするタスクで重要な役割を果たします。バッチ処理シナリオの例をいくつか示します。 アプリケーションシナリオ
| 説明する
| レポート生成
| 売上レポート、財務レポート、運用分析など、さまざまな種類のレポートを日次、週次、月次ベースで生成します。
| オフラインデータ分析
| 履歴データの詳細な分析を実施して、傾向、パターン、異常を明らかにします。
| データウェアハウスの人口
| さまざまなデータ ソースからデータを抽出、変換し、データ ウェアハウスにロードして、クエリと分析を行います。
| 大規模ETL
| あるシステムから別のシステムにデータを転送するには、通常、データのクレンジングと変換が必要です。
| バッチ画像処理
| サムネイルの生成、フィルターの処理など、大量の画像データを処理します。
|
2. バッチ処理アーキテクチャ一般的なバッチ処理アーキテクチャは、次のコンポーネントで構成されます。 コンポーネント
| 説明する
| データソース
| データ処理タスクのデータ ソースとしては、ファイル システム、データベース、Kafka などが考えられます。
| データ処理
| バッチ処理タスクの中核部分には、データの抽出、変換、ロード (ETL) に加え、必要な計算と分析が含まれます。
| データストレージ
| バッチ処理タスクでは、中間データや処理結果の保存場所は通常、リレーショナル データベース、NoSQL データベース、分散ファイル システムなどになります。
| 結果
| バッチ処理タスクの出力には通常、レポートの生成、データ ウェアハウスへの入力などが含まれます。
|
3. バッチ処理の重要な戦略(1)データバッファリングバッチ処理では、大量のデータを処理するときに、パフォーマンスを向上させ、データを効率的に管理するために、データ バッファリングを考慮する必要があります。 - メモリ バッファリング: メモリ バッファリングは、コンピューターのメモリにデータを保存するための戦略です。これにより、データへのアクセスが高速化され、特に中間計算結果に役立ちます。メモリ キャッシュを使用すると、ディスクの読み取りと書き込みの頻度が減り、パフォーマンスが大幅に向上します。ただし、メモリには限りがあるため、メモリ リソースを使い果たさないように注意して使用する必要があります。
- ディスク バッファリング: ディスク バッファリングは、データをディスクに保存する処理で、通常はデータ セット全体を保持するのにメモリが不足している場合に使用されます。読み取りおよび書き込み速度を犠牲にしてメモリ使用量を削減します。ディスク バッファリングは、データがメモリの容量を超えないようにするために、大規模なデータ セットを処理するときによく使用されます。
- データ分割: データ分割は、並列処理と分散処理のために大きなタスクを小さなタスクに分割する戦略です。それぞれの小さなタスクを独立して処理できるため、単一のタスクのリソース要件が削減され、全体的なパフォーマンスが向上します。これはタスクの並列化と組み合わされてコンピューティング クラスターのパフォーマンスを最大限に活用するものであり、大規模データを処理するための一般的なアプローチです。
(2)国家運営状態管理は、タスクの信頼性の高い実行、回復、フォールト トレランスを保証するのに役立つため、バッチ処理にとって重要です。 - タスク ステータス: タスクが失敗した後に回復できるように、各タスクのステータスを記録します。タスクのステータスには、タスクの進行状況、処理中のデータ、その他の重要な情報が含まれます。
- チェックポイント: タスクの中間状態を保存するために、定期的にチェックポイントを作成します。チェックポイントは、タスクが失敗した後にタスクのコンテキストを復元するために使用できるタスクの状態のスナップショットです。これにより、タスクのフォールト トレランスが確保されます。
- 調整サービス: Apache ZooKeeper などの分散調整サービスを使用して、タスクの実行を調整し、タスクが一貫して動作するようにします。調整サービスは、リーダー選出や分散ロックなどのタスクにも使用できます。
(3)エラー処理エラー処理は、タスクの信頼性とデータの品質を確保するためのバッチ処理の重要な部分です。 - 再試行: タスクが失敗した場合、再試行戦略を実装することで、タスクが最終的に正常に実行されることが保証されます。再試行では、不要な負荷を軽減するために、指数バックオフ再試行などのさまざまな戦略を採用できます。
- ログ記録: エラーや例外を含むタスク実行の詳細なログ。これはトラブルシューティングと監視に役立ちます。ログ記録は、監査やコンプライアンスの目的にとっても非常に重要です。
- アラート: オペレータがエラーに対処するための対策を講じることができるように、オペレータにタイムリーに通知するアラート メカニズムを確立します。アラートは電子メール、SMS 経由で送信したり、監視システムに統合したりできます。
バッチ処理でこれらの戦略を組み合わせて使用することで、タスクが信頼性が高く、効率的で、フォールト トレラントな方法で実行され、パフォーマンスと品質の要件を満たすことが保証されます。特定のアプリケーション シナリオに応じて、これらの戦略は必要に応じて調整できます。 4. 例: Kafka によるバッチ処理以下は、バッチ処理に Kafka を使用する方法を示す簡単な例です。 public class KafkaBatchProcessor { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "batch-processing-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("batch-data-topic")); // 批处理逻辑while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { // 处理消息processRecord(record.value()); } } } private static void processRecord(String record) { // 实现批处理逻辑System.out.println("Processing record: " + record); } } この例では、Kafka コンシューマーを作成し、batch-data-topic というメッセージ トピックをサブスクライブします。コンシューマーは定期的にメッセージをプルし、processRecord メソッドを呼び出して各メッセージを処理します。 この例では、バッチ処理タスクのデータ ソースとして Kafka を使用する方法を示していますが、実際のデータ処理ロジックは、アプリケーションのニーズに応じて、より複雑になる可能性があります。バッチ処理タスクには通常、データの抽出、変換、処理、結果の生成などのステップが含まれます。 6. Kafka でのストリーム処理1. ストリーム処理のアプリケーションシナリオストリーム処理は、データが継続的にシステムに流入し、すぐに処理する必要がある、リアルタイム応答を必要とするアプリケーション シナリオに適しています。ストリーム処理アプリケーション シナリオの例をいくつか示します。 - リアルタイム監視:センサーデータやサーバーログなどをリアルタイムで監視し、問題を迅速に検出して対処します。
- リアルタイム推奨: 製品の推奨、ニュースの推奨など、ユーザーの行動や興味に基づいて、パーソナライズされた推奨をリアルタイムで生成します。
- リアルタイム データ分析: ストリーミング データをリアルタイムで分析して、傾向、パターン、異常を検出します。これは、金融分野での不正検出、広告クリック分析などに使用できます。
- イベント処理: ソーシャル メディア メッセージ、IoT デバイス イベントなどの大規模なイベント ストリームを処理します。
ストリーム処理アプリケーションでは通常、データの適時性と品質を確保するために、低レイテンシ、高スループット、高スケーラビリティの要件を満たす必要があります。 2. ストリーム処理アーキテクチャストリーム処理アーキテクチャには通常、次の主要コンポーネントが含まれます。 - データ ソース: ストリーム処理アプリケーションがデータを受信する場所です。データ ソースには、Kafka トピック、メッセージ キュー、センサー、外部 API などがあります。
- ストリーム処理エンジン: ストリーム処理エンジンは、データ ストリームの処理、計算の実行、および結果の生成を担当するコア コンポーネントです。通常、Kafka Streams、Apache Flink、Apache Kafka などのストリーム処理フレームワークを使用します。
- データ ストレージ: ストリーム処理中に、後続のクエリと分析のために、処理結果または中間データを永続ストレージに保存する必要がある場合があります。これは、データベース、分散ストレージ システムなどです。
- 結果の生成: ストリーム処理アプリケーションは通常、リアルタイム ダッシュボード、通知、アラームなどの処理結果を生成します。
Kafka は、ストリーム処理アーキテクチャのデータ ソースおよびデータ ストレージとしてよく使用され、ストリーム処理フレームワークはデータ ストリームの処理に使用されます。これらのコンポーネントは連携して動作し、ストリーム処理アプリケーションがリアルタイムでデータに応答して分析できるようにします。 3. ストリーム処理の主要戦略(1)イベント時間処理イベント時間処理は、特にタイムスタンプ付きのイベント データを処理する場合に、ストリーム処理にとって重要な戦略です。イベント時間は、データがシステムに到着した時間ではなく、イベントが発生した実際の時間を表します。ストリーム処理アプリケーションでは、データの順序を保証するためにイベント時間を正しく処理する必要があります。これには、データの一貫性を維持するために、順序が正しくないイベント、遅延したイベント、重複したイベントなどの処理が含まれます。 (2)ウィンドウ操作ウィンドウ処理はストリーム処理の中心的な概念であり、これによりデータを異なる時間ウィンドウに分割して集計および分析できるようになります。一般的なウィンドウ タイプには、タンブリング ウィンドウ (時間の経過とともにロールフォワードする固定サイズのウィンドウ) とスライディング ウィンドウ (データ ストリーム全体をスライドする固定サイズのウィンドウ) があります。ウィンドウ操作を使用すると、分単位、時間単位、日単位のデータ集計など、さまざまな時間スケールでデータを要約および分析できます。 (3)依存関係の処理ストリーム処理アプリケーションは通常、複数のタスクと依存関係で構成されます。データが正しい順序で処理されるようにするには、タスク間の依存関係を管理することが重要です。依存関係処理には、タスクの起動とシャットダウンの順序、データ フローのトポロジ ソート、障害回復などが含まれます。これにより、特に分散ストリーム処理アプリケーションにおいて、タスク間の一貫性と正確性が保証されます。 これらの戦略と主要な概念は連携して、ストリーム処理アプリケーションの信頼性、適時性、正確性を確保します。これらはリアルタイム データ アプリケーションを構築するための基盤であり、アプリケーション シナリオによって異なる調整や最適化が必要になる場合があります。 4. 例: Kafka Streams によるストリーム処理この例では、ストリーム処理に Kafka Streams を使用する方法を示しました。サンプルコードの詳細な説明は次のとおりです。 まず、Kafka Streams アプリケーションを構成するために使用する Properties オブジェクトを作成します。アプリケーション ID と Kafka クラスターのアドレスを設定します。 Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 次に、ストリーム処理トポロジの構築に使用される StreamsBuilder オブジェクトを作成します。 StreamsBuilder builder = new StreamsBuilder(); ビルダーを使用して、stream-data-topic と呼ばれる Kafka トピックから入力データ ストリームを作成します。 KStream<String, String> source = builder.stream("stream-data-topic"); 次に、データ ストリームに対して一連の操作を実行します。まず、フィルター操作を使用して、「重要なデータ」を含むメッセージをフィルター処理します。 source .filter((key, value) -> value.contains("important-data")) 次に、mapValues 操作を使用して、フィルタリングされたメッセージの値を大文字に変換します。 .mapValues(value -> value.toUpperCase()) 最後に、to 操作を使用して、処理されたメッセージを output-topic という Kafka トピックに送信します。 .to("output-topic"); 最後に、KafkaStreams オブジェクトを作成し、builder.build() と構成プロパティを渡して、ストリーム処理アプリケーションを起動します。 KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); この例では、Kafka Streams を使用して、メッセージをフィルタリングおよび変換し、その結果を別のトピックに送信するストリーム処理アプリケーションを簡単に構築する方法を示します。これにより、リアルタイムのデータ処理が比較的シンプルになり、スケーラビリティとフォールトトレランスが向上します。 7. バッチ処理とストリーム処理を統合する1. データストリーム統合データ ストリーム統合は、バッチ処理とストリーム処理を組み合わせるプロセスです。データ処理時にデータの特性に基づいて処理モードを切り替えることができるため、アプリケーションのニーズをより適切に満たすことができます。データフローの統合は、データ処理中にさまざまなツールとライブラリをシームレスに切り替えることで実現できます。 2. データ変換データ ストリームの統合では、多くの場合、バッチ処理とストリーム処理の間でデータがシームレスに流れるようにするために、データ変換が必要になります。これには次のものが含まれます。 - データ形式の変換: データをバッチ形式からストリーミング形式に、またはその逆に変換します。これにより、さまざまな処理モードでデータを正しく解釈できるようになります。
- フィールド マッピング: データ フロー統合中に、フィールド名と構造が異なる場合があります。したがって、データがさまざまな処理段階に正しくマッピングされるようにするには、フィールド マッピングが必要です。
3. データ転送バッチ処理からストリーム処理にデータを渡す場合、またはその逆の場合、適切なデータ転送メカニズムが必要です。 Kafka はデータ配信を簡単にサポートできるため、優れたデータ配信ツールです。 Kafka では、バッチ タスクは特定のバッチ トピックにデータを書き込むことができ、ストリーム タスクはこれらのトピックからデータを読み取ることができます。これにより、バッチ処理とストリーム処理の調整が容易になります。 4. ベストプラクティス: バッチ処理とストリーム処理の共同アプリケーション実際のアプリケーションでバッチ処理とストリーム処理を統合する必要がある場合は、より詳細な手順とサンプル コードを次に示します。 ステップ1: ニーズに応じて適切な処理モードを選択します- 要件を定義する: まず、データ処理の要件を明確に定義します。どのタスクにバッチ処理が必要か、どのタスクにストリーミングが必要か、または同時に動作する必要があるかを判断します。
- 適切なツールを選択する: ニーズに基づいて適切な処理ツールとフレームワークを選択します。たとえば、バッチ処理が必要な場合は、Apache Spark を使用できます。ストリーム処理が必要な場合は、Kafka Streams または Apache Flink を選択できます。
ステップ2: データ変換とデータ転送- データ変換: データをバッチ モードからストリーミング モードに切り替える必要がある場合、またはその逆の場合は、適切なデータ形式の変換とフィールド マッピングを必ず実行してください。
- データ転送: データ転送メカニズムを確立します。 Kafka をデータ パイプラインとして使用すると、バッチ処理タスクとストリーム処理タスク間のデータ配信を簡単にサポートできるため、非常に有利です。
ステップ3: 適切な監視とログ記録- 監視とログ記録: 効果的な監視とログ記録のメカニズムを確立します。 Prometheus などの監視ツールや ELK Stack などのログ記録フレームワークを使用して、データ処理プロセスを追跡および監視できます。タスクの実行ステータス、パフォーマンス、エラーを監視できることを確認してください。
ステップ4: テストと評価- テストと評価: バッチ処理とストリーム処理をアプリケーションに組み込む前に、徹底的なテストと評価を実行します。実際の負荷をシミュレートし、データの一貫性と正確性を確保します。
サンプルコード以下は、データ配信メカニズムとして Kafka を使用してバッチ処理とストリーム処理を統合する方法を示した簡単な例です。ファイルからデータを読み取って Kafka トピックに書き込むバッチ処理タスクと、同じ Kafka トピックからデータを読み取ってリアルタイムで処理するストリーム処理タスクがあるとします。 バッチ処理タスク (Apache Spark を使用): import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; public class BatchToStreamIntegration { public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("BatchToStreamIntegration"); JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); JavaStreamingContext streamingContext = new JavaStreamingContext(sparkContext, new Duration(5000)); Map<String, Integer> topicMap = new HashMap<>(); topicMap.put("input-topic", 1); JavaDStream<String> messages = KafkaUtils.createStream(streamingContext, "zookeeper.quorum", "group", topicMap) .map(consumerRecord -> consumerRecord._2()); messages.foreachRDD((JavaRDD<String> rdd) -> { rdd.foreach(record -> processRecord(record)); }); streamingContext.start(); streamingContext.awaitTermination(); } private static void processRecord(String record) { System.out.println("Batch processing record: " + record); } } ストリーム処理タスク (Kafka Streams を使用): import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import java.util.Properties; public class StreamToBatchIntegration { public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-to-batch-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())); source.foreach((key, value) -> { processRecord(value); }); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); } private static void processRecord(String record) { System.out.println("Stream processing record: " + record); } } これら 2 つの例は、さまざまなツールを使用してバッチ処理とストリーム処理の統合を実現する方法を示しています。 |