1 つの記事で K8s コントローラー ランタイムを理解する

1 つの記事で K8s コントローラー ランタイムを理解する

K8s 開発では、コントローラーの概念をよく耳にします。この記事では、これらの概念が K8s の最下層でどのように実装されているかを詳しく紹介します。

コントローラ

K8s では、コントローラーは controller-runtime (https://github.com/kubernetes-sigs/controller-runtime) フレームワークを通じて実装されます。 Kubebuilder や operator-sdk などのツールは、開発者がプロ​​ジェクトのスキャフォールディングをすばやく生成できるように、コントローラー ランタイムにカプセル化されています。

コントローラーは pkg/internal/controller/controller で定義されます。コントローラーには、主に Watch と Start の 2 つのメソッドと、調整メソッド Reconcile が含まれます。コントローラーの定義には、リソース オブジェクトの Informer または Indexer データが存在しないようです。ただし、K8s では、kube-apiserver リソースとのすべてのやり取りは Informer を通じて実装されます。実際、これは次の startWatches 属性を通じてカプセル化されます。

 type Controller struct { // Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required. Name string // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1. MaxConcurrentReconciles int // Reconciler is a function that can be called at any time with the Name / Namespace of an object and // ensures that the state of the system matches the state specified in the object. // Defaults to the DefaultReconcileFunc. Do reconcile.Reconciler // MakeQueue constructs the queue for this controller once the controller is ready to start. // This exists because the standard Kubernetes workqueues start themselves immediately, which // leads to goroutine leaks if something calls controller.New repeatedly. MakeQueue func() workqueue.RateLimitingInterface // Queue is an listeningQueue that listens for events from Informers and adds object keys to // the Queue for processing Queue workqueue.RateLimitingInterface // SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates // Deprecated: the caller should handle injected fields itself. SetFields func(i interface{}) error // mu is used to synchronize Controller setup mu sync.Mutex // Started is true if the Controller has been Started Started bool // ctx is the context that was passed to Start() and used when starting watches. // // According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context, // while we usually always strive to follow best practices, we consider this a legacy case and it should // undergo a major refactoring and redesign to allow for context to not be stored in a struct. ctx context.Context // CacheSyncTimeout refers to the time limit set on waiting for cache to sync // Defaults to 2 minutes if not set. CacheSyncTimeout time.Duration // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. startWatches []watchDescription // LogConstructor is used to construct a logger to then log messages to users during reconciliation, // or for example when a watch is started. // Note: LogConstructor has to be able to handle nil requests as we are also using it // outside the context of a reconciliation. LogConstructor func(request *reconcile.Request) logr.Logger // RecoverPanic indicates whether the panic caused by reconcile should be recovered. RecoverPanic *bool }

時計()

Watch メソッドは、まず現在のコントローラーが起動されているかどうかを判断します。そうでない場合は、ウォッチの内容を startWatches に一時的に保存し、コントローラーが起動するまで待機します。開始されている場合は、src.Start(c.ctx, evthdler, c.Queue, prct...) が直接呼び出されます。ここで、Source は、informer、kind、channel などになります。

 // Watch implements controller.Controller. func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error { ... // Controller hasn't started yet, store the watches locally and return. // // These watches are going to be held on the controller struct until the manager or user calls Start(...). if !c.Started { c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct}) return nil } c.LogConstructor(nil).Info("Starting EventSource", "source", src) return src.Start(c.ctx, evthdler, c.Queue, prct...) }

インフォーマーを例にとると、対応する EventHandler は次のメソッドを通じて追加されます

 _, err := is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})

kind を例にとると、対応する EventHandler は次の方法で追加されます。

 i, lastErr = ks.cache.GetInformer(ctx, ks.Type) _, err := i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})

internal.EventHandler は、OnAdd、OnUpdate、および OnDelete メソッドを実装します。つまり、src.Start メソッドの機能は、対応するインフォーマーを取得し、対応する EventHandler を登録することです。

始める()

Start メソッドには 2 つの主な機能があります。 1 つは、すべての startWatch で Source の start メソッドを呼び出して、EventHandler を登録することです。

 for _, watch := range c.startWatches { c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src)) if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { return err } }

