Flink や Spark に代表される分散ストリーム バッチ コンピューティング フレームワークの基盤となるリソース管理プラットフォームは、Hadoop エコシステムの YARN から Kubernetes エコシステムの k8s ネイティブ スケジューラや、Volcano や Yunikorn などの周辺リソース スケジューラへと徐々に移行してきました。この記事では、ネイティブ Kubernetes のサポートと実装の面での 2 つのコンピューティング フレームワークの類似点と相違点、およびそれらを本番環境に適用するために必要な作業を簡単に比較します。 1. ネイティブとは何か ここでのネイティブとは、実際にはコンピューティング フレームワークが Kubernetes のリソースに直接適用されることを意味します。たとえば、YARN 上で実行される多くのコンピューティング フレームワークでは、YARN の ResourceManager からリソースを適用するために AppMaster を実装する必要があります。ネイティブ K8s は、k8s からリソースを適用するために AppMaster と同様のロールを実装するコンピューティング フレームワークと同等です。もちろん、AppMaster との間にはまだ違いがあります (AppMaster は YARN 標準に従って実装する必要があります)。 2. k8sでSparkを使用する
課題の提出 k8s クラスターにジョブを送信する方法は、YARN に送信する方法と非常に似ています。コマンドは以下のとおりです。主な違いは次のとおりです。 --masterパラメータはk8sクラスタのApiServerを指定します パラメーター spark.kubernetes.container.image を使用して、k8s でジョブを実行するためのイメージを指定する必要があります。 ドライバー プロセスからアクセスできる必要があるメイン jar を指定します。ドライバーがポッドで実行される場合は、jar パッケージをイメージに含める必要があります。ドライバーがローカルで実行される場合、jar もローカルである必要があります。 --name または spark.app.name を使用してアプリ名を指定します。ジョブの実行後のドライバー名の前にはアプリ名が付きます。もちろん、パラメータ spark.kubernetes.driver.pod.name を通じてドライバーの名前を直接指定することもできます。 - $ ./bin/spark-submit \ --master k8s:
コマンドを送信すると、spark-submit はドライバー ポッドと対応するサービスを作成し、その後ドライバーはエグゼキューター ポッドを作成してジョブを実行します。 デプロイモード YARN で Spark を使用する場合と同様に、k8s もクラスター モードとクライアント モードをサポートしています。 クラスター モード:ドライバーは k8s クラスター上のポッドとして実行されます。 クライアント モード:ドライバーはジョブが送信された場所で実行され、その後、ドライバーは k8s クラスター上にエグゼキューターを作成します。エグゼキュータがドライバーに登録できるようにするには、ジョブを送信するマシンが k8s クラスター内のエグゼキュータ ネットワークに接続されている必要があります (エグゼキュータはドライバーにアクセスでき、登録する必要があります)。 リソースのクリーンアップ ここでのリソースは主に、ジョブのドライバー ポッドとエグゼキューター ポッドを指します。 Spark は、k8s のホスト参照メカニズムを介してジョブのさまざまなリソースを接続するため、ドライバー ポッドが削除されると、関連するエグゼキューター ポッドも削除されます。ただし、ドライバー ポッドがない場合、つまりジョブがクライアント モードで実行される場合、次の 2 つの状況でリソースのクリーンアップが発生します。 ジョブが完了し、ドライバー プロセスが終了し、実行ポッドは実行後に自動的に終了します。 ドライバー プロセスが強制終了されると、エグゼキューター ポッドはドライバーに接続できなくなり、自動的に終了します。詳細については、https://kubernetes.io/docs/concepts/architecture/garbage-collection/ を参照してください。 依存関係の管理 前述したように、メインの jar パッケージはドライバー プロセスからアクセスできる必要があります。クラスター モードの場合は、メイン jar を Spark イメージにパッケージ化する必要があります。しかし、日々の開発やデバッグにおいて、その都度イメージを再構築する労力は非常に大きいです。 Spark は、送信時にローカル ファイルを使用し、転送として S3 を使用することをサポートしています。つまり、最初にアップロードし、ジョブの実行時に S3 からダウンロードします。以下に例を示します。 - ...--パッケージ org.apache.hadoop:hadoop-aws: 3.2 。 0 --conf spark.kubernetes.file.upload.path=s3a:
ポッドテンプレート k8s コントローラー (デプロイメントやジョブなど) がポッドを作成するときは、仕様内のポッド テンプレートに従ってポッドを作成します。以下はジョブの例です。 - apiVersion: batch/v1kind: Jobmetadata: name: hellospec: template: # 以下はポッド テンプレートの仕様です: containers: - name: hello image: busybox command: ['sh', '-c', 'echo "Hello, Kubernetes!" && sleep 3600'] restartPolicy: OnFailure # ポッドテンプレートはここで終了します
spark-submit を通じて Spark ジョブを送信すると、最終的な k8s リソース (ドライバー/エグゼキューター ポッド) が Spark の内部ロジックによって構築されます。しかし、ログ収集を行うためにサイドカー コンテナーを追加するなど、ドライバー/エグゼキューター ポッドで追加の作業を実行したい場合もあります。このシナリオでは、PodTemplate がより適切な選択です。同時に、PodTemplate は Spark を基盤となるインフラストラクチャ (k8s) から切り離します。たとえば、k8s がいくつかの新機能をサポートするために新しいバージョンをリリースした場合、Spark の内部変更を伴わずに PodTemplate を変更するだけで済みます。 RBAC RBAC はロールベースのアクセス制御の略で、k8s の権限制御メカニズムです。簡単に言えば: RBACには、ポッドの作成/削除/監視/一覧表示などの一連の権限設定が含まれます。これらの権限セットのエンティティは、ロールまたはClusterRoleと呼ばれます。 同時に、RBAC にはロール バインディングも含まれており、これはサービス アカウントや UserAccount などの 1 人またはグループのユーザーに Role/ClusterRole を割り当てるために使用されます。 k8s クラスターで Spark ジョブを実行するには、RBAC リソースのセットも必要です。 名前空間の下のサービスアカウントを指定します 権限ルールを定義するロールまたは ClusterRole。共通の ClusterRole「edit」(作成/削除/監視など、ほぼすべてのリソースに対する操作権限を持つ)を使用できます。 次のコマンドは、サービス アカウント spark に、spark 名前空間内の他のリソースを操作する権限を付与します。その後、Spark のドライバー ポッドがサービス アカウントをマウントしている限り、エグゼキューター ポッドを作成できます。 - $ kubectl サービスアカウント spark を作成します $ kubectl クラスターロールバインディング spark-role を作成します--clusterrole = edit --serviceaccount = spark :spark --namespace = spark
簡単なデモンストレーションを以下に示します。 次のコマンドを使用して、SparkPiSleep ジョブを k8s クラスターに送信します。 - SparkクラスターにSparkクラスター1を追加すると、クラスターのメモリとドライバーが適切に割り当てらます。local:///path/to/main/jar
k8s クラスター内のリソースを表示する - $ kubectl get po -n sparkNAME READY STATUS RESTARTS AGEspark-pi-5b88a27b576050dd-exec-1 0/1 ContainerCreating 0 2stest12-9fd3c27b576039ae-driver 1/1 Running 0 8s
最初のものはエグゼキュータ ポッドで、2 番目のものはドライバ ポッドです。さらに、Spark UI など、ドライバー ポッドにアクセスできるサービスが作成されます。 - $ kubectl get svc -n sparkNAME タイプ クラスターIP 外部IP ポート 年齢test12-9fd3c27b576039ae-driver-svc クラスターIP なし 7078/TCP、7079/TCP、4040/TCP 110 秒
サービス所有者リファレンスを見てみましょう。実行者ポッドも同様です。 - $ kubectl get svc test12-9fd3c27b576039ae-driver-svc -n spark -oyamlapiVersion: v1kind: Servicemetadata: creationTimestamp: "2021-08-18T03:48:50Z" name: test12-9fd3c27b576039ae-driver-svc namespace: spark # サービスの ownerReference はドライバー ポッドを指します。ドライバー ポッドが削除されると、サービスも削除されます。所有者参照: - apiVersion: v1 コントローラー: true 種類: ポッド名: test12-9fd3c27b576039ae-driver uid: 56a50a66-68b5-42a0-b2f6-9a9443665d95 リソース バージョン: "9975441" uid: 06c1349f-be52-4133-80d9-07af34419b1f
3. k8sでFlinkを使用する k8s ネイティブでの Flink の実装では、次の 2 つのモードがサポートされています。 アプリケーション モード: リモート k8s クラスターで Flink クラスター (jm および tm) を起動すると、ドライバーは jm で実行されます。つまり、アタッチ モードではなく、デタッチ モードのみがサポートされます。 セッション モード: リモート k8s クラスターで永続的な Flink クラスター (JM のみ) を起動し、それにジョブを送信して、実際の状況に基づいて起動する TM の数を決定します。 通常、本番環境でセッション モードを使用することは推奨されないため、以下では主にアプリケーション モードについて説明します。 Flink のネイティブ k8s モードでは、tm の数を指定する必要はありません。 jm はユーザーのコードに基づいて必要な tm の数を計算します。 課題の提出 以下は、次の内容を含める必要がある単純な送信コマンドです。 パラメータ run-application はアプリケーション モードを指定します。パラメーター --target は、k8s 上での実行を指定します。パラメーター kubernetes.container.image は、ジョブに使用される flink イメージを指定します。最後に、メインの jar を指定する必要があります。パスはイメージ内のパスです。 - $ ./bin/flink run-application \ --target kubernetes-application \ -Dkubernetes.cluster-id = my -first-application-cluster \ -Dkubernetes.container.image = custom -image-name \ local:///opt/flink/usrlib/my-flink-job.jar
リソースのクリーンアップ Flink のネイティブ モードでは、最初に JobManager デプロイメントを作成し、それを k8s にホストします。同じジョブのすべての関連リソースの所有者参照はデプロイメントを指します。つまり、デプロイメントが削除されると、関連するすべてのリソースがクリーンアップされます。以下では、ジョブの実行ステータスに基づいてリソースをクリーンアップする方法について説明します。 ジョブが最終状態 (SUCCESS、FAILED、CANCELED など) まで実行された後、Flink はすべてのジョブをクリーンアップします。 JobManager プロセスの起動に失敗しました (ポッド内の jm コンテナの起動に失敗しました)。コントローラーはデプロイメントであるため、繰り返しプルアップされ実行されます。 JobManager ポッドが削除されると、デプロイメントがプルアップされ、再度実行されます。 JobManager デプロイメントが削除されると、関連付けられているすべての k8s リソースも削除されます。 ポッドテンプレート Flink ネイティブ モードでは、Spark と同様に Pod テンプレートもサポートされます。 RBAC Sparkに似ています。 依存ファイルの管理 Flink は現在、イメージ内のメイン jar と依存ファイルのみをサポートしています。つまり、ユーザーはジョブを送信するために独自の画像をカスタマイズする必要があり、これはあまり良いエクスペリエンスとは言えません。回避策の 1 つは、これを PodTemplate と組み合わせることです。 依存関係がローカル ファイルである場合は、主要なクラウド ベンダーのオブジェクト ストレージなどのリモート ストレージにアップロードして転送する必要があります。 依存関係がリモート ファイルの場合、アップロードは必要ありません。 実行時に、テンプレートで initContainer が使用され、ユーザーの jar および依存関係ファイルが Flink コンテナーにダウンロードされ、実行のためにクラスパスに追加されます。 Flink のジョブデモはここでは説明しません。 4. Kubernetes 上の Spark の実装 Kubernetes での Spark の実装は比較的簡単です。 Sparkクライアントはドライバーを実行するためのk8sポッドを作成します ドライバーはエグゼキューター ポッドを作成し、ジョブの実行を開始します。ジョブが完了すると、ドライバー ポッドは完了状態になり、エグゼキューター ポッドはクリーンアップされます。ジョブが完了した後も、ドライバー ポッドを通じてドライバー ポッドを表示できます。 コードの実装 Spark のネイティブ k8s 実装コードは、resource-managers/kubernetes モジュールにあります。 SparkSubmit コードから分析を開始できます。主に、deploy-mode のクラスター モードのコード ロジックを見ていきます。 - // クラスター マネージャーを設定します val clusterManager: Int = args .master match { case "yarn" = > YARN case m if m.startsWith("spark") = > STANDALONE case m if m.startsWith("mesos") = > MESOS case m if m.startsWith("k8s") = > KUBERNETES case m if m.startsWith("local") = > LOCAL case _ = > error("Master must any be yarn or start with spark, mesos, k8s, or local") -1 }
まず、spark.master 構成のスキームに基づいて、k8s 上にあるかどうかを判断します。また、この構成は --master k8s://https://: の形式であることも上で確認しました。 k8s クラスター モードの場合は、クラス org.apache.spark.deploy.k8s.submit.KubernetesClientApplication をロードし、start メソッドを実行します。 childArgs メソッドのコアロジックは、spark-submit によって送信されたパラメータに基づいてドライバーポッドを構築し、実行のために k8s に送信することです。 - private[spark] class KubernetesClientApplication extends SparkApplication { override def start(args: Array[String], conf: SparkConf): Unit = { val parsedArguments = ClientArguments .fromCommandLineArgs(args) run(parsedArguments, conf) } private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = { // アプリ ID を構築する場合、Spark アプリケーション名は使用できません。アプリ ID は、同じアプリケーションに属するリソースをグループ化するためのラベルとして追加されるためです。ラベル値には // かなり制限があり、たとえば長さは 63 文字を超えてはなりません。したがって、以下の形式で、一意のアプリ ID (spark.app.id によってキャプチャ) を生成します。 val kubernetesAppId = KubernetesConf .getKubernetesAppId() val kubernetesConf = KubernetesConf .createDriverConf( sparkConf, kubernetesAppId, clientArguments.mainAppResource, clientArguments.mainClass, clientArguments.driverArgs, clientArguments.proxyUser) // マスター URL の有効性は SparkSubmit ですでにチェックされています。 // ここでは「k8s://」プレフィックスを削除するだけです。 val master = KubernetesUtils .parseMasterUrl(sparkConf.get("spark.master")) val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf) Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( master, Some(kubernetesConf.namespace), KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX, SparkKubernetesClientFactory.ClientType.Submission, sparkConf, None, None)) { kubernetesClient = > val client = new Client( kubernetesConf, new KubernetesDriverBuilder(), kubernetesClient, watcher) client.run() } }}
上記のコードの中核は、最後にクライアントを作成して実行することです。このクライアントは、k8s クライアントが組み込まれた Spark カプセル化クライアントです。 - private[spark] class Client( conf: KubernetesDriverConf, builder: KubernetesDriverBuilder, kubernetesClient: KubernetesClient, watcher: LoggingPodStatusWatcher) extends Logging { def run(): Unit = { // ドライバーのポッドを構築 valresolvedDriverSpec = builder .buildFromFeatures (conf, kubernetesClient) valconfigMapName = KubernetesClientUtils .configMapNameDriver valconfFilesMap = KubernetesClientUtils .buildSparkConfDirFilesMap(configMapName, conf.sparkConf,resolvedDriverSpec.systemProperties) valconfigMap = KubernetesClientUtils .buildConfigMap(configMapName, confFilesMap) // ポッドのコンテナ仕様を変更: SPARK_CONF_DIR を追加 valresolvedDriverContainer = new ContainerBuilder (resolvedDriverSpec.pod.container) .addNewEnv() .withName(ENV_SPARK_CONF_DIR) .withValue(SPARK_CONF_DIR_INTERNAL) .endEnv() .addNewVolumeMount() .withName(SPARK_CONF_VOLUME_DRIVER) .withMountPath(SPARK_CONF_DIR_INTERNAL) .endVolumeMount() .build() valresolvedDriverPod = new PodBuilder (resolvedDriverSpec.pod.pod) .editSpec() .addToContainers(resolvedDriverContainer) .addNewVolume() .withName(SPARK_CONF_VOLUME_DRIVER) .withNewConfigMap() .withItems(KubernetesClientUtils.buildKeyToPathObjects(confFilesMap).asJava) .withName(configMapName) .endConfigMap() .endVolume() .endSpec() .build() valdriverPodName = resolvedDriverPod .getMetadata.getName var watch: Watch = null var createdDriverPod: Pod = null try { // k8s クライアントを介して Driver Pod を作成しますcreatedDriverPod = kubernetesClient .pods().create(resolvedDriverPod) } catch { case NonFatal(e) = > logError("まず \"kubectl auth can-i create pod\" を確認してください。 yes のはずです。") throw e } try { // 他のリソースの作成、所有者参照の変更など。 val otherKubernetesResources = resolvedDriverSpec .driverKubernetesResources ++ Seq(configMap) addOwnerReference(createdDriverPod, otherKubernetesResources) kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() } catch { case NonFatal(e) = > kubernetesClient.pods().delete(createdDriverPod) throw e } val sId = Seq (conf.namespace, driverPodName).mkString(":") // ポッドを監視 breakable { while (true) { val podWithName = kubernetesClient .pods() .withName(driverPodName) // 監視を開始する前にリソースを古い状態にリセットします。これは競合状態にとって重要です。 watcher.reset() watch = podWithName .watch(watcher) // 最新のポッド状態を送信しますウォッチャーに、何も見逃していないことを確認するよう通知します。watcher.eventReceived(Action.MODIFIED, podWithName.get()) // ポッドが完了したか、待機したくない場合は、while ループを中断します。// パラメーター "spark.kubernetes.submission.waitAppCompletion" に基づいて終了するかどうかを決定します。if(watcher.watchOrStop(sId)) { watch.close() break } } } }
以下は、ドライバーがエグゼキューター プロセスを管理する方法の簡単な紹介です。 Spark ドライバーがメイン関数を実行すると、SparkContext を含む SparkSession が作成されます。 SparkContext は、Executor のライフサイクルを管理するために SchedulerBackend を作成する必要があります。 k8s に対応する SchedulerBackend は、実際には KubernetesClusterSchedulerBackend です。以下では、主にこのバックエンドがどのように作成されるかを見ていきます。大胆な推測ですが、これはおそらく spark.master URL のスキーム「k8s」に基づいて作成されていると思われます。 以下は、SchedulerBackend を作成する SparkContext のコア コード ロジックです。 - private def createTaskScheduler(...) = { case masterUrl = > // KubernetesClusterManager を作成します val cm = getClusterManager (masterUrl) match { case Some(clusterMgr) = > clusterMgr case None = > throw new SparkException("マスター URL を解析できませんでした: '" + master + "'") } try { val scheduler = cm .createTaskScheduler(sc, masterUrl) // 上記で作成した KubernetesClusterManager は KubernetesClusterSchedulerBackend を作成します val backend = cm .createSchedulerBackend(sc, masterUrl, scheduler) cm.initialize(scheduler, backend) (backend, scheduler) } catch { case se: SparkException = > throw se case NonFatal(e) = > throw new SparkException("外部スケジューラをインスタンス化できません", e) }}// メソッド getClsuterManager は、ExternalClusterManager のすべての実装をロードしますServiceLoader ClusterManager (KubernetesClusterManager および YarnClusterManager)、次にマスター URL をフィルターして KubernetesClusterManagerprivate を選択します。def getClusterManager(url: String): Option[ExternalClusterManager] = {val loader = Utils .getContextOrSparkClassLoader val serviceLoaders = ServiceLoader .load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url)) if (serviceLoaders.size > 1) {throw new SparkException( s"URL $url: $serviceLoaders に複数の外部クラスター マネージャーが登録されています") } serviceLoaders.headOption}
以下は、KubernetesClusterSchedulerBackend が Executor を管理するロジックです。 Executor を作成するためのコード ロジックを簡単に見てみましょう。 - private def requestNewExecutors( expected: Int, running: Int, applicationId: String, resourceProfileId: Int, pvcsInUse: Seq[String]): Unit = { val numExecutorsToAllocate = math .min(expected - running, podAllocationSize) logInfo(s"Kubernetes に $numExecutorsToAllocate 個のエグゼキューターをリクエストします for " + s"ResourceProfile Id: $resourceProfileId、target: $expected running: $running.") // このエグゼキューター割り当てバッチの再利用可能な PVC を確認します val reusablePVCs = getReusablePVCs (applicationId, pvcsInUse) for ( _ < - 0 until numExecutorsToAllocate) { val newExecutorId = EXECUTOR_ID_COUNTER .incrementAndGet() val executorConf = KubernetesConf .createExecutorConf( conf, newExecutorId.toString, applicationId, driverPod, resourceProfileId) // Executor の Pod 仕様を構築します valresolvedExecutorSpec = executorBuilder .buildFromFeatures (executorConf, secMgr, kubernetesClient, rpIdToResourceProfile(resourceProfileId)) valexecutorPod = resolvedExecutorSpec .pod valpodWithAttachedContainer = new PodBuilder(executorPod.pod) .editOrNewSpec() .addToContainers(executorPod.container) .endSpec() .build() val resources = replacePVCsIfNeeded ( podWithAttachedContainer,resolvedExecutorSpec.executorKubernetesResources, reusablePVCs) // Executor Pod を作成します val createdExecutorPod = kubernetesClient .pods().create(podWithAttachedContainer) try { // 所有者参照を追加します。 addOwnerReference(createdExecutorPod, resources) resources .filter( _.getKind == "PersistentVolumeClaim") .foreach { resource = > if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) { addOwnerReference(driverPod.get, Seq(resource)) } val pvc = resource .asInstanceOf[PersistentVolumeClaim] logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " + s"StorageClass ${pvc.getSpec.getStorageClassName}") kubernetesClient.persistentVolumeClaims().create(pvc) } newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis()) logDebug(s"Kubernetes から ID $newExecutorId の実行プログラムを要求しました。") } catch { case NonFatal(e) = > kubernetesClient.pods().delete(createdExecutorPod) throw e } } }
5. Kubernetes での Flink の実装 Flink のネイティブ K8s 実装: FlinkクライアントはJobManagerのデプロイメントを作成し、そのデプロイメントをk8sにホストします。 k8s デプロイメント コントローラが JobManager Pod を作成する JobManager 内の ResourceManager は、最初に Kubernetes Scheduler からリソースを要求し、TaskManager などの関連リソースを作成し、関連する TaskManager Pod を作成してジョブの実行を開始する役割を担います。ジョブが最終状態まで実行されると、関連するすべての k8s リソースがクリーンアップされます。コード (ブランチ release-1.13 に基づく) は主に次のように実装されています。 CliFrontend は Flink クライアントのエントリ ポイントです。コマンドライン パラメータ run-application に従って、runApplication メソッドを通じて ApplicationCluster の作成を決定します。 KubernetesClusterDescriptorは、deployApplicationClusterメソッドを通じて、JobManager関連のデプロイメントといくつかの必要なリソースを作成します。 JobManager 実装クラス JobMaster は、ResourceManager を介して KubernetesResourceManagerDriver クラスの requestResource メソッドを呼び出して、TaskManager などのリソースを作成します。 KubernetesClusterDescriptor は、Flink クラスター上の操作を記述するために、インターフェース ClusterDescriptor から実装されます。基盤となるリソースの使用状況に応じて、ClusterDescriptor には KubernetesClusterDescriptor、YarnClusterDescriptor、StandaloneClusterDescriptor などのさまざまな実装があります。 - public interface ClusterDescriptor < T > extends AutoCloseable { /* クラスターに関する詳細 (NodeManagers、使用可能なメモリなど) を含む文字列を返します。 */ 文字列 getClusterDescription(); /* 既存の Flink クラスターをクエリします。 */ ClusterClientProvider < T >取得(T clusterId) は ClusterRetrieveException をスローします。 /** Flink セッション クラスターを作成します */ ClusterClientProvider < T > deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException; /** Flink アプリケーション クラスターを作成します **/ ClusterClientProvider < T > deployApplicationCluster( final ClusterSpecification clusterSpecification, final ApplicationConfiguration applicationConfiguration) throws ClusterDeploymentException; /** ジョブごとのクラスターを作成します **/ ClusterClientProvider < T > deployJobCluster( final ClusterSpecification clusterSpecification, final JobGraph jobGraph, final boolean detached) throws ClusterDeploymentException; /** クラスターを削除します **/ void killCluster(T clusterId) throws FlinkException; @Override void close();}
KubernetesClusterDescriptor のコアロジック「アプリケーション クラスターの作成」を簡単に見てみましょう。 - パブリック クラス KubernetesClusterDescriptor は ClusterDescriptor < String >を実装します{ private final Configuration flinkConfig; // 組み込み k8s クライアント private final FlinkKubeClient client;プライベート最終文字列クラスターID; @Override public ClusterClientProvider < String > deployApplicationCluster( final ClusterSpecification clusterSpecification, final ApplicationConfiguration applicationConfiguration) throws ClusterDeploymentException { // flink クラスターを照会します。k8s 内に存在するかどうかを確認します。if (client.getRestService(clusterId).isPresent()) { throw new ClusterDeploymentException( "Flink クラスター " + clusterId + " は既に存在します。"); } 最終的な KubernetesDeploymentTargetデプロイメントターゲット= KubernetesDeploymentTarget .fromConfig(flinkConfig); if (KubernetesDeploymentTarget.APPLICATION != deployTarget) { throw new ClusterDeploymentException( "Kubernetes アプリケーション クラスターをデプロイできませんでした。" + " 期待されたdeployment.target = " + KubernetesDeploymentTarget.APPLICATION.getName() + "ですが、実際は \"" + deployTarget + "\"" でした); } // アプリケーション パラメータを設定します: $internal.application.program-args および $internal.application.main applicationConfiguration.applyToConfiguration(flinkConfig); // クラスターを作成する final ClusterClientProvider < String > clusterClientProviderをdeployClusterInternal (KubernetesApplicationClusterEntrypoint.class.getName(), clusterSpecification, false);試してください (ClusterClient <文字列> clusterClient = clusterClientProvider .getClusterClient()) { LOG.info( "flink アプリケーション クラスター {} を正常に作成しました。JobManager Web インターフェース: {}", clusterId, clusterClient.getWebInterfaceURL()); } clusterClientProvider を返します。 } // クラスター ロジックを作成します。 private ClusterClientProvider < String > deployClusterInternal( String entryPoint, ClusterSpecification clusterSpecification, boolean detached) throws ClusterDeploymentException { final ClusterEntrypoint.ExecutionMode executionMode = detached ? ClusterEntrypoint.ExecutionMode.DETACHED : ClusterEntrypoint.ExecutionMode.NORMAL; flinkConfig.setString( ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE、 実行モード.toString()); flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS、エントリポイント); // Rpc、blob、rest、taskManagerRpc ポートを公開する必要があるため、固定値に更新します。 // ポートを固定値として指定すると、k8s のリソース構築が容易になります。ポッドが分離されているため、ポートの競合は発生しません KubernetesUtils.checkAndUpdatePortConfigOption( flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT); KubernetesUtils.checkAndUpdatePortConfigOption( flinkConfig、 TaskManagerOptions.RPC_PORT、 Constants.TASK_MANAGER_RPC_PORT); KubernetesUtils.checkAndUpdatePortConfigOption( flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT); // HA 構成 if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) { flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId); KubernetesUtils.checkAndUpdatePortConfigOption( flinkConfig, HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE, flinkConfig.get(JobManagerOptions.PORT)); } 試してください { 最終的な KubernetesJobManagerParameters kubernetesJobManagerParameters = new KubernetesJobManagerParameters(flinkConfig, clusterSpecification); // PodTemplate ロジックを補足 final FlinkPod podTemplate = kubernetesJobManagerParameters .getPodTemplateFilePath() .map( file - > KubernetesUtils.loadPodFromTemplateFile( client, file, Constants.MAIN_CONTAINER_NAME)) .orElse(new FlinkPod.Builder().build());最終的な KubernetesJobManagerSpecificationは、 KubernetesJobManagerFactoryをビルドします。podTemplate、 kubernetesJobManagerParameters などです。 // コアロジック: JobManager を含む k8s で作成します。デプロイメントには、Service や ConfigMap などの k8s リソースが含まれます。client.createJobManagerComponent(kubernetesJobManagerSpec); createClusterClientProvider(clusterId) を返します。 } catch (例外 e) { //... } }}
上記のコードで必要なのは、JobManager をビルドするときに PodTemplate を追加することです。簡単に言えば、PodTemplate は Pod ファイルです。 3 番目のステップでの TaskManager の作成はここでは繰り返されません。 7. エコシステム ここでは「エコシステム」という言葉は適切ではないかもしれません。これは主に、この機能を本番環境で使用する場合に他に何ができるかを指します。以下では、主に、実稼働環境でのトラブルシューティングに使用される 2 つの機能、ログ記録と監視について説明します。 ログ ログ収集はオンライン システムの非常に重要な部分です。障害の80%の原因はログからわかると言っても過言ではありません。ただし、前述したように、Flink ジョブはジョブが最終状態まで実行された後にすべてのリソースをクリーンアップし、Spark ジョブは実行後にドライバー ポッドのログのみを保持します。では、完全なジョブ ログを収集するにはどうすればよいでしょうか? 選択できるオプションはいくつかあります: デーモンセット。ログ収集エージェントは、DaemonSet の形式で各 k8s ノードにデプロイされ、ノード上で実行されているすべてのコンテナのログを収集し、ElasticSearch などの統合ログ検索プラットフォームに保存します。 サイドカー。 Flink/Spark が提供する PodTemplate 関数を使用して、メイン コンテナ側に SideCar コンテナを構成してログを収集し、最終的に統合ログ サービスに保存します。 どちらの方法でも、ELK や大手クラウド ベンダーのログ サービスなど、他のログ サービスがストレージ機能や検索機能を提供する必要があります。 さらに、検討すべきもう 1 つの簡単な方法があります。log4j の拡張メカニズムを使用し、ログ アペンダーをカスタマイズし、アペンダー内のアペンド ロジックをカスタマイズして、ログを収集し、HDFS、オブジェクト ストレージなどのリモート ストレージに直接保存します。このソリューションでは、カスタム ログ アペンダーの jar パッケージを実行中のジョブの ClassPath の下に配置する必要があり、この方法はジョブのメイン プロセスの実行効率に影響を与える可能性があります。パフォーマンスが重視されるジョブには推奨されません。 モニター 現在、Prometheus は k8s エコシステムの事実上の監視標準となっています。以下の説明では、Flink/Spark ジョブのインジケーターを Prometheus に接続する方法についても説明します。 Prometheus のアーキテクチャを見てみましょう。 その核心は、Prometheus Servier がメトリックを収集するか、プッシュするかです。 オンライン サービスなどの常駐プロセスの場合、Prometheus Server は通常、プロセスによって公開される API プル インジケーターを積極的に削除します。 バッチ ジョブなどの終了するプロセス メトリックの場合、プロセスは通常、アクティブにプッシュするために使用されます。詳細なプロセスは、プロセスがメトリックを常駐 PushGateway にプッシュし、次に Prometheus Server PushGateway がメトリックをプルするというものです。 上記の 2 つの使用方法は、Prometheus が公式に推奨する使用方法でもありますが、説明を読むと、2 番目の処理方法が最初のシナリオで使用できることがわかりにくくありません。ただし、2 番目の方法は、PushGateway が永続的であるため、比較的高い安定性が求められます。 フリンク Flinkはまた、PrometheusReporter(APIを介して露出したメトリック、およびPrometheus Serverによって積極的にデータを取得)とPrometheuspushgatewayReporter(Pushgatewayに積極的にプッシュメトリックをプッシュするため、Prometheus Serverはフリンクジョブを消去する必要はありません)も提供します。 PrometheuspushgatewayReporterは、これら2つの方法で少し簡単になりますが、Pushgatewayはボトルネックになる可能性があります。 PrometheusReporterを使用する場合、Prometheus Serverがフリンクジョブを実行するエンドポイントを自動的に発見できるように、サービス発見メカニズムを導入する必要があります。現在プロメテウスによってサポートされている主流のサービス発見メカニズムは、主に次のようです。 領事に基づいています。 Consulは、ETCDに基づく完全なサービス登録および発見ソリューションです。この方法を使用するには、領事に接続するためにフリンクが必要です。たとえば、ジョブを提出するとき、私たちはジョブの対応するサービスをキャプチャし、それを領事に書き込みます。 ファイルベース。ファイルはPROMETHEUS構成ファイルであり、構成はターゲットのエンドポイントをプルする必要があります。ファイルはもともとPrometheusサーバーとFlinkジョブの両方が同時にアクセスできる必要があるため、お金の無駄でしたが、ファイルはローカルである必要があります。ただし、ファイルに基づくK8S環境では、比較的簡単になります。 PrometheusサーバーのポッドにConfigMapをマウントし、FlinkジョブのConfigMapを変更できます。 Kubernetesベースのサービスディスカバリーメカニズム。 Kubernetesのサービス発見メカニズムは、単にラベルセレクトです。参照してください - https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
Prometheusがサポートするサービスの発見メカニズムについては、https://prometheus.io/docs/prometheus/latest/configuration/configuration/を参照してください。 アズール 領事 DigitalOcean ドッカー Dockerswarm dns EC2 ユーレカ ファイル グーセ ヘッツナー http クベネフィット ... スパーク バッチコンピューティングに代表されるSparkは、Pushgatewayを使用してPrometheusに接続しますが、SparkはPushgatewayのサポートを提供せず、Prometheusの輸出業者のみをサポートし、Prometheus Serverにデータを積極的に引き出す必要があります。 Kubernetesベースのサービス発見メカニズムを使用することをお勧めします。 Prometheus Server Pullインジケーターは、固定時間間隔でプルされることに注意する必要があります。比較的短い期間のバッチジョブの場合、ジョブはプルインジケータなしで終了する可能性があります。 8。欠陥 SparkとFlinkは両方ともネイティブK8Sモデルを実装していますが、特定の実装はわずかに異なります。ただし、実際に使用すると、両方の実装がいくつかのシナリオでまだわずかに欠陥があることがわかります。 スパーク ポッドは誤りではありません。 Spark-Submitは最初にK8Sドライバーポッドを構築し、次にドライバーポッドがエグゼキューターポッドを開始します。ただし、ポッドは断層耐性ではなく、ポッドはポッドの位置にあるノードの後にポッドを吊るします。 K8Sスケジューラに精通している学生は、PODにPodNameと呼ばれるフィールドがあることを知っている必要があります。スケジューラのコアは、PODのこのフィールドを埋めることです。つまり、PODに適したノードを選択します。スケジューリングが完了すると、ポッドのこのフィールドが固定されます。これは、ポッドにノードフォールトトレランスがない理由でもあります。 フリンク 展開セマンティクス。展開は、レプリカセットの拡張バージョンと見なすことができ、レプリカセットの公式定義は次のとおりです。 レプリカセットの目的は、いつでも実行されているレプリカポッドの安定したセットを維持することです。そのため、指定された数の同一ポッドの可用性を保証するためによく使用されます。 簡単に言えば、レプリカセットの目的は、いくつかの同一のポッドコピーが継続的に実行できるようにすることです。彼らがオンラインサービスに合わせて調整されていると言うのは誇張ではありません(オンラインサービスは、できればステートレスであり、Webサービスなどの現場の再起動をサポートしています)。しかし、Flinkはストリーミングジョブに焦点を当てていますが、ストリーミングジョブをStateless Webサービスと単純に同等にすることはできません。たとえば、フリンクジョブのメインジャーに問題がある場合、ジョブマネージャーポッドが継続的に開始されますが、展開セマンティクスの問題により、継続的に再起動されます。これはbydesignかもしれませんが、あまり気分が良くありません。 バッチジョブ処理。展開を含むすべてのリソースは、フリンクジョブが実行された後にクリーンアップされるため、最終的なジョブステータスを取得できず、成功するかどうかはわかりません(ストリーミングジョブが停止した場合、失敗と見なすことができます)。この問題では、Flink独自のアーカイブ関数を使用して、結果を外部ファイルシステム(Alibaba Cloud Object Storage OSSなどのS3プロトコルと互換性があります)にアーカイブできます。関係する構成は次のとおりです。 s3.Access-Key s3.Secret-Key s3.region s3.endpoint JobManager.Archive.FS.Dir 外部システムを導入したくない場合は、configmapやsecretなど、ジョブが完了した後、フリンクコードを変更してK8S APIオブジェクトにデータを書き込む必要があります。 ジョブログ。 Sparkジョブが実行された後、エグゼキューターポッドがクリーンアップされ、ドライバーポッドが保持されます。ドライバーのログを表示できます。フリンクジョブが終了した後、ログを表示できません。 9. まとめ この記事では、K8SエコロジーのSparkとFlinkの実装、実践、および比較を、使用方法、ソースコードの実装、および生産システムの周辺地域の補助方法から体系的に紹介します。ただし、スペースの制限により、シャッフルに対処する方法など、多くのコンテンツを議論する時間はありません。あなたの会社もこれを行っている場合、私はそれがまだ多くの参照値があると思います。メッセージを残してコミュニケーションをとることができます。 さらに、ヤーンの時代は過ぎ去り、将来、K8SスケジューラーがビッグデータコンピューティングとAIフレームワークの標準構成になります。ただし、オンラインサービス向けに自然に設計されたK8Sスケジューラーは、スループットで大きな欠点を持ち、ビッグデータ操作にはあまり適していません。 K8Sコミュニティのバッチスケジューラ、Kube-Batch、およびKube-Batchに由来するVolcanoスケジューラ、およびYarnスケジューリングアルゴリズムによって実装されたK8SエコロジースケジューラYunikornは、K8Sシナリオのビッグデータで徐々に出現しました。しかし、これらはすべて後の物語です。時間があれば分析と比較のための記事を書きます。 |