Python で Apache Kafka を使いこなすために知っておくべき 3 つのライブラリ

Python で Apache Kafka を使いこなすために知っておくべき 3 つのライブラリ

データは世界に力を与えます。私たちは毎秒大量の情報を取得し、それを整理して分析し、ログ ファイル、ユーザー アクティビティ、チャット メッセージなど、より価値のある出力を作成します。より早くお届けできれば、お客様に提供する価値も高まります。私たちは、ペースが速く、環境が急速に変化する時代に生きています。

[[328122]]

Apache Kafka は、メッセージをリアルタイムで公開、サブスクライブ、保存、処理できる分散ストリーミング プラットフォームです。プルベースのアーキテクチャにより、高負荷時のサービスへの負担が軽減され、スケーリングが容易になります。大量のデータを低遅延で送信元から送信先に移動します。

プッシュアーキテクチャとプルアーキテクチャについて考える

最近、さまざまなサービス アーキテクチャの長所と短所について人々と議論しました。

Kafka は JVM ベースのプラットフォームであるため、クライアントの主流のプログラミング言語は Java です。しかし、コミュニティが活発化するにつれて、高品質のオープンソース Python クライアントが利用可能になり、本番環境で使用されるようになりました。

この記事では、最も有名な Python Kafka クライアントである kafka-python、pykafka、confluent-kafka を紹介し、比較します。最後に、各ライブラリの長所と短所について私の意見を述べます。

Kafka はなぜ必要なのでしょうか?

まず最初に。なぜカフカなのか? Kafka は、イベント駆動型アーキテクチャを強化するように設計されています。高スループット、低レイテンシ、高耐久性、高可用性のソリューションを提供することで、アーキテクチャを強化します。 (これらすべてを一度に実現できるわけではありません。常にトレードオフがあります。詳細については、このホワイトペーパーをお読みください。)

Kafka を導入して最適化し、高パフォーマンスと低レイテンシを実現する方法

Apache Kafka® は強力なストリーム処理プラットフォームであり、このホワイトペーパーでは、次のシナリオで Kafka のデプロイメントを最適化する方法について説明します。

高いパフォーマンスに加えて、送信者が受信者にメッセージを特に送信しないパブリッシュ/サブスクライブ モデルも魅力的な機能です。代わりに、メッセージは件名に基づいて、受信者が購読できる集中化された場所に配信されます。

この方法では、アプリケーションを簡単に分離し、モノリシック設計を排除できます。デカップリングがなぜ優れているのかを理解するために例を見てみましょう。

作成した Web サイトは、ユーザー アクティビティをどこかに送信する必要があるため、Web サイトからリアルタイム監視ダッシュボードへの直接接続を記述できます。これはうまく機能するシンプルな解決策です。ある日、将来の分析のためにユーザーアクティビティをデータベースに保存することにしました。そこで、Web サイトへの別の直接データベース接続を記述します。同時に、Web サイトのトラフィックはますます増加しており、アラート サービスやリアルタイム分析サービスなどを追加して強化したいと考えています。

スキーマは次のようになります。大規模なコード リポジトリ、セキュリティの問題、スケーラビリティの問題、保守性の問題などの問題は、あなたに悪影響を及ぼします。

> デカップリングのないアーキテクチャ(作成者:Xiaoxu Gao)

異なる役割を持つアプリケーションを分離するにはハブが必要です。イベントを作成するアプリケーションはプロデューサーと呼ばれます。イベントを集中ハブに公開します。各イベント (つまりメッセージ) はトピックに属します。消費者はハブの反対側にいます。プロデューサーと直接話をすることなく、ハブから必要なトピックを購読できます。

このモデルを使用すると、アーキテクチャを簡単に拡張および保守できます。エンジニアはコアビジネスにさらに集中できるようになります。

> デカップリングを備えたアーキテクチャ(作成者:Xiaoxu Gao)

Kafka のセットアップの概要

Apache Kafka は公式ウェブサイトからダウンロードできます。クイック スタートを使用すると、10 秒でサーバーを起動できます。

Confluent Platform から Apache Kafka をダウンロードすることもできます。これは、現時点で Kafka 向けの最大のストリーミング データ プラットフォームです。個人や企業がデータをリアルタイム ストリームとして配信できるように、Kafka を中心としたさまざまなインフラストラクチャ サービスを提供します。創設者は、Apache Kafka を最初に作成したチームの一員でした。