2 番目は、リソース オブジェクトを処理するために Work を開始することです。

 for i := 0; i < c.MaxConcurrentReconciles; i++ { go func() { defer wg.Done() // Run a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the reconcileHandler is never invoked concurrently with the same object. for c.processNextWorkItem(ctx) { } }() }

processNextWorkItem はキューからリソース オブジェクトを取得します。 reconcileHandler 関数は、実際に要素のビジネス処理を実行する場所です。この機能には、イベント処理とエラー処理が含まれます。実際のイベント処理は c.Do.Reconcile(req) を通じて開発者に公開されるため、開発者は Reconcile 関数でビジネス ロジックを処理するだけで済みます。

 func (c *Controller) processNextWorkItem(ctx context.Context) bool { obj, shutdown := c.Queue.Get() if shutdown { // Stop working return false } // We call Done here so the workqueue knows we have finished // processing this item. We also must remember to call Forget if we // do not want this work item being re-queued. For example, we do // not call Forget if a transient error occurs, instead the item is // put back on the workqueue and attempted again after a back-off // period. defer c.Queue.Done(obj) ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1) defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1) c.reconcileHandler(ctx, obj) return true } // Reconcile implements reconcile.Reconciler. func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) { defer func() { if r := recover(); r != nil { if c.RecoverPanic != nil && *c.RecoverPanic { for _, fn := range utilruntime.PanicHandlers { fn(r) } err = fmt.Errorf("panic: %v [recovered]", r) return } log := logf.FromContext(ctx) log.Info(fmt.Sprintf("Observed a panic in reconciler: %v", r)) panic(r) } }() return c.Do.Reconcile(ctx, req) }

和解する

コントローラーの調整ロジックは Reconcile で実行されます。

 type Reconciler interface { // Reconcile performs a full reconciliation for the object referred to by the Request. // The Controller will requeue the Request to be processed again if an error is non-nil or // Result.Requeue is true, otherwise upon completion it will remove the work from the queue. Reconcile(context.Context, Request) (Result, error) } type Request struct { // NamespacedName is the name and namespace of the object to reconcile. types.NamespacedName }

Reconcile メソッドの入力パラメータ Request は controller.queue から取得され、キュー内のデータ型が Reconcile.Request であるかどうかを判断します。データ型が不一致の場合、調整ロジックは実行されません。

 func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { // Make sure that the object is a valid request. req, ok := obj.(reconcile.Request) if !ok { // As the item in the workqueue is actually invalid, we call // Forget here else we'd go into a loop of attempting to // process a work item that is invalid. c.Queue.Forget(obj) c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj) // Return true, don't take a break return } }

では、データはどのようにキューに入るのでしょうか?実際には、Informer の EventHandler を介してキューに入ります。 src.Start(c.ctx, evthdler, c.Queue, prct...) メソッドに戻ると、このメソッドはインフォーマーの internal.EventHandler を登録します。 internal.EventHandler は、OnAdd、OnUpdate、OnDelete などのメソッドを実装します。 OnAdd メソッドを例にとると、このメソッドは最終的に EventHandler.Create メソッドを呼び出します。

 type EventHandler struct { EventHandler handler.EventHandler Queue workqueue.RateLimitingInterface Predicates []predicate.Predicate } // OnAdd creates CreateEvent and calls Create on EventHandler. func (e EventHandler) OnAdd(obj interface{}) { c := event.CreateEvent{} // Pull Object out of the object if o, ok := obj.(client.Object); ok { c.Object = o } else { log.Error(nil, "OnAdd missing Object", "object", obj, "type", fmt.Sprintf("%T", obj)) return } for _, p := range e.Predicates { if !p.Create(c) { return } } // Invoke create handler e.EventHandler.Create(c, e.Queue) }

EventHandler は、EnqueueRequestForObject、Funcs、EnqueueRequestForOwner、enqueueRequestsFromMapFunc の 4 つの実装クラスを持つインターフェースです。 EnqueueRequestForObject を例にとると、その作成メソッドは次のようになります。

 // Create implements EventHandler. func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) return } q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), }}) }

したがって、Reconcile によって調整されたデータ オブジェクトは、実際には Informer の EventHandler を通じてキューに登録されます。

kubebuilderなどのスキャフォールディングフレームワークの分析

kubebuilder や operator-sdk などのフレームワークを使用すると、対応するリソース オブジェクトのコントローラー コードをすばやく生成できます。次に、kubebuilder を例に、コントローラー コードのロジックを分析します。

完全なコントローラー起動ロジックには、次の手順が含まれます。

1) main.go スタートアップ関数では、controllerManager オブジェクトが定義されています。

 mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, MetricsBindAddress: metricsAddr, Port: 9443, HealthProbeBindAddress: probeAddr, LeaderElection: enableLeaderElection, LeaderElectionID: "9a82ee0d.my.domain", CertDir: "dir", ... })

2) SetUpWithManager() メソッドを使用して、各リソース オブジェクトのコントローラーを controllerManager オブジェクトに登録します。

 if err = (&controllers.AppServiceReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "AppService") os.Exit(1) }

3) controllerManager を起動します。つまり、対応するリソース オブジェクトのコントローラーを起動します。

 if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) }

