Apache Flink トークシリーズ (15) - Kafka 用データストリーム コネクタ

Apache Flink トークシリーズ (15) - Kafka 用データストリーム コネクタ

1. 何を話すか

このシリーズの読者のニーズに応えるために、まず Apache Flink での Kafka の使用法を紹介します。そこでこの記事では、簡単な例を使用して、Apache Flink で Kafka を使用する方法を説明します。

2. Kafka の紹介

Apache Kafka は、分散型のパブリッシュ/サブスクライブ メッセージング システムです。これはもともと LinkedIn によって開発され、2010 年に Apache Foundation に寄贈され、主要なオープンソース プロジェクトになりました。 Kafka は、リアルタイム データ パイプラインとストリーミング アプリケーションの構築に使用されます。水平スケーラビリティ、フォールトトレランス、非常に高速な速度を備えており、現在広く使用されています。

Kafka は分散メッセージング システムであるだけでなく、ストリーム コンピューティングもサポートします。したがって、Apache Flink での Kafka のアプリケーションを紹介する前に、まずは簡単な Kafka の例を使用して、Kafka が何であるかを直感的に理解してみましょう。

1. インストール

この記事は Kafka の体系的かつ詳細な紹介ではありませんが、Apache Flink で Kafka を適切に適用できるように、Kafka を直感的に理解できるようにすることを目的としています。したがって、最も簡単な方法で Kafka をインストールします。

(1)バイナリパッケージをダウンロードします。

  1. カール -L -O http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz

(2)解凍してインストールする

Kafka をインストールするには、次のようにダウンロードした tgz ファイルを解凍するだけです。

  1. 金城:kafka jincheng.sunjc$ tar -zxf kafka_2.11-2.1.0.tgz
  2. 金城:kafka jincheng.sunjc$ cd kafka_2.11-2.1.0
  3. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ ls
  4. ライセンス通知 bin config libs site-docs

bin ディレクトリには、次に起動する Kafka サーバーなどのすべての Kafka 管理コマンドが含まれています。

(3)Kafkaサーバーを起動する

Kafka はパブリッシュ/サブスクライブ システムです。メッセージのサブスクリプションにはサービスの存在が必要です。 Kafka Server インスタンスを起動します。 Kafka には ZooKeeper が必要です。本番環境にデプロイするには、ZooKeeper クラスターをインストールする必要があります。これはこの記事の範囲を超えているため、Kafka が提供するスクリプトを使用して、ノードが 1 つだけの ZooKeeper インスタンスをインストールします。次のように:

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/zookeeper-server-start.sh config/zookeeper.properties &
  2.  
  3. [2019-01-13 09:06:19,985] INFO 設定を次の場所から読み取り中: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
  4. ....
  5. ....
  6. [2019-01-13 09:06:20,061] INFO ポート 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory) へのバインド

起動後、ZooKeeper はポート 2181 (デフォルト) にバインドします。次に、次のように Kafka サーバーを起動します。

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-server-start.sh config/server.properties
  2. [2019-01-13 09:09:16,937] INFO 登録されたkafka kafka:type =kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
  3. [2019-01-13 09:09:17,267] INFO 開始 (kafka.server.KafkaServer)
  4. [2019-01-13 09:09:17,267] INFO localhost:2181 (kafka.server.KafkaServer) の zookeeper に接続しています
  5. [2019-01-13 09:09:17,284] INFO [ZooKeeperClient] localhost:2181 への新しいセッションを初期化しています。 (kafka.zookeeper.ZooKeeperクライアント)
  6. ...
  7. ...
  8. [2019-01-13 09:09:18,253] INFO [KafkaServer id = 0 ] が開始されました (kafka.server.KafkaServer)

すべてがうまくいけば、Kafka のインストールは完了です。

2. トピックを作成する

Kafka はメッセージサブスクリプションシステムです。まず、サブスクライブできるトピックを作成します。 flink-tipic という名前のトピックを作成します。新しいターミナルで、次のコマンドを実行します。

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic
  2.  
  3. トピック「flink-tipic」を作成しました。