各 Kafka サーバーはブローカーと呼ばれ、スタンドアロン モードで実行することも、クラスターを形成することもできます。 Kafka に加えて、Kafka に関するメタデータを保存するための Zookeeper も必要です。 Zookeeper はコーディネーターのように機能し、分散システム内の各エージェントの状態を管理する役割を担います。

> Kafka のセットアップ (Xiaoxu Gao 作成)

すでに 1 つの Zookeeper と 1 つの Kafka ブローカーでインフラストラクチャがセットアップされていると仮定します。さあ、接続しましょう!オリジナルの Java クライアントは 5 つの API を提供します。

  • プロデューサー API: Kafka クラスター内のトピックにメッセージを公開します。
  • コンシューマー API: Kafka クラスター内のトピックからメッセージを消費します。
  • Streams API: トピックからのメッセージを消費し、Kafka クラスター内の他のトピックに変換します。これらの操作には、フィルタリング、結合、マッピング、グループ化などがあります。
  • 接続 API: コーディングなしで Kafka クラスターをソース システムまたはシンク システムに直接接続します。このシステムは、ファイル、リレーショナル データベース、Elasticsearch などになります。
  • 管理 API: Kafka クラスター内のトピックとブローカーを管理および検査します。

Kafka 用 Python ライブラリ

Python の世界では、5 つの API のうち 3 つ、つまり Producer API、Consumer API、Admin API が実装されています。 Python にはまだそのような Kafka Stream API はありませんが、良い代替手段は Faust です。

このセクションのテストは、1 つの Zookeeper と 1 つの Kafka ブローカーのローカル インストールに基づいて実行されました。これはパフォーマンスチューニングに関するものではないので、主にライブラリによって提供されるデフォルト構成を使用しています。

カフカPython

kafka-python は、多くの Python インターフェースを備え、公式 Java クライアントと非常によく似た機能を実現するように設計されています。 Kafka バージョン 0.9 以降で使用するのが最適です。初版は2014年3月に発行されました。現在も積極的にメンテナンスされています。

インストール

pip で kafka-python をインストールします

各メッセージは send() を介して非同期的に送信されます。呼び出されると、レコードをバッファに追加し、すぐに戻ります。これにより、プロデューサーはレコードをバッチで Kafka ブローカーに送信して効率を高めることができます。非同期により速度が大幅に向上しますが、理解しておくべきことがいくつかあります。

  • 非同期モードでは順序は保証されません。 Kafka ブローカーが各メッセージをいつ確認 (ack) するかを制御することはできません。
  • プロデューサーに成功と失敗のコールバックを提供するのは良い習慣です。たとえば、成功コールバックに情報ログ メッセージを書き込むことができ、失敗コールバックに例外ログ メッセージを書き込むことができます。
  • 順序は保証されないため、コールバックで例外が受信される前に追加のメッセージが送信される場合があります。

これらの問題を回避したい場合は、メッセージを同期的に送信することを選択できます。 send() の戻り値は FutureRecordMetadata です。 future.get(timeout=60) を実行すると、ブローカーがメッセージを正常に確認するまで、プロデューサーは最大 60 秒間ブロックされます。欠点は速度であり、非同期モードに比べて比較的遅いことです。

消費者

コンシューマー インスタンスは Python イテレータです。コンシューマー クラスの核となるのは poll() メソッドです。これにより、コンシューマーはトピックからメッセージを引き続き取得できるようになります。入力パラメータの 1 つである timeout_ms のデフォルトは 0 です。つまり、このメソッドは、取り出されてバッファー内で使用可能なすべてのレコードを直ちに返します。より大きなバッチを返すには、timeout_ms を増やすことができます。

デフォルトでは、各コンシューマーは無限のリスナーであるため、プログラムが中断されるまで実行が停止しません。しかし一方で、受信したメッセージに基づいてコンシューマーを停止することもできます。たとえば、特定のオフセットに達したときにループを終了し、コンシューマーを閉じることができます。

複数のトピックから 1 つのパーティションまたは複数のパーティションにコンシューマーを割り当てることもできます。

これは kafka-python ライブラリのテスト結果です。各メッセージのサイズは 100 バイトです。プロデューサーの平均スループットは 1.4MB/秒です。消費者の平均スループットは 2.8MB/秒でした。

合流カフカ

