データのバックログとデータの重複は、Kafka をメッセージング システムとして使用する場合によく発生する問題です。これらの問題を扱った例をいくつか示します。データバックログ処理:- コンシューマーの数を増やす: データのバックログが深刻な場合は、コンシューマー インスタンスの数を増やして消費速度を上げることができます。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "consumer-group"); // 增加消费者数量props.put("max.poll.records", 500); // 每次拉取的最大记录数props.put("max.partition.fetch.bytes", 1048576); // 每次拉取的最大字节数KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { // 处理消息} } - コンシューマー グループのパーティション割り当て戦略を調整します。Kafka は、トピックのパーティションをコンシューマー グループ内のコンシューマー インスタンスに割り当てます。パーティション割り当て戦略を調整することで、各コンシューマー インスタンスがバランスの取れた数のパーティションを処理するようになり、全体的な消費容量が向上します。
consumer.subscribe(Collections.singletonList("topic"), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 在重新分配分区之前,进行一些清理工作} @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // 在分配新的分区之后,进行一些初始化工作} }); - コンシューマー処理機能の向上: メッセージのバッチ処理、マルチスレッド、非同期処理などのコンシューマー ロジックを最適化して、コンシューマー処理速度を向上させます。
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); List<SomeRecord> batch = new ArrayList<>(); for (ConsumerRecord<String, String> record : records) { SomeRecord processedRecord = processRecord(record); batch.add(processedRecord); if (batch.size() >= 100) { // 批量处理消息saveBatchToDatabase(batch); batch.clear(); } } if (!batch.isEmpty()) { // 处理剩余的消息saveBatchToDatabase(batch); } } - Kafka クラスターを拡張する: Kafka ブローカー ノードとパーティションを追加して、全体的なメッセージ処理能力を向上させます。
データ重複処理:- メッセージの一意の識別子を使用する: プロデューサー側で各メッセージに一意の識別子を設定し、コンシューマーはメッセージを処理するときにその識別子に基づいてメッセージを重複排除できます。メッセージ内のフィールドを使用したり、メッセージの識別子としてグローバル一意識別子 (GUID) を生成したりできます。
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { String messageId = record.key(); if (!isMessageProcessed(messageId)) { // 处理消息processRecord(record); // 标记消息为已处理markMessageAsProcessed(messageId); } } } - トランザクションを使用する: メッセージの処理にデータ変更操作が含まれる場合は、Kafka のトランザクション機能を使用して、メッセージの冪等性と一貫性を確保できます。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "consumer-group"); // 设置事务ID props.put("transactional.id", "kafka-transactional-id"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("topic")); consumer.beginTransaction(); try { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { // 处理消息processRecord(record); } consumer.commitTransaction(); } catch (Exception e) { consumer.abortTransaction(); } - コンシューマー側での重複排除: データベースやキャッシュなどを使用して、コンシューマー側で処理されたメッセージの記録を保持します。メッセージを受信するたびに、まずレコードを照会します。レコードがすでに存在する場合は、メッセージを無視します。
Set<String> processedMessages = new HashSet<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { String messageId = record.key(); if (!processedMessages.contains(messageId)) { // 处理消息processRecord(record); // 添加到已处理消息集合processedMessages.add(messageId); } } } - コンシューマー側でのべき等性処理: 重複したメッセージを受信しても最終的な処理結果が一貫していることを確認するために、コンシューマー側のビジネス ロジックにべき等性を実装します。
データ バックログとデータ重複の問題に対するソリューションは、特定のビジネス ニーズとシステム条件に基づいて調整および最適化する必要があります。さらに、監視および測定システムも非常に重要であり、データのバックログや重複の問題を迅速に検出して解決するのに役立ちます。 |