次の作成成功情報も Kafka サーバー ターミナルに出力されます。

  1. ...
  2. [2019-01-13 09:13:31,156] INFO /tmp/kafka-logs にパーティション flink-tipic-0 のログが作成されました。プロパティは {compression.type - > producer、message.format.version - > 2.1-IV2、file.delete.delay.ms - > 60000、max.message.bytes - > 1000012、min.compaction.lag.ms - > 0、message.timestamp.type - > CreateTime、message.downconversion.enable - > true、min.insync.replicas - > 1、segment.jitter.ms - > 0、preallocate - > false、min.cleanable.dirty.ratio - > 0.5、index.interval.bytes - > 4096、unclean.leader.election.enable - > false、retention.bytes - > -1、 delete.retention.ms - > 86400000、cleanup.policy - > [削除]、flush.ms - > 9223372036854775807、segment.ms - > 604800000、segment.bytes - > 1073741824、retention.ms - > 604800000、message.timestamp.difference.max.ms - > 9223372036854775807、segment.index.bytes - > 10485760、flush.messages - > 9223372036854775807}。 (kafka.log.LogManager)...

上記は、メッセージ圧縮方法、メッセージ形式、バックアップ数など、flink-topic の基本的なプロパティ構成を示しています。

ログを表示するだけでなく、次のようにコマンドを使用して、flink-topic が正常に作成されたかどうかを表示することもできます。

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --list --zookeeper ローカルホスト:2181
  2.  
  3. フリンクティピック

flink-tipic が出力された場合、トピックが正常に作成されたことを意味します。

では、トピックはどこに保存されるのでしょうか? Kafka はどのようにしてメッセージを公開およびサブスクライブしますか?直感的に理解するために、次の Kafka アーキテクチャ図を見て簡単に理解してみましょう。

簡単に説明すると、Kafka は ZooKeeper を使用してクラスター情報を保存します。これは、上記で起動した Kafka Server インスタンスです。クラスター内に複数の Kafka Server インスタンスが存在する場合があります。 Kafka サーバーはブローカーと呼ばれ、作成したトピックは 1 つ以上のブローカーに含めることができます。 Kafka は、メッセージを送信するには Push モードを使用し、メッセージをプルするには Pull モードを使用します。

3. メッセージを送信する

既存のトピックにメッセージを送信するにはどうすればよいですか?もちろん、API を使用してメッセージを送信するコードを記述することもできます。同時に、次のようにコマンド方式を使用してメッセージを便利に送信することもできます。

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic
  2. > Kafka テストメッセージ
  3. > Kafkaコネクタ

上記では、Kafka テスト メッセージと Kafka コネクタの 2 つのメッセージを flink-topic トピックに送信しました。

4. メッセージを読む

指定されたトピックからメッセージを読み取りたい場合はどうすればよいでしょうか?これは、API とコマンドの両方を使用して実行できます。次のコマンドを使用して、flink-topic からメッセージを読み取ることができます。

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning
  2. Kafka テスト メッセージ
  3. Kafka コネクタ

--from-beginning パラメータは、トピックの先頭からメッセージを読み取ることを指定します。

3. Flink Kafka コネクタ

これまで、最も簡単な方法で Kafka 環境をインストールしました。それでは、上記の環境を使用して、Flink Kafka Connector の使用方法を紹介しましょう。 Flinkコネクタに関する基礎知識は、「Apache Flinkトークシリーズ(14) - コネクタ」で紹介されます。ここでは、Kafka Connector に関連するコンテンツを直接紹介します。

Apache Flink は、Kafka コネクタの複数のバージョンを提供します。この記事では flink-1.7.0 を例に説明します。

1. mvn 依存関係

Kakfa Connector を使用するには、次のように、pom に Kafka Connector への依存関係を追加する必要があります。

  1. <依存関係>  
  2. <グループID> org.apache.flink</グループID>  
  3. <artifactId>フリンク-コネクタ- kafka_2.11 </artifactId>  
  4. <バージョン> 1.7.0</バージョン>  
  5. </依存関係>  

