Python で Apache Kafka を使いこなすために知っておくべき 3 つのライブラリ

Python で Apache Kafka を使いこなすために知っておくべき 3 つのライブラリ

データは世界に力を与えます。私たちは毎秒大量の情報を取得し、それを整理して分析し、ログ ファイル、ユーザー アクティビティ、チャット メッセージなど、より価値のある出力を作成します。より早くお届けできれば、お客様に提供する価値も高まります。私たちは、ペースが速く、環境が急速に変化する時代に生きています。

[[328122]]

Apache Kafka は、メッセージをリアルタイムで公開、サブスクライブ、保存、処理できる分散ストリーミング プラットフォームです。プルベースのアーキテクチャにより、高負荷時のサービスへの負担が軽減され、スケーリングが容易になります。大量のデータを低遅延で送信元から送信先に移動します。

プッシュアーキテクチャとプルアーキテクチャについて考える

最近、さまざまなサービス アーキテクチャの長所と短所について人々と議論しました。

Kafka は JVM ベースのプラットフォームであるため、クライアントの主流のプログラミング言語は Java です。しかし、コミュニティが活発化するにつれて、高品質のオープンソース Python クライアントが利用可能になり、本番環境で使用されるようになりました。

この記事では、最も有名な Python Kafka クライアントである kafka-python、pykafka、confluent-kafka を紹介し、比較します。最後に、各ライブラリの長所と短所について私の意見を述べます。

Kafka はなぜ必要なのでしょうか?

まず最初に。なぜカフカなのか? Kafka は、イベント駆動型アーキテクチャを強化するように設計されています。高スループット、低レイテンシ、高耐久性、高可用性のソリューションを提供することで、アーキテクチャを強化します。 (これらすべてを一度に実現できるわけではありません。常にトレードオフがあります。詳細については、このホワイトペーパーをお読みください。)

Kafka を導入して最適化し、高パフォーマンスと低レイテンシを実現する方法

Apache Kafka® は強力なストリーム処理プラットフォームであり、このホワイトペーパーでは、次のシナリオで Kafka のデプロイメントを最適化する方法について説明します。

高いパフォーマンスに加えて、送信者が受信者にメッセージを特に送信しないパブリッシュ/サブスクライブ モデルも魅力的な機能です。代わりに、メッセージは件名に基づいて、受信者が購読できる集中化された場所に配信されます。

この方法では、アプリケーションを簡単に分離し、モノリシック設計を排除できます。デカップリングがなぜ優れているのかを理解するために例を見てみましょう。

作成した Web サイトは、ユーザー アクティビティをどこかに送信する必要があるため、Web サイトからリアルタイム監視ダッシュボードへの直接接続を記述できます。これはうまく機能するシンプルな解決策です。ある日、将来の分析のためにユーザーアクティビティをデータベースに保存することにしました。そこで、Web サイトへの別の直接データベース接続を記述します。同時に、Web サイトのトラフィックはますます増加しており、アラート サービスやリアルタイム分析サービスなどを追加して強化したいと考えています。

スキーマは次のようになります。大規模なコード リポジトリ、セキュリティの問題、スケーラビリティの問題、保守性の問題などの問題は、あなたに悪影響を及ぼします。

> デカップリングのないアーキテクチャ(作成者:Xiaoxu Gao)

異なる役割を持つアプリケーションを分離するにはハブが必要です。イベントを作成するアプリケーションはプロデューサーと呼ばれます。イベントを集中ハブに公開します。各イベント (つまりメッセージ) はトピックに属します。消費者はハブの反対側にいます。プロデューサーと直接話をすることなく、ハブから必要なトピックを購読できます。

このモデルを使用すると、アーキテクチャを簡単に拡張および保守できます。エンジニアはコアビジネスにさらに集中できるようになります。

> デカップリングを備えたアーキテクチャ(作成者:Xiaoxu Gao)

Kafka のセットアップの概要

Apache Kafka は公式ウェブサイトからダウンロードできます。クイック スタートを使用すると、10 秒でサーバーを起動できます。

Confluent Platform から Apache Kafka をダウンロードすることもできます。これは、現時点で Kafka 向けの最大のストリーミング データ プラットフォームです。個人や企業がデータをリアルタイム ストリームとして配信できるように、Kafka を中心としたさまざまなインフラストラクチャ サービスを提供します。創設者は、Apache Kafka を最初に作成したチームの一員でした。