Confluent-kafka は、高性能 C クライアント librdkafka を活用する、Python 用の高性能 Kafka クライアントです。バージョン 1.0 以降、これらは PyPi 上の OS X および Linux 用のスタンドアロン バイナリ ホイールとして配布されます。 Kafka 0.8以降のバージョンをサポートしています。初版は2016年5月に発行されました。現在も積極的にメンテナンスされています。

インストール

OS X および Linux の場合、librdkafka はパッケージに含まれているため、別途インストールする必要があります。

pip で confluent-kafka をインストールします

Windows ユーザーの場合、この記事を書いた時点では、confluent-kafka はまだ Windows 上の Python 3.8 バイナリ ホイールをサポートしていません。 librdkafka で問題が発生します。リリースノートを確認してください。現在活発に開発中です。別の解決策は、Python 3.7 にダウングレードすることです。

Confluent-kafka は速度の点で驚異的なパフォーマンスを発揮します。 API の設計は kafka-python と多少似ています。ループ内に flush() を配置することで同期させることができます。

消費者

confluent-kafka の Consumer API には、より多くのコードが必要です。高レベルのループ メソッド (consume() など) を自分で処理する代わりに、while ループを自分で処理する必要があります。実際には Python ジェネレーターである独自の consump() を作成することをお勧めします。バッファ内に取得され利用可能なメッセージがある限り、そのメッセージが生成されます。

こうすることで、メイン関数がクリーンになり、コンシューマーの動作を自由に制御できるようになります。たとえば、consume() で「セッション ウィンドウ」を定義できます。 X 秒以内にメッセージが取得されない場合、コンシューマーは停止します。あるいは、入力パラメータとしてフラグ infinite=True を追加して、コンシューマーが無限リスナーであるかどうかを制御することもできます。

これはconfluent-kafkaライブラリのテスト結果です。各メッセージのサイズは 100 バイトです。平均プロデューサー スループットは 21.97MBps でした。コンシューマーの平均スループットは16.8〜28.7MB/秒です。

ピカフカ

PyKafka は、プログラマーに優しい Python 用の Kafka クライアントです。これには、Kafka プロデューサーとコンシューマーの Python 実装が含まれており、オプションで librdkafka に基づく C 拡張機能がサポートされています。 Kafka バージョン 0.82 以降をサポートしています。初版は2012年8月に発行されましたが、2018年11月以降更新されていません。

インストール

pip で pykafka をインストールします

パッケージには librdkafka は付属していないため、すべてのオペレーティング システムに個別にインストールする必要があります。

pykafka には、ProducerAPI と Consumer API の両方をカバーする KafkaClient インターフェースがあります。

メッセージは非同期モードと同期モードの両方で送信できます。 pykafka が一部のプロデューサー設定 (linger_ms や min_queued_messages など) のデフォルト値を変更し、少量のデータの送信に影響を与えることがわかりました。

これを Apache Kafka Web サイトのデフォルト設定と比較することができます。

各メッセージのコールバックを取得する場合は、必ず min_queued_messages を 1 に変更してください。そうしないと、データセットが 70000 未満の場合にレポートが取得されません。

>pykafkaプロデューサー設定

消費者

KafkaClinet インターフェースから SimpleConsumer を取得できます。これは、投票が SimpleConsumer クラスにラップされる kafka-python に似ています。

これは pykafka ライブラリのテスト結果です。各メッセージのサイズは 100 バイトです。プロデューサーの平均スループットは 2.1MB/秒です。消費者の平均スループットは 1.57MB/秒でした。

結論は

ここまで、各ライブラリの Producer API と Consumer API について説明しました。管理 API に関しては、kafka-python と confluent-kafka は明示的な管理 API を提供します。サブジェクトを作成し、次のテストを実行する前にそれを削除する単体テストでこれを使用できます。さらに、Python を使用して Kafka 監視ダッシュボードを構築する場合は、管理 API を使用してクラスターとトピックのメタデータを取得できます。

Confluent-kafka:

間違いなく、これら 3 つのライブラリの中では Confluent-kafka が最も優れたパフォーマンスを発揮します。 API は慎重に設計されており、パラメータは元の Apache Kafka と同じ名前とデフォルト値を持ちます。元のパラメータに簡単にリンクできます。個人的には、消費者の行動をカスタマイズできる柔軟性が気に入っています。 Confluent によっても積極的に開発およびサポートされています。