主なコード ロジックは、SetUpWithManager() と mgr.Start() の 2 つのメソッドにあります。

 // SetupWithManager sets up the controller with the Manager. func (r *AppServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&appexamplecomv1.AppService{}). Complete(r) }

ビルダー

ctrl.NewControllerManagedBy(mgr) はビルダー オブジェクトを返します。

 NewControllerManagedBy = builder.ControllerManagedBy func ControllerManagedBy(m manager.Manager) *Builder { return &Builder{mgr: m} }

Builder はコントローラーのコンストラクターであり、その構造は次のように定義されます。

 type Builder struct { forInput ForInput ownsInput []OwnsInput watchesInput []WatchesInput mgr manager.Manager globalPredicates []predicate.Predicate ctrl controller.Controller ctrlOptions controller.Options name string }

ctrlOptions は、主に Reconciler などのコントローラーを構築するためのいくつかの構成を指定します。 forInput は、build.For() を介して設定される、調整されるオブジェクト自体を指定します。 ownsInput は、調整および監視する子オブジェクト リソースを指定し、build.Owns() を通じて設定されます。 watchesInput は EventHandler 処理ロジックをカスタマイズし、build.Watches() を通じて設定できます。したがって、kubebuilder によって生成されたコントローラーは、デフォルトでは調整対象オブジェクト自体のみを調整します。

 type WatchesInput struct { src source.Source eventhandler handler.EventHandler predicates []predicate.Predicate objectProjection objectProjection }

builder.Complete() は構築のために Builder.Build() を呼び出します。 Build() には、doController() と doWatch() という 2 つの重要なメソッドが含まれています。

コントローラの実行()

doController は、リソース オブジェクトの GVK を通じてコン​​トローラーの名前を取得し、最後に newController 関数を通じてコン​​トローラーをインスタンス化します。

 controllerName, err := blder.getControllerName(gvk, hasGVK) blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)