各 Kafka サーバーはブローカーと呼ばれ、スタンドアロン モードで実行することも、クラスターを形成することもできます。 Kafka に加えて、Kafka に関するメタデータを保存するための Zookeeper も必要です。 Zookeeper はコーディネーターのように機能し、分散システム内の各エージェントの状態を管理する役割を担います。

> Kafka のセットアップ (Xiaoxu Gao 作成)

すでに 1 つの Zookeeper と 1 つの Kafka ブローカーでインフラストラクチャがセットアップされていると仮定します。さあ、接続しましょう!オリジナルの Java クライアントは 5 つの API を提供します。

  • プロデューサー API: Kafka クラスター内のトピックにメッセージを公開します。
  • コンシューマー API: Kafka クラスター内のトピックからメッセージを消費します。
  • Streams API: トピックからのメッセージを消費し、Kafka クラスター内の他のトピックに変換します。これらの操作には、フィルタリング、結合、マッピング、グループ化などがあります。
  • 接続 API: コーディングなしで Kafka クラスターをソース システムまたはシンク システムに直接接続します。このシステムは、ファイル、リレーショナル データベース、Elasticsearch などになります。
  • 管理 API: Kafka クラスター内のトピックとブローカーを管理および検査します。

Kafka 用 Python ライブラリ

Python の世界では、5 つの API のうち 3 つ、つまり Producer API、Consumer API、Admin API が実装されています。 Python にはまだそのような Kafka Stream API はありませんが、良い代替手段は Faust です。

このセクションのテストは、1 つの Zookeeper と 1 つの Kafka ブローカーのローカル インストールに基づいて実行されました。これはパフォーマンスチューニングに関するものではないので、主にライブラリによって提供されるデフォルト構成を使用しています。

カフカPython

kafka-python は、多くの Python インターフェースを備え、公式 Java クライアントと非常によく似た機能を実現するように設計されています。 Kafka バージョン 0.9 以降で使用するのが最適です。初版は2014年3月に発行されました。現在も積極的にメンテナンスされています。

インストール

pip で kafka-python をインストールします

各メッセージは send() を介して非同期的に送信されます。呼び出されると、レコードをバッファに追加し、すぐに戻ります。これにより、プロデューサーはレコードをバッチで Kafka ブローカーに送信して効率を高めることができます。非同期により速度が大幅に向上しますが、理解しておくべきことがいくつかあります。

  • 非同期モードでは順序は保証されません。 Kafka ブローカーが各メッセージをいつ確認 (ack) するかを制御することはできません。
  • プロデューサーに成功と失敗のコールバックを提供するのは良い習慣です。たとえば、成功コールバックに情報ログ メッセージを書き込むことができ、失敗コールバックに例外ログ メッセージを書き込むことができます。
  • 順序は保証されないため、コールバックで例外が受信される前に追加のメッセージが送信される場合があります。

これらの問題を回避したい場合は、メッセージを同期的に送信することを選択できます。 send() の戻り値は FutureRecordMetadata です。 future.get(timeout=60) を実行すると、ブローカーがメッセージを正常に確認するまで、プロデューサーは最大 60 秒間ブロックされます。欠点は速度であり、非同期モードに比べて比較的遅いことです。

消費者

コンシューマー インスタンスは Python イテレータです。コンシューマー クラスの核となるのは poll() メソッドです。これにより、コンシューマーはトピックからメッセージを引き続き取得できるようになります。入力パラメータの 1 つである timeout_ms のデフォルトは 0 です。つまり、このメソッドは、取り出されてバッファー内で使用可能なすべてのレコードを直ちに返します。より大きなバッチを返すには、timeout_ms を増やすことができます。

デフォルトでは、各コンシューマーは無限のリスナーであるため、プログラムが中断されるまで実行が停止しません。しかし一方で、受信したメッセージに基づいてコンシューマーを停止することもできます。たとえば、特定のオフセットに達したときにループを終了し、コンシューマーを閉じることができます。

複数のトピックから 1 つのパーティションまたは複数のパーティションにコンシューマーを割り当てることもできます。

これは kafka-python ライブラリのテスト結果です。各メッセージのサイズは 100 バイトです。プロデューサーの平均スループットは 1.4MB/秒です。消費者の平均スループットは 2.8MB/秒でした。

