1. 何を話すか このシリーズの読者のニーズに応えるために、まず Apache Flink での Kafka の使用法を紹介します。そこでこの記事では、簡単な例を使用して、Apache Flink で Kafka を使用する方法を説明します。 2. Kafka の紹介 Apache Kafka は、分散型のパブリッシュ/サブスクライブ メッセージング システムです。これはもともと LinkedIn によって開発され、2010 年に Apache Foundation に寄贈され、主要なオープンソース プロジェクトになりました。 Kafka は、リアルタイム データ パイプラインとストリーミング アプリケーションの構築に使用されます。水平スケーラビリティ、フォールトトレランス、非常に高速な速度を備えており、現在広く使用されています。 Kafka は分散メッセージング システムであるだけでなく、ストリーム コンピューティングもサポートします。したがって、Apache Flink での Kafka のアプリケーションを紹介する前に、まずは簡単な Kafka の例を使用して、Kafka が何であるかを直感的に理解してみましょう。 1. インストール この記事は Kafka の体系的かつ詳細な紹介ではありませんが、Apache Flink で Kafka を適切に適用できるように、Kafka を直感的に理解できるようにすることを目的としています。したがって、最も簡単な方法で Kafka をインストールします。 (1)バイナリパッケージをダウンロードします。 - カール -L -O http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
(2)解凍してインストールする Kafka をインストールするには、次のようにダウンロードした tgz ファイルを解凍するだけです。 - 金城:kafka jincheng.sunjc$ tar -zxf kafka_2.11-2.1.0.tgz
- 金城:kafka jincheng.sunjc$ cd kafka_2.11-2.1.0
- jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ ls
- ライセンス通知 bin config libs site-docs
bin ディレクトリには、次に起動する Kafka サーバーなどのすべての Kafka 管理コマンドが含まれています。 (3)Kafkaサーバーを起動する Kafka はパブリッシュ/サブスクライブ システムです。メッセージのサブスクリプションにはサービスの存在が必要です。 Kafka Server インスタンスを起動します。 Kafka には ZooKeeper が必要です。本番環境にデプロイするには、ZooKeeper クラスターをインストールする必要があります。これはこの記事の範囲を超えているため、Kafka が提供するスクリプトを使用して、ノードが 1 つだけの ZooKeeper インスタンスをインストールします。次のように: - jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/zookeeper-server-start.sh config/zookeeper.properties &
-
- [2019-01-13 09:06:19,985] INFO 設定を次の場所から読み取り中: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
- ....
- ....
- [2019-01-13 09:06:20,061] INFO ポート 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory) へのバインド
起動後、ZooKeeper はポート 2181 (デフォルト) にバインドします。次に、次のように Kafka サーバーを起動します。 - jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-server-start.sh config/server.properties
- [2019-01-13 09:09:16,937] INFO 登録されたkafka kafka:type =kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
- [2019-01-13 09:09:17,267] INFO 開始 (kafka.server.KafkaServer)
- [2019-01-13 09:09:17,267] INFO localhost:2181 (kafka.server.KafkaServer) の zookeeper に接続しています
- [2019-01-13 09:09:17,284] INFO [ZooKeeperClient] localhost:2181 への新しいセッションを初期化しています。 (kafka.zookeeper.ZooKeeperクライアント)
- ...
- ...
- [2019-01-13 09:09:18,253] INFO [KafkaServer id = 0 ] が開始されました (kafka.server.KafkaServer)
すべてがうまくいけば、Kafka のインストールは完了です。 2. トピックを作成する Kafka はメッセージサブスクリプションシステムです。まず、サブスクライブできるトピックを作成します。 flink-tipic という名前のトピックを作成します。新しいターミナルで、次のコマンドを実行します。 - jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic
-
- トピック「flink-tipic」を作成しました。
次の作成成功情報も Kafka サーバー ターミナルに出力されます。 - ...
- [2019-01-13 09:13:31,156] INFO /tmp/kafka-logs にパーティション flink-tipic-0 のログが作成されました。プロパティは {compression.type - > producer、message.format.version - > 2.1-IV2、file.delete.delay.ms - > 60000、max.message.bytes - > 1000012、min.compaction.lag.ms - > 0、message.timestamp.type - > CreateTime、message.downconversion.enable - > true、min.insync.replicas - > 1、segment.jitter.ms - > 0、preallocate - > false、min.cleanable.dirty.ratio - > 0.5、index.interval.bytes - > 4096、unclean.leader.election.enable - > false、retention.bytes - > -1、 delete.retention.ms - > 86400000、cleanup.policy - > [削除]、flush.ms - > 9223372036854775807、segment.ms - > 604800000、segment.bytes - > 1073741824、retention.ms - > 604800000、message.timestamp.difference.max.ms - > 9223372036854775807、segment.index.bytes - > 10485760、flush.messages - > 9223372036854775807}。 (kafka.log.LogManager)...
上記は、メッセージ圧縮方法、メッセージ形式、バックアップ数など、flink-topic の基本的なプロパティ構成を示しています。 ログを表示するだけでなく、次のようにコマンドを使用して、flink-topic が正常に作成されたかどうかを表示することもできます。 - jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --list --zookeeper ローカルホスト:2181
-
- フリンクティピック
flink-tipic が出力された場合、トピックが正常に作成されたことを意味します。 では、トピックはどこに保存されるのでしょうか? Kafka はどのようにしてメッセージを公開およびサブスクライブしますか?直感的に理解するために、次の Kafka アーキテクチャ図を見て簡単に理解してみましょう。 簡単に説明すると、Kafka は ZooKeeper を使用してクラスター情報を保存します。これは、上記で起動した Kafka Server インスタンスです。クラスター内に複数の Kafka Server インスタンスが存在する場合があります。 Kafka サーバーはブローカーと呼ばれ、作成したトピックは 1 つ以上のブローカーに含めることができます。 Kafka は、メッセージを送信するには Push モードを使用し、メッセージをプルするには Pull モードを使用します。 3. メッセージを送信する 既存のトピックにメッセージを送信するにはどうすればよいですか?もちろん、API を使用してメッセージを送信するコードを記述することもできます。同時に、次のようにコマンド方式を使用してメッセージを便利に送信することもできます。 - jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic
- > Kafka テストメッセージ
- > Kafkaコネクタ
上記では、Kafka テスト メッセージと Kafka コネクタの 2 つのメッセージを flink-topic トピックに送信しました。 4. メッセージを読む 指定されたトピックからメッセージを読み取りたい場合はどうすればよいでしょうか?これは、API とコマンドの両方を使用して実行できます。次のコマンドを使用して、flink-topic からメッセージを読み取ることができます。 - jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning
- Kafka テスト メッセージ
- Kafka コネクタ
--from-beginning パラメータは、トピックの先頭からメッセージを読み取ることを指定します。 3. Flink Kafka コネクタ これまで、最も簡単な方法で Kafka 環境をインストールしました。それでは、上記の環境を使用して、Flink Kafka Connector の使用方法を紹介しましょう。 Flinkコネクタに関する基礎知識は、「Apache Flinkトークシリーズ(14) - コネクタ」で紹介されます。ここでは、Kafka Connector に関連するコンテンツを直接紹介します。 Apache Flink は、Kafka コネクタの複数のバージョンを提供します。この記事では flink-1.7.0 を例に説明します。 1. mvn 依存関係 Kakfa Connector を使用するには、次のように、pom に Kafka Connector への依存関係を追加する必要があります。 - <依存関係>
- <グループID> org.apache.flink</グループID>
- <artifactId>フリンク-コネクタ- kafka_2.11 </artifactId>
- <バージョン> 1.7.0</バージョン>
- </依存関係>
Flink Kafka Consumer は、Kafka 内のバイナリ データを Java/Scala オブジェクトに変換する方法を知る必要があります。 DeserializationSchema を使用すると、ユーザーはそのようなスキーマを指定できます。各 Kafka メッセージに対して T deserialize(byte[] message) メソッドを呼び出し、Kafka から値を渡します。 2. 例 この例では、Kafka からデータを読み取り、簡単な処理を行った後に Kafka に書き込みます。次のように、書き込み用の別のトピックを作成する必要があります。 - bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic-output
したがって、この例では、ソースとして flink-topic を使用し、シンクとして slink-topic-output を使用します。 (1)シンプルなETL Kafka に保存されているのは単純な文字列であると想定しているため、文字列のシリアル化とデシリアル化の実装、つまり DeserializationSchema と SerializationSchema のシリアル化とデシリアル化を実装するクラスを定義する必要があります。この例は文字列なので、KafkaMsgSchema 実装クラスをカスタマイズしてから、Flink メイン プログラムを記述します。 - org.apache.flink.api.common.serialization.DeserializationSchema をインポートします。
- org.apache.flink.api.common.serialization.SerializationSchema をインポートします。
- org.apache.flink.api.common.typeinfo.BasicTypeInfo をインポートします。
- org.apache.flink.api.common.typeinfo.TypeInformation をインポートします。
- org.apache.flink.util.Preconditions をインポートします。
-
- java.io.IOException をインポートします。
- java.io.ObjectInputStream をインポートします。
- java.io.ObjectOutputStream をインポートします。
- java.nio.charset.Charset をインポートします。
-
- パブリッククラス KafkaMsgSchema は DeserializationSchema < String > 、 SerializationSchema < String >を実装します{
- プライベート静的最終long serialVersionUID = 1L ;
- プライベート一時文字セット charset;
-
- パブリックKafkaMsgSchema() {
- //デフォルトのUTF-8エンコード
- Charset.forName("UTF-8") を使用します。
- }
-
- パブリック KafkaMsgSchema(文字セット charset) {
- this.charset =前提条件.checkNotNull(charset);
- }
-
- パブリック文字セット getCharset() {
- this.charset を返します。
- }
-
- パブリック文字列デシリアライズ(byte[] message) {
- // Kafka メッセージを Java オブジェクトにデシリアライズする
- 新しい文字列(メッセージ、文字セット)を返します。
- }
-
- パブリックブール値isEndOfStream(文字列 nextElement) {
- // ストリームは終わらない
- false を返します。
- }
-
- パブリックbyte[] serialize(文字列要素) {
- //JavaオブジェクトをKafkaメッセージにシリアル化します
- element.getBytes(this.charset) を返します。
- }
-
- パブリックTypeInformation <文字列> getProducedType() {
- // 生成されるデータTypeinfoを定義する
- BasicTypeInfo.STRING_TYPE_INFO を返します。
- }
-
- プライベートvoid writeObject(ObjectOutputStream out)はIOExceptionをスローします{
- out.defaultWriteObject();
- out.writeUTF(this.charset.name());
- }
-
- プライベート void readObject(ObjectInputStream in) は IOException、ClassNotFoundException をスローします {
- in.defaultReadObject();
- 文字列charsetName = .readUTF ();
- this.charset = Charset .forName(charsetName);
- }
- }
メインプログラム - 完全なコード- org.apache.flink.api.common.functions.MapFunction をインポートします。
- org.apache.flink.api.java.utils.ParameterTool をインポートします。
- org.apache.flink.streaming.api.datastream.DataStream をインポートします。
- org.apache.flink.streaming.api.environment.StreamExecutionEnvironment をインポートします。
- org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer をインポートします。
- org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer をインポートします。
- org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper をインポートします。
-
- java.util.Properties をインポートします。
-
- パブリッククラスKafkaExample {
- パブリック静的void main(String[] args)は例外をスローします{
- // ユーザーパラメータを取得する
- 最終的な ParameterToolパラメータツール= ParameterTool .fromArgs(args);
- // ストリーム環境
- StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment();
-
- // ソースのトピック
- 文字列sourceTopic = "flink-topic" ;
- //シンクのトピック
- 文字列sinkTopic = "flink-topic-output" ;
- // ブローカーアドレス
- 文字列ブローカー= "localhost:9092" ;
-
- // 属性パラメータ - 実際の生産量はコマンドラインで渡すことができます
- プロパティp = parameterTool .getProperties();
- p.putAll(parameterTool.getProperties());
- p.put("bootstrap.servers", ブローカー);
-
- env.getConfig().setGlobalJobParameters(parameterTool);
-
- // コンシューマーを作成する
- FlinkKafkaConsumerコンシューマー=新しいFlinkKafkaConsumer <文字列> (
- ソーストピック、
- 新しい KafkaMsgSchema()、
- ();
- // 読み取る最も古いデータを設定する
- // Consumer.setStartFromEarliest();
-
- // Kafka メッセージを読み取る
- データストリーム<文字列> 入力= env .addSource(consumer);
-
-
- // データ処理
- データストリーム<文字列> 結果=入力.map(新しい MapFunction <文字列, 文字列> () {
- パブリック String map(String s) は例外をスローします {
- 文字列msg = "Flink study " .concat(s);
- System.out.println(メッセージ);
- メッセージを返します。
- }
- });
-
- // プロデューサーを作成する
- FlinkKafkaProducerプロデューサー=新しいFlinkKafkaProducer < String > (
- シンクトピック、
- 新しい KeyedSerializationSchemaWrapper <文字列> (新しい KafkaMsgSchema())、
- p、
- FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
-
- //Kafka の指定されたトピックにデータを書き込む
- 結果にシンクを追加します(プロデューサー)。
-
- // ジョブを実行する
- env.execute("Kafka の例");
- }
- }
メインプログラムを次のように実行します。 私のテスト操作のプロセスは次のとおりです。 - flink-topic と flink-topic-output の消費とプルを開始します。
- コマンドによるテストのみを目的として、flink-topic にテスト メッセージを追加します。
- テスト専用のコマンド印刷によって追加されたテスト メッセージを確認します。
- 最も単純な FlinkJob source->map->sink は、テスト メッセージに対してマップ処理を実行します: "Flink study ".concat(s);
- コマンドを使用してシンク データを印刷します。
(2)組み込みスキーマ Apache Flink は、一般的なメッセージ形式に対して次の 3 つの組み込みスキーマを提供します。 - TypeInformationSerializationSchema (および TypeInformationKeyValueSerializationSchema) これは、Flink の TypeInformation 作成スキーマに基づいています。これは、データが Flink によって書き込まれ、読み取られる場合に便利です。
- JsonDeserializationSchema (および JSONKeyValueDeserializationSchema) は、シリアル化された JSON を ObjectNode オブジェクトに変換します。このオブジェクトから、objectNode.get("field") As (Int/String/...)() を使用してフィールドにアクセスできます。KeyValue objectNode には、すべてのフィールドを含む "key" フィールドと "value" フィールド、およびこのメッセージのオフセット/パーティション/トピックを公開するオプションの "metadata" フィールドが含まれています。
- AvroDeserializationSchema 静的に提供されたスキーマを使用して、Avro 形式でシリアル化されたデータを読み取ります。 Avro で生成されたクラス ( AvroDeserializationSchema.forSpecific(...) ) からスキーマを推測することも、GenericRecords で手動で提供されたスキーマを使用することもできます ( AvroDeserializationSchema.forGeneric(...) を使用)
組み込みスキーマを使用するには、次の依存関係を追加する必要があります。 - <依存関係>
- <グループID> org.apache.flink</グループID>
- <artifactId>フリンク- Avro </artifactId>
- <バージョン> 1.7.0</バージョン>
- </依存関係>
(3)読み取り位置設定 Kafka データを使用する場合、消費場所を指定する必要がある場合があります。 Apache Flink の FlinkKafkaConsumer は、次のように多くの便利な場所設定を提供します。 - consumer.setStartFromEarliest() - 最も古いレコードから開始します。
- consumer.setStartFromLatest() - 最新のレコードから開始します。
- Consumer.setStartFromTimestamp(...); // 指定されたエポックタイムスタンプ(ミリ秒)から開始します。
- Consumer.setStartFromGroupOffsets(); // デフォルトの動作。前回消費したオフセットから消費を続行します。
上記の場所の指定は、次のコードのように、各パーティションに対して正確に行うことができます。 - マップ< KafkaTopicPartition , Long > specificStartOffsets =新しいHashMap < > ();
- specificStartOffsets.put(新しいKafkaTopicPartition("myTopic", 0), 23L); // 最初のパーティションは23Lから始まります
- specificStartOffsets.put(新しいKafkaTopicPartition("myTopic", 1), 31L); // 2番目のパーティションは31Lから始まります
- specificStartOffsets.put(新しいKafkaTopicPartition("myTopic", 2), 43L); // 3番目のパーティションは43Lから始まります
-
- Consumer.setStartFromSpecificOffsets(特定のStartOffsets);
パーティションが指定されていない場合は、デフォルトの setStartFromGroupOffsets メソッドが使用されます。 (4)トピックの発見 Kafka はトピックの自動検出をサポートしています。つまり、通常の方法で FlinkKafkaConsumer を作成します。次に例を示します。 - // コンシューマーを作成する
- FlinkKafkaConsumerコンシューマー=新しいFlinkKafkaConsumer <文字列> ( java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")),
- 新しい KafkaMsgSchema()、
- ();
上記の例では、ジョブの実行が開始されると、コンシューマーは、指定された正規表現 (sourceTopic の値で始まり、1 桁の数字で終わる) に一致する名前を持つすべてのトピックにサブスクライブされます。 3. 透かしを定義する(ウィンドウ) Kafka コネクタの用途は、上記の単純なデータ抽出に限定されません。多くの場合、Kafka データに対してイベント時間のウィンドウ操作を実行することが想定されるため、Flink Kafka Source で Watermark を定義する必要があります。 イベント時間を定義するには、まず、Kafka データに時間属性を追加します。データが String#Long 形式 (test#1000 のみ) であると仮定します。次に、時間列として Long を使用します。 - KafkaWithTsMsgSchema - 完全なコード
上記のKafkaデータ形式を解析するには、KafkaWithTsMsgSchemaなどのカスタムスキーマを開発して、String#LongをJava Tuple2に解析する必要があります。 - org.apache.flink.api.common.serialization.DeserializationSchema をインポートします。
- org.apache.flink.api.common.serialization.SerializationSchema をインポートします。
- org.apache.flink.api.common.typeinfo.BasicTypeInfo をインポートします。
- org.apache.flink.api.common.typeinfo.TypeInformation をインポートします。
- org.apache.flink.api.java.tuple.Tuple2 をインポートします。
- org.apache.flink.api.java.typeutils.TupleTypeInfo をインポートします。
- org.apache.flink.util.Preconditions をインポートします。
-
- java.io.IOException をインポートします。
- java.io.ObjectInputStream をインポートします。
- java.io.ObjectOutputStream をインポートします。
- java.nio.charset.Charset をインポートします。
-
- パブリック クラス KafkaWithTsMsgSchema は、 DeserializationSchema < Tuple2 < String , Long >> 、SerializationSchema < Tuple2 < String , Long >> を実装します。
- プライベート静的最終long serialVersionUID = 1L ;
- プライベート一時文字セット charset;
-
- パブリック KafkaWithTsMsgSchema() {
- Charset.forName("UTF-8") を使用します。
- }
-
- パブリック KafkaWithTsMsgSchema(文字セット charset) {
- this.charset =前提条件.checkNotNull(charset);
- }
-
- パブリック文字セット getCharset() {
- this.charset を返します。
- }
-
- パブリック Tuple2 < String , Long >デシリアライズ(byte[] message) {
- 文字列msg =新しい文字列(メッセージ、文字セット);
- String[] dataAndTs = msg .split("#");
- if( dataAndTs.length == 2){
- return new Tuple2 < String , Long > (dataAndTs[0], Long.parseLong(dataAndTs[1].trim()));
- }それ以外{
- // 実際の運用では、実行時例外をスローする必要があります
- System.out.println("無効なメッセージ形式のため失敗しました。.["+msg+"]");
- 新しい Tuple2 < String , Long > (msg, 0L) を返します。
- }
- }
-
- @オーバーライド
- パブリックブール値 isEndOfStream(Tuple2 < String , Long > stringLongTuple2) {
- false を返します。
- }
-
- パブリックbyte[]シリアル化(Tuple2 < String , Long >要素) {
- "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset); を返します。
- }
-
- プライベートvoid writeObject(ObjectOutputStream out)はIOExceptionをスローします{
- out.defaultWriteObject();
- out.writeUTF(this.charset.name());
- }
-
- プライベート void readObject(ObjectInputStream in) は IOException、ClassNotFoundException をスローします {
- in.defaultReadObject();
- 文字列charsetName = .readUTF ();
- this.charset = Charset .forName(charsetName);
- }
-
- @オーバーライド
- パブリックTypeInformation < Tuple2 < String , Long >> getProducedType () {
- 新しい TupleTypeInfo < Tuple2 < String , Long >> (BasicTypeInfo.STRING_TYPE_INFO、BasicTypeInfo.LONG_TYPE_INFO)を返します。
- }}
タイムスタンプを抽出して透かしを作成するには、カスタムの時間抽出および透かしジェネレーターを実装する必要があります。 Apache Flink には 2 つの方法があります。 - AssignerWithPunctuatedWatermarks - 各レコードは透かしを生成します。
- AssignerWithPeriodicWatermarks - 定期的にウォーターマークを生成します。
AssignerWithPunctuatedWatermarks を例にして、カスタム時間抽出およびウォーターマーク ジェネレーターを作成します。コードは次のとおりです。 - org.apache.flink.api.java.tuple.Tuple2 をインポートします。
- org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks をインポートします。
- org.apache.flink.streaming.api.watermark.Watermark をインポートします。
-
- javax.annotation.Nullable をインポートします。
-
- パブリッククラス KafkaAssignerWithPunctuatedWatermarks
- AssignerWithPunctuatedWatermarks < Tuple2 < String , Long >>を実装します{
- @Null可能
- @オーバーライド
- パブリックウォーターマーク checkAndGetNextWatermark(Tuple2 < String , Long > o, long l) {
- // 抽出したタイムスタンプを使用して透かしを作成する
- 新しいWatermark(l)を返します。
- }
-
- @オーバーライド
- パブリック long extractTimestamp(Tuple2 < String , Long > o, long l) {
- // タイムスタンプを抽出
- o.f1 を返します。
- }}
メインプログラム - 完全プログラム 1 秒のサイズのタンブル ウィンドウを計算し、ウィンドウ内の最高値を計算します。完全な手順は次のとおりです。 - org.apache.flink.api.common.typeinfo.BasicTypeInfo をインポートします。
- org.apache.flink.api.common.typeinfo.TypeInformation をインポートします。
- org.apache.flink.api.java.tuple.Tuple2 をインポートします。
- org.apache.flink.api.java.typeutils.TupleTypeInfo をインポートします。
- org.apache.flink.api.java.utils.ParameterTool をインポートします。
- org.apache.flink.streaming.api.TimeCharacteristic をインポートします。
- org.apache.flink.streaming.api.datastream.DataStream をインポートします。
- org.apache.flink.streaming.api.environment.StreamExecutionEnvironment をインポートします。
- org.apache.flink.streaming.api.windowing.assigners.TumbleEventTimeWindows をインポートします。
- org.apache.flink.streaming.api.windowing.time.Time をインポートします。
- org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer をインポートします。
- org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer をインポートします。
- org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper をインポートします。
-
- java.util.Properties をインポートします。
-
- パブリッククラス KafkaWithEventTimeExample {
- パブリック静的void main(String[] args)は例外をスローします{
- // ユーザーパラメータを取得する
- 最終的な ParameterToolパラメータツール= ParameterTool .fromArgs(args);
- // ストリーム環境
- StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment();
- // イベント時間を設定する
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
- // ソースのトピック
- 文字列sourceTopic = "flink-topic" ;
- //シンクのトピック
- 文字列sinkTopic = "flink-topic-output" ;
- // ブローカーアドレス
- 文字列ブローカー= "localhost:9092" ;
-
- // 属性パラメータ - 実際の生産量はコマンドラインで渡すことができます
- プロパティp = parameterTool .getProperties();
- p.putAll(parameterTool.getProperties());
- p.put("bootstrap.servers", ブローカー);
-
- env.getConfig().setGlobalJobParameters(parameterTool);
- // コンシューマーを作成する
- FlinkKafkaConsumerコンシューマー=新しいFlinkKafkaConsumer < Tuple2 < String 、 Long >> (
- ソーストピック、
- 新しい KafkaWithTsMsgSchema()、
- ();
-
- // Kafka メッセージを読み取る
- 型情報< Tuple2 <文字列、Long >> typeInformation =新しいTupleTypeInfo < Tuple2 < String 、 Long >> (
- BasicTypeInfo.STRING_TYPE_INFO、BasicTypeInfo.LONG_TYPE_INFO);
-
- データストリーム< Tuple2 <文字列、Long >> 入力=環境
- .addSource(消費者).returns(typeInformation)
- // タイムスタンプを抽出し、透かしを生成する
- .assignTimestampsAndWatermarks(新しい KafkaAssignerWithPunctuatedWatermarks());
-
- // データ処理
- データストリーム< Tuple2 <文字列、Long >> 結果=入力
- .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
- .max(0);
-
- // プロデューサーを作成する
- FlinkKafkaProducerプロデューサー=新しいFlinkKafkaProducer < Tuple2 < String , Long >> (
- シンクトピック、
- 新しい KeyedSerializationSchemaWrapper < Tuple2 < String , Long >> (新しい KafkaWithTsMsgSchema())、
- p、
- FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
-
- //Kafka の指定されたトピックにデータを書き込む
- 結果にシンクを追加します(プロデューサー)。
-
- // ジョブを実行する
- env.execute("イベント時間の例を使用した Kafka");
- }}
テストは次のように実行されます。 簡単に説明すると、次の数字を入力します。 5000000 から 7000000 の間のデータを調べています。ここで、B#5000000、C#5000100、E#5000120 は同じウィンドウの内容です。 MAX 値を計算して文字列を比較します。 *** メッセージは出力 E#5000120 です。 4. Kafkaはタイムスタンプを搭載している Kafka-0.10 以降では、メッセージにタイムスタンプを付けることができるため、msg にタイムスタンプとしてデータ列を追加する必要はありません。書き込みと読み取りの両方に Flink を使用すると、さらに簡単になります。一般的には、上記の例で十分です。 IV.まとめ この記事では、Flink で Kafka を使用する方法に焦点を当てます。まず、Kafka の簡単なインストールと、メッセージの送受信のコマンドのデモンストレーションから始まります。次に、単純なデータ抽出とイベント時間ウィンドウの例を使用して、Apache Flink で Kafka を使用する方法を直感的に理解できるようにします。ここで紹介した内容がお役に立てれば幸いです! いいねとコメントについて この一連の記事には、必然的に多くの欠陥や欠点があります。読者の皆様には、実りある章には「いいね!」や励ましを、不十分な章にはフィードバックやご提案をいただければ幸いです。よろしくお願いします! 著者:孫金成(通称:金珠)は現在アリババで働いています。 2015年よりApache FlinkをベースとしたAlibabaのコンピューティングプラットフォームBlinkの設計・開発に携わる。 【この記事は51CTOコラムニスト「Jin Zhu」によるオリジナル記事です。転載については原著者にお問い合わせください。 この著者の他の記事を読むにはここをクリックしてください |