newController は、controller.New のエイリアスです。メソッドは次のとおりです: func New(name string, mgr manager.Manager, options Options) (Controller, error) { c, err := NewUnmanaged(name, mgr, options) if err != nil { return nil, err }

 // Add the controller as a Manager components return c, mgr.Add(c) }

c, err := NewUnmanaged(name, mgr, options) は、コントローラー インスタンスを初期化します。コントローラがインスタンス化された後、コントローラは mgr.Add(c) 関数を通じて管理のためにマネージャに追加されます。 controllerManager の Add 関数は Runnable パラメータを渡します。 Runnable は、起動可能なコンポーネントを表すために使用されるインターフェースです。コントローラーは実際にこのインターフェースの Start 関数を実装しているので、コントローラー インスタンスは Add 関数を通じて追加できます。

ウォッチを実行する()

DoWatch の実装は比較的単純で、controller.watch を呼び出して EventHandler イベントを登録するだけです。 DoWatch メソッドは、controller.Watch() メソッドを呼び出して EventHandler を登録します。 forInput などのリソースの場合、デフォルトの EventHandler は EnqueueRequestForObject であり、ownsInput などのリソースの場合、デフォルトの EventHandler は EnqueueRequestForOwner であることがわかります。これら 2 種類のハンドラーは上で説明されており、どちらも調整されたリソース オブジェクトをキューに入れることができる Create()、Update()、Delete() などのメソッドを実装しています。

 func (blder *Builder) doWatch() error { // Reconcile type typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection) if err != nil { return err } src := &source.Kind{Type: typeForSrc} hdler := &handler.EnqueueRequestForObject{} allPredicates := append(blder.globalPredicates, blder.forInput.predicates...) if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { return err } // Watches the managed types for _, own := range blder.ownsInput { typeForSrc, err := blder.project(own.object, own.objectProjection) if err != nil { return err } src := &source.Kind{Type: typeForSrc} hdler := &handler.EnqueueRequestForOwner{ OwnerType: blder.forInput.object, IsController: true, } allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, own.predicates...) if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { return err } } // Do the watch requests for _, w := range blder.watchesInput { allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, w.predicates...) // If the source of this watch is of type *source.Kind, project it. if srckind, ok := w.src.(*source.Kind); ok { typeForSrc, err := blder.project(srckind.Type, w.objectProjection) if err != nil { return err } srckind.Type = typeForSrc } if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil { return err } } return nil }

watchesInput などのリソースは、対応する機能を実装するために次のようなメソッドを使用して、EventHandler を独自に実装する必要があります。前述の結論によれば、コントローラーで調整されるリソース オブジェクトはキューから取得され、キュー内のデータは EventHandler の Create、Update、Delete などの処理ロジックを通じてキューに入れられます。したがって、この時点でのコントローラーの処理順序は、EventHandler で定義されたロジック -> キュー -> Reconcile となります。

 func (r *AppServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { klog.Infof("开始Reconcile逻辑") ... return ctrl.Result{}, nil } // SetupWithManager sets up the controller with the Manager. func (r *AppServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). Named("appServiceController"). Watches( &source.Kind{ Type: &appexamplecomv1.AppService{}, }, handler.Funcs{ CreateFunc: func(createEvent event.CreateEvent, limitingInterface workqueue.RateLimitingInterface) { klog.Infof("createFunc") limitingInterface.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: createEvent.Object.GetName(), Namespace: createEvent.Object.GetNamespace(), }}) }, UpdateFunc: func(updateEvent event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) { klog.Infof("updateFunc") }, DeleteFunc: func(deleteEvent event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) { klog.Infof("deleteFunc") }, }). Complete(r) }

上記のコードは作成時にエンキュー処理のみを実行するため、Reconcile ロジックはリソースの作成時にのみ入力されます。

マネージャー.開始()

コントローラーをマネージャーに登録した後、mgr.Start(ctrl.SetupSignalHandler()) を使用してマネージャーを起動する必要があります。前述したように、コントローラーを登録するときに、DoController メソッドで mgr.Add() を呼び出すと、コントローラーが実行可能な形式でマネージャーに追加されます。 Manager.start() は、cm.runnables の start メソッド、つまり controller.start() を呼び出してコントローラーを起動します。

 func (cm *controllerManager) Start(ctx context.Context) (err error) { ... if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } // Start and wait for caches. if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } // Start the non-leaderelection Runnables after the cache has synced. if err := cm.runnables.Others.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } ... }

<<:  レンタルではなく販売のみです!ソフトウェアは完全にオープンです!世界初の「クラウドコンピュータ」が伝統を覆す

>>:  Capistrano が Docker と Kubernetes に置き換えられた理由

推薦する

選択: 企業ウェブサイトに大きな変更を加える方法

ウェブサイトのバージョンを更新することは一般的です。ウェブサイトを公開時から「変更なし」のままにして...

新浪ブログの外部リンクが表示されない理由を明かす

ほとんどのSEO担当者は、外部リンクをサードパーティのブログに張る際にSina Blogを第一候補に...

クラウドガバナンスのベストプラクティスについてお話ししましょう

クラウド ガバナンスは、クラウドでサービスを実行する企業が採用する一連のルールとポリシーです。組織は...

SEO担当者のよくある仕事上の誤解の分析

多くの場合、ウェブサイトのランキングが向上しない理由は、SEO最適化担当者自身が十分に努力していない...

ルー・ソンソン:価格比較サイトでの初めての買い物体験

お気に入りで「価格を比較」をクリックすると、さまざまなショッピングモールの550Dの価格が出てきまし...

SEO ウェブサイト最適化 リソース構築 ブログ運用 スキル共有

検索エンジンのアルゴリズムの調整に伴い、主要な検索エンジンによるブログの認知度は、大多数のSEO担当...

電子商取引ウェブサイトに SEO テクニックと戦略を活用する方法

現在、SEO テクノロジーはインターネットのプロモーションとマーケティングの主要なツールの 1 つで...

Weiboマーケティング:ブランドWeibo運用の5つの現状

現状調査によると、ブランドWeiboの現在の運営状況は、「ブラインド型」、「伝統型」、「コンテンツ型...

マーケティングの新時代: Down-to-earth はアメリカのバーチャルホストにとって新しいマーケティング手法

「国際ファン」ファン・ビンビンは新作映画のプロモーションのためハッピーキャンプに立ち寄ったが、それで...

CKA 試験の効率性の向上: 準備完了状態のノードを正確にカウントするための実用的な戦略

Kubernetes クラスターは、マスター ノードと複数のノード ノードで構成されます。ノードはク...

高品質な外部リンクを構築する方法は何ですか?

「お金の流れを良くしたいなら、道路が必要だ」ということわざがある。多くの場所で、経済を活性化させるた...

Lefeng.comはJumei.comが偽物を販売していると示唆。2つの大手美容Eコマース企業がトップの座を争っている。

国内の美容電子商取引市場のトップの座をめぐって、1か月間も続いているJumeiとLefeng.com...

ウェブサイトを構築するときに、新しいドメイン名と古いドメイン名の重みを正しく確認する方法

多くのウェブマスターの友人、特に初心者のウェブマスターは、さまざまな情報の影響を受けており、「古いド...

推奨プロメテウス:新しく追加 - ルーマニアのデータセンター/著作権所有者の楽園

prometeus.net 傘下のクラウド ホスティング ブランドである iwstack.com は...