合流カフカ

Confluent-kafka は、高性能 C クライアント librdkafka を活用する、Python 用の高性能 Kafka クライアントです。バージョン 1.0 以降、これらは PyPi 上の OS X および Linux 用のスタンドアロン バイナリ ホイールとして配布されます。 Kafka 0.8以降のバージョンをサポートしています。初版は2016年5月に発行されました。現在も積極的にメンテナンスされています。

インストール

OS X および Linux の場合、librdkafka はパッケージに含まれているため、別途インストールする必要があります。

pip で confluent-kafka をインストールします

Windows ユーザーの場合、この記事を書いた時点では、confluent-kafka はまだ Windows 上の Python 3.8 バイナリ ホイールをサポートしていません。 librdkafka で問題が発生します。リリースノートを確認してください。現在活発に開発中です。別の解決策は、Python 3.7 にダウングレードすることです。

Confluent-kafka は速度の点で驚異的なパフォーマンスを発揮します。 API の設計は kafka-python と多少似ています。ループ内に flush() を配置することで同期させることができます。

消費者

confluent-kafka の Consumer API には、より多くのコードが必要です。高レベルのループ メソッド (consume() など) を自分で処理する代わりに、while ループを自分で処理する必要があります。実際には Python ジェネレーターである独自の consump() を作成することをお勧めします。バッファ内に取得され利用可能なメッセージがある限り、そのメッセージが生成されます。

こうすることで、メイン関数がクリーンになり、コンシューマーの動作を自由に制御できるようになります。たとえば、consume() で「セッション ウィンドウ」を定義できます。 X 秒以内にメッセージが取得されない場合、コンシューマーは停止します。あるいは、入力パラメータとしてフラグ infinite=True を追加して、コンシューマーが無限リスナーであるかどうかを制御することもできます。

これはconfluent-kafkaライブラリのテスト結果です。各メッセージのサイズは 100 バイトです。平均プロデューサー スループットは 21.97MBps でした。コンシューマーの平均スループットは16.8〜28.7MB/秒です。

ピカフカ

PyKafka は、プログラマーに優しい Python 用の Kafka クライアントです。これには、Kafka プロデューサーとコンシューマーの Python 実装が含まれており、オプションで librdkafka に基づく C 拡張機能がサポートされています。 Kafka バージョン 0.82 以降をサポートしています。初版は2012年8月に発行されましたが、2018年11月以降更新されていません。

インストール

pip で pykafka をインストールします

パッケージには librdkafka は付属していないため、すべてのオペレーティング システムに個別にインストールする必要があります。

pykafka には、ProducerAPI と Consumer API の両方をカバーする KafkaClient インターフェースがあります。

メッセージは非同期モードと同期モードの両方で送信できます。 pykafka が一部のプロデューサー設定 (linger_ms や min_queued_messages など) のデフォルト値を変更し、少量のデータの送信に影響を与えることがわかりました。

これを Apache Kafka Web サイトのデフォルト設定と比較することができます。

各メッセージのコールバックを取得する場合は、必ず min_queued_messages を 1 に変更してください。そうしないと、データセットが 70000 未満の場合にレポートが取得されません。

>pykafkaプロデューサー設定

消費者

KafkaClinet インターフェースから SimpleConsumer を取得できます。これは、投票が SimpleConsumer クラスにラップされる kafka-python に似ています。

これは pykafka ライブラリのテスト結果です。各メッセージのサイズは 100 バイトです。プロデューサーの平均スループットは 2.1MB/秒です。消費者の平均スループットは 1.57MB/秒でした。

結論は

ここまで、各ライブラリの Producer API と Consumer API について説明しました。管理 API に関しては、kafka-python と confluent-kafka は明示的な管理 API を提供します。サブジェクトを作成し、次のテストを実行する前にそれを削除する単体テストでこれを使用できます。さらに、Python を使用して Kafka 監視ダッシュボードを構築する場合は、管理 API を使用してクラスターとトピックのメタデータを取得できます。

Confluent-kafka:

間違いなく、これら 3 つのライブラリの中では Confluent-kafka が最も優れたパフォーマンスを発揮します。 API は慎重に設計されており、パラメータは元の Apache Kafka と同じ名前とデフォルト値を持ちます。元のパラメータに簡単にリンクできます。個人的には、消費者の行動をカスタマイズできる柔軟性が気に入っています。 Confluent によっても積極的に開発およびサポートされています。