欠点は、Windows ユーザーがこれを動作させるのに時間がかかる可能性があることです。また、C 拡張機能のため、デバッグが難しくなる可能性があります。

カフカPython:

kafka-python は、C 拡張機能のない純粋な Python ライブラリです。 API は適切に設計されており、初心者でも簡単に使用できます。これも積極的に開発されているプロジェクトです。

python-kafka の欠点はその速度です。パフォーマンスを重視する場合は、代わりに confluent-kafka を使用することをお勧めします。

ピカフカ:

kafka-python や conflunet-kafka と比較すると、pykafka の開発活動は少ないです。バージョン履歴を見ると、2018 年 11 月以降更新されていないことがわかります。また、pykafka は API 設計が異なり、異なるデフォルト パラメータを使用していますが、これは初めてではない可能性があります。

<<:  仮想マシンは理解するのが難しいと誰が言ったのでしょうか?では、簡単に理解してみましょう(JVM)

>>:  コロナウイルスのパンデミック中およびパンデミック後のクラウドコストの最適化

推薦する

電子商取引がいかにして野蛮な成長に別れを告げるか:繁栄したシーンでも生き残りの困難は隠せない

李静「ダブル11」ショッピングカーニバルは、中国の電子商取引に新たな狂乱を巻き起こした。オンライン銀...

企業サイト制作の難しさと簡単さ

私はウェブサイト開発会社で1年以上働いています。時間が経つのは本当に早いですね。学校を卒業したばかり...

「タオバオ外」の電子商取引代理店事業はJD.comのオープンプラットフォームを後押しする可能性

1月22日、易邦電力網は、タオバオ・天猫プラットフォームの半分を占めるサービスプロバイダーのグループ...

desivps: 新年のプロモーション、VPS は年間 20 ドルから、専用サーバーは月額 55 ドルから、ロサンゼルス/オランダのデータセンター

Desivpsの新年プロモーションVPSイベントが始まり、米国ロサンゼルスとオランダのハーグにあるデ...

外部リンクの作成方法 - SEO担当者の経験談

ウェブサイトの最適化は、内部最適化と外部最適化の 2 つの方法の組み合わせに他なりません。ウェブサイ...

raidlogic - 9.9 ドル/年/64MB メモリ/128MB バースト/5GB ハードディスク/500GB トラフィック

raidlogic、あのひどいウェブサイトが改良されて、やっと見栄えがよくなりました。2001 年に...

SEO で成功するための 3 つのステップ: 盗作、疑似独創性、独創性

おそらくほとんどのウェブマスターは、このタイトルを見ると、この記事の内容はウェブサイトのコンテンツに...

Docker: 人々が後悔する賭け

【編集後記】Docker には利点もありますが、その裏には無理な設計も数多く存在します。この記事の目...

henghost:「618イベント」30%割引、cn2 giaネットワーク、香港+米国データセンター、OpenStackクラウドサーバー+独立サーバー、高防御

恒創科技618の活動:(1)香港クラウドサーバー、米国クラウドサーバー、CN2 GIAネットワーク、...

ブログの目的を理解していますか?

私はAlibabaでブログを開設し、5年間ブログを続けています。正直に言うと、5年間ブログを続けるの...

方周子は広東テレビの司会者を訴え、裁判所から賠償金を受け取った。一部の侵害は侵害された。

方周子さんは、広東省のテレビ司会者、王牧迪さんが微博で自分を中傷し侮辱したと信じ、名誉権侵害で王牧迪...

ヤオ・ジンボ:58.comはまだ初期段階にあり、来年には監督者と管理者の10%を削減する予定です。

原題:ヤオ・ジンボ:58.comはまだ初期段階にあり、気を緩める余裕はありません。来年中に監督者と管...

ウェブマスターネットワークニュース:海賊版映画やテレビ番組のウェブマスターが巨額の利益を得る時代は終わり、百度と小米がチーターモバイルに投資

1. 海賊版映画やテレビ番組のウェブマスターが巨額の利益を上げていた時代は終わり、トラフィックは大幅...

SpringBoot 分散トランザクションのベスト エフォート通知

[[393657]]環境: springboot.2.4.9 + RabbitMQ3.7.4ベストエ...

リベートサイトは廃れており、ねずみ講を調査して対処するのは困難です

噂によると、払い戻しのウェブサイトが修正され、ATMの前に長い列ができていたという。ウェブマスターネ...