このガイドでは、Kubernetes 上で Kinesis Data Streams コンシューマーアプリケーションを自動的にスケーリングして、コストを節約し、リソース効率を向上させる方法について説明します。 コストを節約し、リソース効率を向上させるために、Kubernetes 上で Kinesis Data Streams コンシューマーアプリケーションを自動的にスケーリングする方法を知りたいですか?このブログでは、まさにそれを実行する方法について段階的に説明します。 Kubernetes を活用して Kinesis コンシューマー アプリケーションを自動的にスケーリングすることで、Horizontal Pod Autoscaler などの組み込み機能のメリットを享受できます。 Amazon Kinesis と Kinesis Data Streams とは何ですか? Amazon Kinesis は、リアルタイムのデータ処理、取り込み、分析のためのプラットフォームです。 Kinesis Data Streams は、サーバーレス ストリーミング データ サービスです (Kinesis Data Firehose、Kinesis Video Streams、Kinesis Data Analytics とともに、Kinesis ストリーミング データ プラットフォームの一部です)。 Kinesis Data Streams は、データ取り込み速度やストリーム消費速度の変化に柔軟に拡張し、継続的に適応できます。リアルタイム データ分析アプリケーション、リアルタイム ダッシュボード、リアルタイム データ パイプラインの構築に使用できます。 まず、Kinesis Data Streams の主要な概念の概要を説明します。 Kinesis Data Streams: 高レベルアーキテクチャ- Kinesis データ ストリームはシャードのセットです。各シャードには一連のデータ レコードがあります。
- プロデューサーは継続的にデータを Kinesis Data Streams にプッシュし、コンシューマーはデータをリアルタイムで処理します。
- パーティションキーは、ストリーム内のシャードごとにデータをグループ化するために使用されます。
- Kinesis Data Streams は、ストリームに属するデータ レコードを複数のシャードに分割します。
- 各データ レコードに関連付けられたパーティション キーを使用して、特定のデータ レコードがどのシャードに属しているかを判断します。
- コンシューマーは Amazon Kinesis Data Streams からレコードを取得して処理し、結果を Amazon DynamoDB、Amazon Redshift、Amazon S3 などに保存します。
- これらのコンシューマーは、Amazon Kinesis Data Streams アプリケーションとも呼ばれます。
- KDS データ ストリームからのデータを処理できるカスタム コンシューマー アプリケーションを開発する 1 つの方法は、Kinesis クライアント ライブラリ (KCL) を使用することです。
Kinesis コンシューマーアプリケーションはどのように水平方向にスケーリングしますか? Kinesis クライアント ライブラリは、各シャードに対してレコード プロセッサが実行され、そのシャードからのデータを処理することを保証します。 KCL は、分散コンピューティングとスケーラビリティに関連する多くの複雑なタスクを処理することで、Kinesis データ ストリームからのデータの消費と処理を支援します。データ ストリームに接続し、データ ストリーム内のシャードを列挙し、リースを使用してシャードとそのコンシューマー アプリケーションの関連付けを調整します。 レコード プロセッサは、管理するシャードごとにインスタンス化されます。 KCL はデータ ストリームからデータ レコードをプルし、対応するレコード プロセッサにレコードをプッシュし、処理されたレコードにチェックポイントを設定します。さらに重要なのは、ワーカー インスタンス数が変更されたとき、またはデータ ストリームが再シャーディングされたとき (シャードが分割または結合されたとき) に、シャードとワーカーの関連付け (リース) のバランスをとることです。つまり、KCL はインスタンス間のシャードのバランスを自動的に取るため、インスタンスを追加するだけで Kinesis Data Streams アプリケーションを拡張できます。 ただし、負荷が増加したときにアプリケーションを拡張する方法は依然として必要です。もちろん、これを手動で行うことも、カスタム ソリューションを構築して実行することもできます。 ここで、Kubernetes イベント駆動型自動スケーリング (KEDA) が役立ちます。これは、Kubernetes に基づくイベント駆動型の自動スケーリング コンポーネントであり、Kinesis などのイベント ソースを監視し、処理する必要があるイベントの数に基づいて基盤となるデプロイメント (および ) をスケーリングできます。ポッド 自動スケーリングの動作を確認するには、Kinesis クライアント ライブラリ (KCL) 2.x を使用して Kinesis Data Stream からデータを消費する Java アプリケーションを使用します。これは Amazon EKS 上の Kubernetes クラスターにデプロイされ、KEDA を使用します。アプリケーションには、Kinesis ストリームからのデータを処理し、DynamoDB テーブルに保存する ShardRecordProcessor の実装が含まれています。 AWS CLI を使用して Kinesis ストリームにデータを生成し、アプリケーションのスケールを監視します。 始める前に、KEDA について説明します。 Codaとは何ですか? KEDA は、ネイティブ Kubernetes プリミティブ (Horizontal Pod Autoscaler など) に基づいて構築され、任意の Kubernetes クラスターに追加できるオープンソースの CNCF プロジェクトです。以下に、主要なコンポーネントの概要を示します (詳細については、KEDA のドキュメントを参照してください)。 - keda-operator-metrics-apiserverコンポーネントKEDAはKubernetesメトリクスサーバーとして機能し、Horizontal Pod Autoscalerのメトリクスを公開します。
- KEDA Scaler は外部システム (Redis など) と統合してこれらのメトリック (リストの長さなど) を取得し、処理する必要があるイベントの数に基づいて Kubernetes 内の任意のコンテナの自動スケーリングを実行します。
- keda-operator コンポーネントの役割は、デプロイメントをアクティブ化および非アクティブ化することです。つまり、ゼロにスケールし、ゼロからやり直すということです。
AWS Kinesis Stream のシャード数に基づいてスケーリングする Kinesis Stream KEDA スケーラーの動作を確認します。 それでは、この記事の実践的な部分に移りましょう。 前提条件AWS アカウントに加えて、AWS CLI、kubectl、Docker、Java 11、Maven もインストールされている必要があります。 EKS クラスターを設定し、DynamoDB テーブルと Kinesis データ ストリームを作成します。 Amazon EKS クラスターはいくつかの方法で作成できます。私は、便利さの点で eksctl CLI を使用することを好みます。 eksctl を使用して EKS クラスターを作成するのは非常に簡単です。 eksctl create cluster --name <cluster name> --region <region eg us-east-1> 詳細については、「Amazon EKS の使用開始 - eksctl ドキュメント」を参照してください。 アプリケーションデータを保持するための DynamoDB テーブルを作成します。 AWS CLI を使用して、次のコマンドでテーブルを作成できます。 aws dynamodb create-table \ --table-name users \ --attribute-definitions AttributeName=email,AttributeType=S \ --key-schema AttributeName=email,KeyType=HASH \ --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 AWS CLI を使用して、2 つのシャードを持つ Kinesis ストリームを作成します。 aws kinesis create-stream --stream-name kinesis-keda-demo --shard-count 2 この GitHub リポジトリをクローンし、正しいディレクトリに変更します。 git clone https://github.com/abhirockzz/kinesis-keda-autoscalingcd kinesis-keda-autoscaling さあ、始めましょう! EKS での KEDA のセットアップと構成このチュートリアルでは、YAML ファイルを使用して KEDA をデプロイします。ただし、Helm チャートを使用することもできます。 KEDAをインストールします: # update version 2.8.2 if requiredkubectl apply -f https://github.com/kedacore/keda/releases/download/v2.8.2/keda-2.8.2.yaml インストールを確認します。 # check Custom Resource Definitionskubectl get crd# check KEDA Deploymentskubectl get deployment -n keda# check KEDA operator logskubectl logs -f $(kubectl get pod -l=app=keda-operator -o jsnotallow='{.items[0].metadata.name}' -n keda) -n keda IAMロールを設定するKEDA オペレーターと Kinesis コンシューマーアプリケーションは AWS API を呼び出す必要があります。どちらも EKS でデプロイメントとして実行されるため、必要な権限を付与するために IAM サービス アカウント ロール (IRSA) を使用します。 この特定のケースでは: - KEDA オペレーターは、Kinesis ストリームのシャード数を取得できる必要があります。これは、DescribeStreamSummaryAPI を使用して行われます。
- アプリケーション (具体的には KCL ライブラリ) は Kinesis および DynamoDB と対話する必要があります。そのためには、一連の IAM 権限が必要です。
KEDA オペレータ向けの IRSA の設定AWS アカウント ID と OIDC ID プロバイダーを環境変数として設定します。 ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)#update the cluster name and region as requiredexport EKS_CLUSTER_NAME=demo-eks-clusterexport AWS_REGION=us-east-1OIDC_PROVIDER=$(aws eks describe-cluster --name $EKS_CLUSTER_NAME --query "cluster.identity.oidc.issuer" --output text | sed -e "s/^https:\/\///") ロールの信頼できるエンティティを含む JSON ファイルを作成します。 read -r -d '' TRUST_RELATIONSHIP <<EOF{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}" }, "Action": "sts:AssumeRoleWithWebIdentity", "Condition": { "StringEquals": { "${OIDC_PROVIDER}:aud": "sts.amazonaws.com", "${OIDC_PROVIDER}:sub": "system:serviceaccount:keda:keda-operator" } } } ]}EOFecho "${TRUST_RELATIONSHIP}" > trust_keda.json 次に、IAM ロールを作成し、ポリシーをアタッチします (詳細については、policy_kinesis_keda.json ファイルを参照してください)。 export ROLE_NAME=keda-operator-kinesis-roleaws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust_keda.json --description "IRSA for kinesis KEDA scaler on EKS"aws iam create-policy --policy-name keda-kinesis-policy --policy-document file://policy_kinesis_keda.jsonaws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/keda-kinesis-policy IAM ロールとサービス アカウントを関連付けます。 kubectl annotate serviceaccount -n keda keda-operator eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}# verify the annotation kubectl describe serviceaccount/keda-operator -n keda 有効にするには、KEDA オペレーターのデプロイメントを再起動する必要があります。 kubectl rollout restart deployment.apps/keda-operator -n keda# to verify, confirm that the KEDA operator has the right environment variableskubectl describe pod -n keda $(kubectl get po -l=app=keda-operator -n keda --output=jsnotallow={.items..metadata.name}) | grep "^\s*AWS_"# expected outputAWS_STS_REGIONAL_ENDPOINTS: regionalAWS_DEFAULT_REGION: us-east-1AWS_REGION: us-east-1AWS_ROLE_ARN: arn:aws:iam::<AWS_ACCOUNT_ID>:role/keda-operator-kinesis-roleAWS_WEB_IDENTITY_TOKEN_FILE: /var/run/secrets/eks.amazonaws.com/serviceaccount/token KCL コンシューマー アプリケーション用の IRSA の構成まず、Kubernetes サービス アカウントを作成します。 kubectl apply -f - <<EOFapiVersion: v1kind: ServiceAccountmetadata: name: kcl-consumer-app-saEOF ロールの信頼できるエンティティを含む JSON ファイルを作成します。 read -r -d '' TRUST_RELATIONSHIP <<EOF{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}" }, "Action": "sts:AssumeRoleWithWebIdentity", "Condition": { "StringEquals": { "${OIDC_PROVIDER}:aud": "sts.amazonaws.com", "${OIDC_PROVIDER}:sub": "system:serviceaccount:default:kcl-consumer-app-sa" } } } ]}EOFecho "${TRUST_RELATIONSHIP}" > trust.json 次に、IAM ロールを作成し、ポリシーをアタッチします (詳細については、policy.json ファイルを参照してください)。 export ROLE_NAME=kcl-consumer-app-roleaws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust.json --description "IRSA for KCL consumer app on EKS"aws iam create-policy --policy-name kcl-consumer-app-policy --policy-document file://policy.jsonaws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/kcl-consumer-app-policy IAM ロールとサービス アカウントを関連付けます。 kubectl annotate serviceaccount -n default kcl-consumer-app-sa eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}# verify the annotationkubectl describe serviceaccount/kcl-consumer-app-sa コア インフラストラクチャが準備完了です。コンシューマー アプリケーションを準備してデプロイしましょう。 KCLコンシューマーアプリケーションをEKSにデプロイするまず、Docker イメージを構築し、それを Amazon Elastic Container Registry (ECR) にプッシュする必要があります (詳細については Dockerfile を参照してください)。 DockerイメージをビルドしてECRにプッシュする# create runnable JAR filemvn clean compile assembly\:single# build docker imagedocker build -t kcl-consumer-app .AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)# create a private ECR repoaws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.comaws ecr create-repository --repository-name kcl-consumer-app --region us-east-1# tag and push the imagedocker tag kcl-consumer-app:latest $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latestdocker push $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest
コンシューマーアプリケーションをデプロイするECR にプッシュした Docker イメージを含めるように consumer.yaml を更新します。マニフェストの残りの部分は同じままです。 apiVersion: apps/v1kind: Deploymentmetadata: name: kcl-consumerspec: replicas: 1 selector: matchLabels: app: kcl-consumer template: metadata: labels: app: kcl-consumer spec: serviceAccountName: kcl-consumer-app-sa containers: - name: kcl-consumer image: AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest imagePullPolicy: Always env: - name: STREAM_NAME value: kinesis-keda-demo - name: TABLE_NAME value: users - name: APPLICATION_NAME value: kinesis-keda-demo - name: AWS_REGION value: us-east-1 - name: INSTANCE_NAME valueFrom: fieldRef: fieldPath: metadata.name デプロイメントを作成する: kubectl apply -f consumer.yaml# verify Pod transition to Running statekubectl get pods -w KEDA での KCL アプリケーション自動スケーリングの適用コンシューマー アプリケーションをデプロイしたので、KCL ライブラリが動作し始めるはずです。最初に、DynamoDB に「コントロール テーブル」を作成します。これは、KCL アプリケーションと同じ名前にする必要があります (この場合は、kinesis-keda-demo)。 最初の調整とテーブルの作成には数分かかる場合があります。コンシューマー アプリケーションのログをチェックして進行状況を追跡できます。 kubectl logs -f $(kubectl get po -l=app=kcl-consumer --output=jsnotallow={.items..metadata.name}) リースが割り当てられたら、テーブルをチェックして、leaseOwner プロパティを書き留めます。 aws dynamodb describe-table --table-name kinesis-keda-demoaws dynamodb scan --table-name kinesis-keda-demo ここで、AWS CLI を使用して Kinesis ストリームにデータを送信してみましょう。 export KINESIS_STREAM=kinesis-keda-demoaws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user1", "city":"new york"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user2", "city":"tel aviv"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user3", "city":"new delhi"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user4", "city":"seattle"}' | base64) KCL アプリケーションは、各レコードをターゲット DynamoDB テーブル (この例では users) に保存します。表をチェックして記録を確認することができます。 aws dynamodb scan --table-name users processes_by 属性の値に注目してください。これは KCL コンシューマーと同じ Pod です。これにより、エンドツーエンドの自動スケーリング プロセスの検証が容易になります。 Kinesis 用の KEDA スケーラーの作成ScaledObject の定義は次のとおりです。これは kcl-consumer Deployment (先ほど作成したもの) を対象としており、shardCount が 1 に設定されていることに注意してください。 apiVersion: keda.sh/v1alpha1kind: ScaledObjectmetadata: name: aws-kinesis-stream-scaledobjectspec: scaleTargetRef: name: kcl-consumer triggers: - type: aws-kinesis-stream metadata: # Required streamName: kinesis-keda-demo # Required awsRegion: "us-east-1" shardCount: "1" identityOwner: "operator" KEDAKinesis スケーラーを作成します。 kubectl apply -f keda-kinesis-scaler.yaml KCLアプリケーションの自動スケーリングを確認するPod KCL アプリケーションの 1 つから始めます。しかし、KEDA のおかげで、2 番目のポッドが出現するはずです。 kubectl get pods -l=app=kcl-consumer -w# check logs of the new podkubectl logs -f <enter Pod name> Pods 定義で指定したため、アプリケーションは自動的に 2 にスケールできます。つまり、Kinesis ストリーム内の各シャードに 1 つずつ存在することになります。シャード数: "1"スケールオブジェクトポッド kinesis-keda-demo コントロール テーブル DynamoDB を確認します。leaseOwner が表示されるはずです。 Kinesis ストリームにさらにデータを送信してみましょう。 export KINESIS_STREAM=kinesis-keda-demoaws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user5", "city":"new york"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user6", "city":"tel aviv"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user7", "city":"new delhi"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user8", "city":"seattle"}' | base64) processes_by 属性の値を確認します。 2 つのポッドにスケールしたので、各ポッドは Kinesis ストリームからのレコードのサブセットを処理するため、値はレコードごとに異なる必要があります。 Kinesis ストリーム容量の増加シャードの数を 2 から 3 に増やし、KCL アプリケーションの自動スケーリングを監視し続けましょう。 aws kinesis update-shard-count --stream-name kinesis-keda-demo --target-shard-count 3 --scaling-type UNIFORM_SCALING Kinesis の再シャーディングが完了すると、KEDA スケーラーが起動し、KCL アプリケーションを 3 つのポッドにスケーリングします。 kubectl get pods -l=app=kcl-consumer -w kinesis-keda-demo 前と同様に、コントロールテーブルで、Kinesis シャードリースが DynamoDB で更新されたことを確認します。 leasingOwnerプロパティを確認してください。 Kinesis ストリームにさらにデータを送信し続けます。予想どおり、Pod はレコード処理を共有し、それがユーザー テーブルの processes_by 属性に反映されます。 export KINESIS_STREAM=kinesis-keda-demoaws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user9", "city":"new york"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user10", "city":"tel aviv"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user11", "city":"new delhi"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user12", "city":"seattle"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user14", "city":"tel aviv"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user15", "city":"new delhi"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user16", "city":"seattle"}' | base64) ズームアウトこれまでのところ、私たちは一方向にのみ拡大してきました。 Kinesis ストリームのシャード容量を減らすと何が起こりますか?ぜひ自分で試してみてください。シャードの数を 3 から 2 に減らして、KCL アプリケーションに何が起こるかを確認してください。 エンドツーエンドのソリューションを検証した後は、追加料金が発生しないようにリソースをクリーンアップする必要があります。 リソースの削除EKS クラスター、Kinesis ストリーム、DynamoDB テーブルを削除します。 eksctl delete cluster --name keda-kinesis-demoaws kinesis delete-stream --stream-name kinesis-keda-demoaws dynamodb delete-table --table-name users 結論はこの記事では、KEDA を使用して、Kinesis ストリームからデータを消費する KCL アプリケーションを自動的にスケーリングする方法を学びました。 KEDA スケーラーは、アプリケーションの要件に応じて構成できます。たとえば、Kinesis ストリーム内の 3 つのシャードごとに shardCount を 3 に設定できます。ただし、1 対 1 のマッピングを維持する場合は、shardCount を 1 に設定すると、KCL が分散調整とリースの割り当てを処理し、Pod ごとにレコード プロセッサのインスタンスが 1 つあることが保証されます。これは、アプリケーションのニーズに合わせて Kinesis ストリーム処理パイプラインを拡張するための効果的な方法です。
|