欠点は、Windows ユーザーがこれを動作させるのに時間がかかる可能性があることです。また、C 拡張機能のため、デバッグが難しくなる可能性があります。

カフカPython:

kafka-python は、C 拡張機能のない純粋な Python ライブラリです。 API は適切に設計されており、初心者でも簡単に使用できます。これも積極的に開発されているプロジェクトです。

python-kafka の欠点はその速度です。パフォーマンスを重視する場合は、代わりに confluent-kafka を使用することをお勧めします。

ピカフカ:

kafka-python や conflunet-kafka と比較すると、pykafka の開発活動は少ないです。バージョン履歴を見ると、2018 年 11 月以降更新されていないことがわかります。また、pykafka は API 設計が異なり、異なるデフォルト パラメータを使用していますが、これは初めてではない可能性があります。

<<:  仮想マシンは理解するのが難しいと誰が言ったのでしょうか?では、簡単に理解してみましょう(JVM)

>>:  コロナウイルスのパンデミック中およびパンデミック後のクラウドコストの最適化

推薦する

ウェブサイトのインタラクティブ性を高めてコンバージョン率を向上させる方法についての簡単な分析

今日の情報化時代において、オンライン マーケティングは目新しいものではありません。多くの伝統的な企業...

ikoula-23 ユーロ/I7-2600/16G メモリ/2T ハードドライブ/100M 無制限

フランスのサーバー販売業者 ikoula.com が、特別価格とセットアップ料金なしの 2 つのサー...

ChionCloud: 香港 + 米国のクラウド サーバー、CN2 GIA ネットワーク、60 元/月、1G メモリ/1 コア/30G ハードディスク/3M 帯域幅無制限

ChionCloudは2009年に設立され、香港に登録されています。香港とロサンゼルスのデータセンタ...

マルチクラウド ストレージ管理で避けるべき 5 つの間違い

不十分なセキュリティやベンダー ロックインなどの一般的な問題によって、組織がストレージに複数のクラウ...

SEOの考え方:前進しなければ遅れをとることになる

最近、福建省のSEOについて考えています。以前、自分自身に学ぶ機会を与えるだけでは感情的に十分ではな...

VMware、2022年度第3四半期の業績を発表

VMware は本日、2022 会計年度の第 3 四半期の業績を発表しました。第3四半期の総収益は3...

IaaSとPaaSの違い

[[442894]] 1. 分類から本質を見極める製品を分類することは、製品の本質を検討し理解するこ...

張大易から江坡まで、過去15年間の中国のネットセレブを振り返る!

フォーラムからWeibo、ライブ放送、ショートビデオまで、ソーシャル手段の絶え間ない更新は、中国のネ...

SEO 担当者にはどのようなスキルが必要ですか?

現在のネットワークで市場シェアを獲得するには、どのようなスキルが必要ですか? 現在のネットワークで市...

atcloud: 480G 超高防御 VPS 月額 4 ドルから、米国/シンガポールに 6 つのデータセンター、512m メモリ/1 コア/500g ハードディスク/無制限トラフィック

atcloudは、主に従来のクラウド(VPS)とストレージ(大容量ハードディスクストレージ)シリーズ...

WeChat パブリック プラットフォームは草の根ウェブマスター マーケティングに適していますか?

WeChatパブリックプラットフォームの立ち上げは、草の根起業家に朗報をもたらした。大手メディアが自...

2013 年のウェブサイト最適化の提案

ウェブサイト最適化担当者が降格や K 処分を経験したことがない場合は、優れたウェブサイト最適化担当者...

A5 フォーラム署名の消失について、ウェブマスターはどう考えていますか?

4月25日、百度のウェブマスターLeeが外部リンクの不正行為を判定する方法を発表した後、ウェブマスタ...

企業のキーワードランキングが安定した後のマーケティング方法

検索エンジン マーケティングは、オンライン マーケティングでは一般的です。検索エンジン マーケティン...

ウェブサイト構築の 6 つのステップ: 新しいウェブサイトの重量を改善するのは夢ではありません (パート 2)

前回の記事「ウェブサイト構築の6つのステップ:新規サイトの重量を改善するのは夢ではない(パート1)」...