Flink Kafka Consumer は、Kafka 内のバイナリ データを Java/Scala オブジェクトに変換する方法を知る必要があります。 DeserializationSchema を使用すると、ユーザーはそのようなスキーマを指定できます。各 Kafka メッセージに対して T deserialize(byte[] message) メソッドを呼び出し、Kafka から値を渡します。

2. 例

この例では、Kafka からデータを読み取り、簡単な処理を行った後に Kafka に書き込みます。次のように、書き込み用の別のトピックを作成する必要があります。

  1. bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic-output

したがって、この例では、ソースとして flink-topic を使用し、シンクとして slink-topic-output を使用します。

(1)シンプルなETL

Kafka に保存されているのは単純な文字列であると想定しているため、文字列のシリアル化とデシリアル化の実装、つまり DeserializationSchema と SerializationSchema のシリアル化とデシリアル化を実装するクラスを定義する必要があります。この例は文字列なので、KafkaMsgSchema 実装クラスをカスタマイズしてから、Flink メイン プログラムを記述します。

  • KafkaMsgSchema - 完全なコード
    1. org.apache.flink.api.common.serialization.DeserializationSchema をインポートします。
    2. org.apache.flink.api.common.serialization.SerializationSchema をインポートします。
    3. org.apache.flink.api.common.typeinfo.BasicTypeInfo をインポートします。
    4. org.apache.flink.api.common.typeinfo.TypeInformation をインポートします。
    5. org.apache.flink.util.Preconditions をインポートします。
    6.  
    7. java.io.IOException をインポートします。
    8. java.io.ObjectInputStream をインポートします。
    9. java.io.ObjectOutputStream をインポートします。
    10. java.nio.charset.Charset をインポートします。
    11.  
    12. パブリッククラス KafkaMsgSchema は DeserializationSchema < String > 、 SerializationSchema < String >を実装します{
    13. プライベート静的最終long serialVersionUID = 1L ;
    14. プライベート一時文字セット charset;
    15.  
    16. パブリックKafkaMsgSchema() {
    17. //デフォルトのUTF-8エンコード
    18. Charset.forName("UTF-8") を使用します。
    19. }
    20.  
    21. パブリック KafkaMsgSchema(文字セット charset) {
    22. this.charset =前提条件.checkNotNull(charset);
    23. }
    24.  
    25. パブリック文字セット getCharset() {
    26. this.charset を返します。
    27. }
    28.  
    29. パブリック文字列デシリアライズ(byte[] message) {
    30. // Kafka メッセージを Java オブジェクトにデシリアライズする
    31. 新しい文字列(メッセージ、文字セット)を返します。
    32. }
    33.  
    34. パブリックブール値isEndOfStream(文字列 nextElement) {
    35. // ストリームは終わらない
    36. false を返します。
    37. }
    38.  
    39. パブリックbyte[] serialize(文字列要素) {
    40. //JavaオブジェクトをKafkaメッセージにシリアル化します
    41. element.getBytes(this.charset) を返します。
    42. }
    43.  
    44. パブリックTypeInformation <文字列> getProducedType() {
    45. // 生成されるデータTypeinfoを定義する
    46. BasicTypeInfo.STRING_TYPE_INFO を返します。
    47. }
    48.  
    49. プライベートvoid writeObject(ObjectOutputStream out)はIOExceptionをスローします{
    50. out.defaultWriteObject();
    51. out.writeUTF(this.charset.name());
    52. }
    53.  
    54. プライベート void readObject(ObjectInputStream in) は IOException、ClassNotFoundException をスローします {
    55. in.defaultReadObject();
    56. 文字列charsetName = .readUTF ();
    57. this.charset = Charset .forName(charsetName);
    58. }
    59. }
  • メインプログラム - 完全なコード
    1. org.apache.flink.api.common.functions.MapFunction をインポートします。
    2. org.apache.flink.api.java.utils.ParameterTool をインポートします。
    3. org.apache.flink.streaming.api.datastream.DataStream をインポートします。
    4. org.apache.flink.streaming.api.environment.StreamExecutionEnvironment をインポートします。
    5. org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer をインポートします。
    6. org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer をインポートします。
    7. org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper をインポートします。
    8.  
    9. java.util.Properties をインポートします。
    10.  
    11. パブリッククラスKafkaExample {
    12. パブリック静的void main(String[] args)は例外をスローします{
    13. // ユーザーパラメータを取得する
    14. 最終的な ParameterToolパラメータツール= ParameterTool .fromArgs(args);
    15. // ストリーム環境
    16. StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment();
    17.  
    18. // ソースのトピック
    19. 文字列sourceTopic = "flink-topic" ;
    20. //シンクのトピック
    21. 文字列sinkTopic = "flink-topic-output" ;
    22. // ブローカーアドレス
    23. 文字列ブローカー= "localhost:9092" ;
    24.  
    25. // 属性パラメータ - 実際の生産量はコマンドラインで渡すことができます
    26. プロパティp = parameterTool .getProperties();
    27. p.putAll(parameterTool.getProperties());
    28. p.put("bootstrap.servers", ブローカー);
    29.  
    30. env.getConfig().setGlobalJobParameters(parameterTool);
    31.  
    32. // コンシューマーを作成する
    33. FlinkKafkaConsumerコンシューマー=新しいFlinkKafkaConsumer <文字列> (
    34. ソーストピック、
    35. 新しい KafkaMsgSchema()、
    36. ();
    37. // 読み取る最も古いデータを設定する
    38. // Consumer.setStartFromEarliest();
    39.  
    40. // Kafka メッセージを読み取る
    41. データストリーム<文字列>  入力= env .addSource(consumer);
    42.  
    43.  
    44. // データ処理
    45. データストリーム<文字列>  結果=入力.map(新しい MapFunction <文字列, 文字列> () {
    46. パブリック String map(String s) は例外をスローします {
    47. 文字列msg = "Flink study " .concat(s);
    48. System.out.println(メッセージ);
    49. メッセージを返します。
    50. }
    51. });
    52.  
    53. // プロデューサーを作成する
    54. FlinkKafkaProducerプロデューサー=新しいFlinkKafkaProducer < String > (
    55. シンクトピック、
    56. 新しい KeyedSerializationSchemaWrapper <文字列> (新しい KafkaMsgSchema())、
    57. p、
    58. FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
    59.  
    60. //Kafka の指定されたトピックにデータを書き込む
    61. 結果にシンクを追加します(プロデューサー)。
    62.  
    63. // ジョブを実行する
    64. env.execute("Kafka の例");
    65. }
    66. }

メインプログラムを次のように実行します。

私のテスト操作のプロセスは次のとおりです。

  • flink-topic と flink-topic-output の消費とプルを開始します。
  • コマンドによるテストのみを目的として、flink-topic にテスト メッセージを追加します。
  • テスト専用のコマンド印刷によって追加されたテスト メッセージを確認します。
  • 最も単純な FlinkJob source->map->sink は、テスト メッセージに対してマップ処理を実行します: "Flink study ".concat(s);
  • コマンドを使用してシンク データを印刷します。

(2)組み込みスキーマ

Apache Flink は、一般的なメッセージ形式に対して次の 3 つの組み込みスキーマを提供します。

  • TypeInformationSerializationSchema (および TypeInformationKeyValueSerializationSchema) これは、Flink の TypeInformation 作成スキーマに基づいています。これは、データが Flink によって書き込まれ、読み取られる場合に便利です。
  • JsonDeserializationSchema (および JSONKeyValueDeserializationSchema) は、シリアル化された JSON を ObjectNode オブジェクトに変換します。このオブジェクトから、objectNode.get("field") As (Int/String/...)() を使用してフィールドにアクセスできます。KeyValue objectNode には、すべてのフィールドを含む "key" フィールドと "value" フィールド、およびこのメッセージのオフセット/パーティション/トピックを公開するオプションの "metadata" フィールドが含まれています。
  • AvroDeserializationSchema 静的に提供されたスキーマを使用して、Avro 形式でシリアル化されたデータを読み取ります。 Avro で生成されたクラス ( AvroDeserializationSchema.forSpecific(...) ) からスキーマを推測することも、GenericRecords で手動で提供されたスキーマを使用することもできます ( AvroDeserializationSchema.forGeneric(...) を使用)

組み込みスキーマを使用するには、次の依存関係を追加する必要があります。

  1. <依存関係>  
  2. <グループID> org.apache.flink</グループID>  
  3. <artifactId>フリンク- Avro </artifactId>  
  4. <バージョン> 1.7.0</バージョン>  
  5. </依存関係>  

(3)読み取り位置設定

Kafka データを使用する場合、消費場所を指定する必要がある場合があります。 Apache Flink の FlinkKafkaConsumer は、次のように多くの便利な場所設定を提供します。

  • consumer.setStartFromEarliest() - 最も古いレコードから開始します。
  • consumer.setStartFromLatest() - 最新のレコードから開始します。
  • Consumer.setStartFromTimestamp(...); // 指定されたエポックタイムスタンプ(ミリ秒)から開始します。
  • Consumer.setStartFromGroupOffsets(); // デフォルトの動作。前回消費したオフセットから消費を続行します。

上記の場所の指定は、次のコードのように、各パーティションに対して正確に行うことができます。

  1. マップ< KafkaTopicPartition , Long >   specificStartOffsets =新しいHashMap < > ();
  2. specificStartOffsets.put(新しいKafkaTopicPartition("myTopic", 0), 23L); // 最初のパーティションは23Lから始まります
  3. specificStartOffsets.put(新しいKafkaTopicPartition("myTopic", 1), 31L); // 2番目のパーティションは31Lから始まります
  4. specificStartOffsets.put(新しいKafkaTopicPartition("myTopic", 2), 43L); // 3番目のパーティションは43Lから始まります
  5.  
  6. Consumer.setStartFromSpecificOffsets(特定のStartOffsets);

パーティションが指定されていない場合は、デフォルトの setStartFromGroupOffsets メソッドが使用されます。

(4)トピックの発見

Kafka はトピックの自動検出をサポートしています。つまり、通常の方法で FlinkKafkaConsumer を作成します。次に例を示します。

  1. // コンシューマーを作成する
  2. FlinkKafkaConsumerコンシューマー=新しいFlinkKafkaConsumer <文字列> ( java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")),
  3. 新しい KafkaMsgSchema()、
  4. ();

上記の例では、ジョブの実行が開始されると、コンシューマーは、指定された正規表現 (sourceTopic の値で始まり、1 桁の数字で終わる) に一致する名前を持つすべてのトピックにサブスクライブされます。

3. 透かしを定義する(ウィンドウ)

Kafka コネクタの用途は、上記の単純なデータ抽出に限定されません。多くの場合、Kafka データに対してイベント時間のウィンドウ操作を実行することが想定されるため、Flink Kafka Source で Watermark を定義する必要があります。

イベント時間を定義するには、まず、Kafka データに時間属性を追加します。データが String#Long 形式 (test#1000 のみ) であると仮定します。次に、時間列として Long を使用します。

  • KafkaWithTsMsgSchema - 完全なコード

上記のKafkaデータ形式を解析するには、KafkaWithTsMsgSchemaなどのカスタムスキーマを開発して、String#LongをJava Tuple2に解析する必要があります。

  1. org.apache.flink.api.common.serialization.DeserializationSchema をインポートします。
  2. org.apache.flink.api.common.serialization.SerializationSchema をインポートします。
  3. org.apache.flink.api.common.typeinfo.BasicTypeInfo をインポートします。
  4. org.apache.flink.api.common.typeinfo.TypeInformation をインポートします。
  5. org.apache.flink.api.java.tuple.Tuple2 をインポートします。
  6. org.apache.flink.api.java.typeutils.TupleTypeInfo をインポートします。
  7. org.apache.flink.util.Preconditions をインポートします。
  8.  
  9. java.io.IOException をインポートします。
  10. java.io.ObjectInputStream をインポートします。
  11. java.io.ObjectOutputStream をインポートします。
  12. java.nio.charset.Charset をインポートします。
  13.  
  14. パブリック クラス KafkaWithTsMsgSchema は DeserializationSchema < Tuple2 < String , Long >> 、SerializationSchema < Tuple2 < String , Long >> を実装します
  15. プライベート静的最終long serialVersionUID = 1L ;
  16. プライベート一時文字セット charset;
  17.  
  18. パブリック KafkaWithTsMsgSchema() {
  19. Charset.forName("UTF-8") を使用します。
  20. }
  21.  
  22. パブリック KafkaWithTsMsgSchema(文字セット charset) {
  23. this.charset =前提条件.checkNotNull(charset);
  24. }
  25.  
  26. パブリック文字セット getCharset() {
  27. this.charset を返します。
  28. }
  29.  
  30. パブリック Tuple2 < String , Long >デシリアライズ(byte[] message) {
  31. 文字列msg =新しい文字列(メッセージ、文字セット);
  32. String[] dataAndTs = msg .split("#");
  33. if( dataAndTs.length == 2){
  34. return new Tuple2 < String , Long > (dataAndTs[0], Long.parseLong(dataAndTs[1].trim()));
  35. }それ以外{
  36. // 実際の運用では、実行時例外をスローする必要があります
  37. System.out.println("無効なメッセージ形式のため失敗しました。.["+msg+"]");
  38. 新しい Tuple2 < String , Long > (msg, 0L) を返します。
  39. }
  40. }
  41.  
  42. @オーバーライド
  43. パブリックブール値 isEndOfStream(Tuple2 < String , Long > stringLongTuple2) {
  44. false を返します。
  45. }
  46.  
  47. パブリックbyte[]シリアル化(Tuple2 < String , Long >要素) {
  48. "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset); を返します。
  49. }
  50.  
  51. プライベートvoid writeObject(ObjectOutputStream out)はIOExceptionをスローします{
  52. out.defaultWriteObject();
  53. out.writeUTF(this.charset.name());
  54. }
  55.  
  56. プライベート void readObject(ObjectInputStream in) は IOException、ClassNotFoundException をスローします {
  57. in.defaultReadObject();
  58. 文字列charsetName = .readUTF ();
  59. this.charset = Charset .forName(charsetName);
  60. }
  61.  
  62. @オーバーライド
  63. パブリックTypeInformation < Tuple2 < String , Long >> getProducedType () {
  64. 新しい TupleTypeInfo < Tuple2 < String , Long >> (BasicTypeInfo.STRING_TYPE_INFO、BasicTypeInfo.LONG_TYPE_INFO)返します。
  65. }}
  • 透かし生成

タイムスタンプを抽出して透かしを作成するには、カスタムの時間抽出および透かしジェネレーターを実装する必要があります。 Apache Flink には 2 つの方法があります。

  • AssignerWithPunctuatedWatermarks - 各レコードは透かしを生成します。
  • AssignerWithPeriodicWatermarks - 定期的にウォーターマークを生成します。

AssignerWithPunctuatedWatermarks を例にして、カスタム時間抽出およびウォーターマーク ジェネレーターを作成します。コードは次のとおりです。

  1. org.apache.flink.api.java.tuple.Tuple2 をインポートします。
  2. org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks をインポートします。
  3. org.apache.flink.streaming.api.watermark.Watermark をインポートします。
  4.  
  5. javax.annotation.Nullable をインポートします。
  6.  
  7. パブリッククラス KafkaAssignerWithPunctuatedWatermarks
  8. AssignerWithPunctuatedWatermarks < Tuple2 < String , Long >>を実装します{
  9. @Null可能
  10. @オーバーライド
  11. パブリックウォーターマーク checkAndGetNextWatermark(Tuple2 < String , Long > o, long l) {
  12. // 抽出したタイムスタンプを使用して透かしを作成する
  13. 新しいWatermark(l)を返します。
  14. }
  15.  
  16. @オーバーライド
  17. パブリック long extractTimestamp(Tuple2 < String , Long > o, long l) {
  18. // タイムスタンプを抽出
  19. o.f1 を返します。
  20. }}

メインプログラム - 完全プログラム

1 秒のサイズのタンブル ウィンドウを計算し、ウィンドウ内の最高値を計算します。完全な手順は次のとおりです。

  1. org.apache.flink.api.common.typeinfo.BasicTypeInfo をインポートします。
  2. org.apache.flink.api.common.typeinfo.TypeInformation をインポートします。
  3. org.apache.flink.api.java.tuple.Tuple2 をインポートします。
  4. org.apache.flink.api.java.typeutils.TupleTypeInfo をインポートします。
  5. org.apache.flink.api.java.utils.ParameterTool をインポートします。
  6. org.apache.flink.streaming.api.TimeCharacteristic をインポートします。
  7. org.apache.flink.streaming.api.datastream.DataStream をインポートします。
  8. org.apache.flink.streaming.api.environment.StreamExecutionEnvironment をインポートします。
  9. org.apache.flink.streaming.api.windowing.assigners.TumbleEventTimeWindows をインポートします。
  10. org.apache.flink.streaming.api.windowing.time.Time をインポートします。
  11. org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer をインポートします。
  12. org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer をインポートします。
  13. org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper をインポートします。
  14.  
  15. java.util.Properties をインポートします。
  16.  
  17. パブリッククラス KafkaWithEventTimeExample {
  18. パブリック静的void main(String[] args)は例外をスローします{
  19. // ユーザーパラメータを取得する
  20. 最終的な ParameterToolパラメータツール= ParameterTool .fromArgs(args);
  21. // ストリーム環境
  22. StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment();
  23. // イベント時間を設定する
  24. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  25.  
  26. // ソースのトピック
  27. 文字列sourceTopic = "flink-topic" ;
  28. //シンクのトピック
  29. 文字列sinkTopic = "flink-topic-output" ;
  30. // ブローカーアドレス
  31. 文字列ブローカー= "localhost:9092" ;
  32.  
  33. // 属性パラメータ - 実際の生産量はコマンドラインで渡すことができます
  34. プロパティp = parameterTool .getProperties();
  35. p.putAll(parameterTool.getProperties());
  36. p.put("bootstrap.servers", ブローカー);
  37.  
  38. env.getConfig().setGlobalJobParameters(parameterTool);
  39. // コンシューマーを作成する
  40. FlinkKafkaConsumerコンシューマー=新しいFlinkKafkaConsumer < Tuple2 < String Long >> (
  41. ソーストピック、
  42. 新しい KafkaWithTsMsgSchema()、
  43. ();
  44.  
  45. // Kafka メッセージを読み取る
  46. 型情報< Tuple2 <文字列、Long >>   typeInformation =新しいTupleTypeInfo < Tuple2 < String Long >> (
  47. BasicTypeInfo.STRING_TYPE_INFO、BasicTypeInfo.LONG_TYPE_INFO);
  48.  
  49. データストリーム< Tuple2 <文字列、Long >>  入力=環境 
  50. .addSource(消費者).returns(typeInformation)
  51. // タイムスタンプを抽出し、透かしを生成する
  52. .assignTimestampsAndWatermarks(新しい KafkaAssignerWithPunctuatedWatermarks());
  53.  
  54. // データ処理
  55. データストリーム< Tuple2 <文字列、Long >>  結果=入力 
  56. .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
  57. .max(0);
  58.  
  59. // プロデューサーを作成する
  60. FlinkKafkaProducerプロデューサー=新しいFlinkKafkaProducer < Tuple2 < String , Long >> (
  61. シンクトピック、
  62. 新しい KeyedSerializationSchemaWrapper < Tuple2 < String , Long >> (新しい KafkaWithTsMsgSchema())、
  63. p、
  64. FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
  65.  
  66. //Kafka の指定されたトピックにデータを書き込む
  67. 結果にシンクを追加します(プロデューサー)。
  68.  
  69. // ジョブを実行する
  70. env.execute("イベント時間の例を使用した Kafka");
  71. }}

テストは次のように実行されます。

簡単に説明すると、次の数字を入力します。

5000000 から 7000000 の間のデータを調べています。ここで、B#5000000、C#5000100、E#5000120 は同じウィンドウの内容です。 MAX 値を計算して文字列を比較します。 *** メッセージは出力 E#5000120 です。

4. Kafkaはタイムスタンプを搭載している

Kafka-0.10 以降では、メッセージにタイムスタンプを付けることができるため、msg にタイムスタンプとしてデータ列を追加する必要はありません。書き込みと読み取りの両方に Flink を使用すると、さらに簡単になります。一般的には、上記の例で十分です。

IV.まとめ

この記事では、Flink で Kafka を使用する方法に焦点を当てます。まず、Kafka の簡単なインストールと、メッセージの送受信のコマンドのデモンストレーションから始まります。次に、単純なデータ抽出とイベント時間ウィンドウの例を使用して、Apache Flink で Kafka を使用する方法を直感的に理解できるようにします。ここで紹介した内容がお役に立てれば幸いです!

いいねとコメントについて

この一連の記事には、必然的に多くの欠陥や欠点があります。読者の皆様には、実りある章には「いいね!」や励ましを、不十分な章にはフィードバックやご提案をいただければ幸いです。よろしくお願いします!

著者:孫金成(通称:金珠)は現在アリババで働いています。 2015年よりApache FlinkをベースとしたAlibabaのコンピューティングプラットフォームBlinkの設計・開発に携わる。

【この記事は51CTOコラムニスト「Jin Zhu」によるオリジナル記事です。転載については原著者にお問い合わせください。

この著者の他の記事を読むにはここをクリックしてください

<<:  Openstack Vlan モードでの分離とデータフロー

>>:  Kubernetes エコシステムの繁栄の背後にある長所と短所

推薦する

クリエイティブな写真編集、ロボットとの戦闘、AIがさらなるアプリケーション市場を牽引

[51CTO.comより引用] 現在、人工知能は医療、交通、製造、教育、金融など、人間生活のあらゆる...

PaaS+ローコード: クラウド コンピューティングの第 3 の波

人間社会は3つの段階に分けられます。最初の波は、約1万年前に始まった農業段階です。第二段階は17世紀...

サイバーセキュリティの自動化を再考する

「仕事をうまくやり遂げたいなら、まずツールを磨かなければなりません。」自動化技術は、INTE.NET...

医療現場への相談の持ち込み方!

前回の記事「医療業界の SEO が難しい理由」に続き、今回のブログ投稿では主に医療サイトの最適化方法...

国防総省はAWS、グーグル、オラクル、マイクロソフトの間で巨額の取引を分割した

国防総省は昨年マイクロソフトとの100億ドルのJEDI契約をキャンセルしたが、今度はクラウドコンピュ...

クラウドで今日の災害復旧のニーズを満たす方法

災害復旧 (DR) は、今日の組織の最高情報責任者にとって最優先事項となっています。実際、Enter...

外部リンク損失率が高い理由と解決策

SEO の最適化とプロモーションにおいて、外部リンクはウェブサイト全体の重みとキーワードのランキング...

マルチクラスタKubernetesの管理の課題への対応

[[442951]] 【51CTO.com クイック翻訳】著者: エミール・ヴォージュ翻訳:崔昊企画...

日々の話題:百度の「百記事」が注目を集める、CSRCの8%の利益率は違法との主張に反応

A5ウェブマスターネットワーク(www.admin5.com)は10月24日、百度が22日、「百度金...

競合他社の外部リンク分析のさまざまな側面

Baidu Webmaster Platformは、過去1年間に3回、ウェブサイトの外部リンクに対す...

英国ロンドンのネイティブ IP を備えた Hostyun の VPS (China Unicom AS9929) の簡単なレビュー

低価格のVPSブランドhostyunは最近、英国ロンドンのデータセンターに、英国固有のIPを持ち、中...

良いサービスを提供することがプロモーションの効果的な手段である理由についての簡単な分析

ウェブマスターの間で一般的に誰もが気にする問題は、ウェブサイトのランキング、含まれるアイテムの数、収...

ASO最適化ツールCicada Master ASOキーワードデータは毎分更新され、永久に無料です

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っています仕事をうま...

巨人の喪失、なぜソフトウェアの巨人は衰退しているのか?

オラクルは、過去7年間に350億ドルを買収と新製品の発売に費やしたにもかかわらず、その期間の収益はわ...

hostodo: ラスベガス VPS、年間 39.99 ドル、KVM/3G メモリ/2 コア/30gNVMe/5T トラフィック

Hostodo は、コストパフォーマンスに優れた米国西海岸のラスベガスデータセンターの VPS を ...