K8s 開発では、コントローラーの概念をよく耳にします。この記事では、これらの概念が K8s の最下層でどのように実装されているかを詳しく紹介します。 コントローラK8s では、コントローラーは controller-runtime (https://github.com/kubernetes-sigs/controller-runtime) フレームワークを通じて実装されます。 Kubebuilder や operator-sdk などのツールは、開発者がプロジェクトのスキャフォールディングをすばやく生成できるように、コントローラー ランタイムにカプセル化されています。 コントローラーは pkg/internal/controller/controller で定義されます。コントローラーには、主に Watch と Start の 2 つのメソッドと、調整メソッド Reconcile が含まれます。コントローラーの定義には、リソース オブジェクトの Informer または Indexer データが存在しないようです。ただし、K8s では、kube-apiserver リソースとのすべてのやり取りは Informer を通じて実装されます。実際、これは次の startWatches 属性を通じてカプセル化されます。 type Controller struct { // Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required. Name string // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1. MaxConcurrentReconciles int // Reconciler is a function that can be called at any time with the Name / Namespace of an object and // ensures that the state of the system matches the state specified in the object. // Defaults to the DefaultReconcileFunc. Do reconcile.Reconciler // MakeQueue constructs the queue for this controller once the controller is ready to start. // This exists because the standard Kubernetes workqueues start themselves immediately, which // leads to goroutine leaks if something calls controller.New repeatedly. MakeQueue func() workqueue.RateLimitingInterface // Queue is an listeningQueue that listens for events from Informers and adds object keys to // the Queue for processing Queue workqueue.RateLimitingInterface // SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates // Deprecated: the caller should handle injected fields itself. SetFields func(i interface{}) error // mu is used to synchronize Controller setup mu sync.Mutex // Started is true if the Controller has been Started Started bool // ctx is the context that was passed to Start() and used when starting watches. // // According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context, // while we usually always strive to follow best practices, we consider this a legacy case and it should // undergo a major refactoring and redesign to allow for context to not be stored in a struct. ctx context.Context // CacheSyncTimeout refers to the time limit set on waiting for cache to sync // Defaults to 2 minutes if not set. CacheSyncTimeout time.Duration // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. startWatches []watchDescription // LogConstructor is used to construct a logger to then log messages to users during reconciliation, // or for example when a watch is started. // Note: LogConstructor has to be able to handle nil requests as we are also using it // outside the context of a reconciliation. LogConstructor func(request *reconcile.Request) logr.Logger // RecoverPanic indicates whether the panic caused by reconcile should be recovered. RecoverPanic *bool } 時計() Watch メソッドは、まず現在のコントローラーが起動されているかどうかを判断します。そうでない場合は、ウォッチの内容を startWatches に一時的に保存し、コントローラーが起動するまで待機します。開始されている場合は、src.Start(c.ctx, evthdler, c.Queue, prct...) が直接呼び出されます。ここで、Source は、informer、kind、channel などになります。 // Watch implements controller.Controller. func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error { ... // Controller hasn't started yet, store the watches locally and return. // // These watches are going to be held on the controller struct until the manager or user calls Start(...). if !c.Started { c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct}) return nil } c.LogConstructor(nil).Info("Starting EventSource", "source", src) return src.Start(c.ctx, evthdler, c.Queue, prct...) } インフォーマーを例にとると、対応する EventHandler は次のメソッドを通じて追加されます。 _, err := is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) kind を例にとると、対応する EventHandler は次の方法で追加されます。 i, lastErr = ks.cache.GetInformer(ctx, ks.Type) _, err := i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) internal.EventHandler は、OnAdd、OnUpdate、および OnDelete メソッドを実装します。つまり、src.Start メソッドの機能は、対応するインフォーマーを取得し、対応する EventHandler を登録することです。 始める() Start メソッドには 2 つの主な機能があります。 1 つは、すべての startWatch で Source の start メソッドを呼び出して、EventHandler を登録することです。 for _, watch := range c.startWatches { c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src)) if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { return err } } 2 番目は、リソース オブジェクトを処理するために Work を開始することです。 for i := 0; i < c.MaxConcurrentReconciles; i++ { go func() { defer wg.Done() // Run a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the reconcileHandler is never invoked concurrently with the same object. for c.processNextWorkItem(ctx) { } }() } processNextWorkItem はキューからリソース オブジェクトを取得します。 reconcileHandler 関数は、実際に要素のビジネス処理を実行する場所です。この機能には、イベント処理とエラー処理が含まれます。実際のイベント処理は c.Do.Reconcile(req) を通じて開発者に公開されるため、開発者は Reconcile 関数でビジネス ロジックを処理するだけで済みます。 func (c *Controller) processNextWorkItem(ctx context.Context) bool { obj, shutdown := c.Queue.Get() if shutdown { // Stop working return false } // We call Done here so the workqueue knows we have finished // processing this item. We also must remember to call Forget if we // do not want this work item being re-queued. For example, we do // not call Forget if a transient error occurs, instead the item is // put back on the workqueue and attempted again after a back-off // period. defer c.Queue.Done(obj) ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1) defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1) c.reconcileHandler(ctx, obj) return true } // Reconcile implements reconcile.Reconciler. func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) { defer func() { if r := recover(); r != nil { if c.RecoverPanic != nil && *c.RecoverPanic { for _, fn := range utilruntime.PanicHandlers { fn(r) } err = fmt.Errorf("panic: %v [recovered]", r) return } log := logf.FromContext(ctx) log.Info(fmt.Sprintf("Observed a panic in reconciler: %v", r)) panic(r) } }() return c.Do.Reconcile(ctx, req) } 和解するコントローラーの調整ロジックは Reconcile で実行されます。 type Reconciler interface { // Reconcile performs a full reconciliation for the object referred to by the Request. // The Controller will requeue the Request to be processed again if an error is non-nil or // Result.Requeue is true, otherwise upon completion it will remove the work from the queue. Reconcile(context.Context, Request) (Result, error) } type Request struct { // NamespacedName is the name and namespace of the object to reconcile. types.NamespacedName } Reconcile メソッドの入力パラメータ Request は controller.queue から取得され、キュー内のデータ型が Reconcile.Request であるかどうかを判断します。データ型が不一致の場合、調整ロジックは実行されません。 func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { // Make sure that the object is a valid request. req, ok := obj.(reconcile.Request) if !ok { // As the item in the workqueue is actually invalid, we call // Forget here else we'd go into a loop of attempting to // process a work item that is invalid. c.Queue.Forget(obj) c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj) // Return true, don't take a break return } } では、データはどのようにキューに入るのでしょうか?実際には、Informer の EventHandler を介してキューに入ります。 src.Start(c.ctx, evthdler, c.Queue, prct...) メソッドに戻ると、このメソッドはインフォーマーの internal.EventHandler を登録します。 internal.EventHandler は、OnAdd、OnUpdate、OnDelete などのメソッドを実装します。 OnAdd メソッドを例にとると、このメソッドは最終的に EventHandler.Create メソッドを呼び出します。 type EventHandler struct { EventHandler handler.EventHandler Queue workqueue.RateLimitingInterface Predicates []predicate.Predicate } // OnAdd creates CreateEvent and calls Create on EventHandler. func (e EventHandler) OnAdd(obj interface{}) { c := event.CreateEvent{} // Pull Object out of the object if o, ok := obj.(client.Object); ok { c.Object = o } else { log.Error(nil, "OnAdd missing Object", "object", obj, "type", fmt.Sprintf("%T", obj)) return } for _, p := range e.Predicates { if !p.Create(c) { return } } // Invoke create handler e.EventHandler.Create(c, e.Queue) } EventHandler は、EnqueueRequestForObject、Funcs、EnqueueRequestForOwner、enqueueRequestsFromMapFunc の 4 つの実装クラスを持つインターフェースです。 EnqueueRequestForObject を例にとると、その作成メソッドは次のようになります。 // Create implements EventHandler. func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) return } q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), }}) } したがって、Reconcile によって調整されたデータ オブジェクトは、実際には Informer の EventHandler を通じてキューに登録されます。 kubebuilderなどのスキャフォールディングフレームワークの分析kubebuilder や operator-sdk などのフレームワークを使用すると、対応するリソース オブジェクトのコントローラー コードをすばやく生成できます。次に、kubebuilder を例に、コントローラー コードのロジックを分析します。 完全なコントローラー起動ロジックには、次の手順が含まれます。 1) main.go スタートアップ関数では、controllerManager オブジェクトが定義されています。 mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, MetricsBindAddress: metricsAddr, Port: 9443, HealthProbeBindAddress: probeAddr, LeaderElection: enableLeaderElection, LeaderElectionID: "9a82ee0d.my.domain", CertDir: "dir", ... }) 2) SetUpWithManager() メソッドを使用して、各リソース オブジェクトのコントローラーを controllerManager オブジェクトに登録します。 if err = (&controllers.AppServiceReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "AppService") os.Exit(1) } 3) controllerManager を起動します。つまり、対応するリソース オブジェクトのコントローラーを起動します。 if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } 主なコード ロジックは、SetUpWithManager() と mgr.Start() の 2 つのメソッドにあります。 // SetupWithManager sets up the controller with the Manager. func (r *AppServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&appexamplecomv1.AppService{}). Complete(r) } ビルダーctrl.NewControllerManagedBy(mgr) はビルダー オブジェクトを返します。 NewControllerManagedBy = builder.ControllerManagedBy func ControllerManagedBy(m manager.Manager) *Builder { return &Builder{mgr: m} } Builder はコントローラーのコンストラクターであり、その構造は次のように定義されます。 type Builder struct { forInput ForInput ownsInput []OwnsInput watchesInput []WatchesInput mgr manager.Manager globalPredicates []predicate.Predicate ctrl controller.Controller ctrlOptions controller.Options name string } ctrlOptions は、主に Reconciler などのコントローラーを構築するためのいくつかの構成を指定します。 forInput は、build.For() を介して設定される、調整されるオブジェクト自体を指定します。 ownsInput は、調整および監視する子オブジェクト リソースを指定し、build.Owns() を通じて設定されます。 watchesInput は EventHandler 処理ロジックをカスタマイズし、build.Watches() を通じて設定できます。したがって、kubebuilder によって生成されたコントローラーは、デフォルトでは調整対象オブジェクト自体のみを調整します。 type WatchesInput struct { src source.Source eventhandler handler.EventHandler predicates []predicate.Predicate objectProjection objectProjection } builder.Complete() は構築のために Builder.Build() を呼び出します。 Build() には、doController() と doWatch() という 2 つの重要なメソッドが含まれています。 コントローラの実行() doController は、リソース オブジェクトの GVK を通じてコントローラーの名前を取得し、最後に newController 関数を通じてコントローラーをインスタンス化します。 controllerName, err := blder.getControllerName(gvk, hasGVK) blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions) newController は、controller.New のエイリアスです。メソッドは次のとおりです: func New(name string, mgr manager.Manager, options Options) (Controller, error) { c, err := NewUnmanaged(name, mgr, options) if err != nil { return nil, err } // Add the controller as a Manager components return c, mgr.Add(c) } c, err := NewUnmanaged(name, mgr, options) は、コントローラー インスタンスを初期化します。コントローラがインスタンス化された後、コントローラは mgr.Add(c) 関数を通じて管理のためにマネージャに追加されます。 controllerManager の Add 関数は Runnable パラメータを渡します。 Runnable は、起動可能なコンポーネントを表すために使用されるインターフェースです。コントローラーは実際にこのインターフェースの Start 関数を実装しているので、コントローラー インスタンスは Add 関数を通じて追加できます。 ウォッチを実行する() DoWatch の実装は比較的単純で、controller.watch を呼び出して EventHandler イベントを登録するだけです。 DoWatch メソッドは、controller.Watch() メソッドを呼び出して EventHandler を登録します。 forInput などのリソースの場合、デフォルトの EventHandler は EnqueueRequestForObject であり、ownsInput などのリソースの場合、デフォルトの EventHandler は EnqueueRequestForOwner であることがわかります。これら 2 種類のハンドラーは上で説明されており、どちらも調整されたリソース オブジェクトをキューに入れることができる Create()、Update()、Delete() などのメソッドを実装しています。 func (blder *Builder) doWatch() error { // Reconcile type typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection) if err != nil { return err } src := &source.Kind{Type: typeForSrc} hdler := &handler.EnqueueRequestForObject{} allPredicates := append(blder.globalPredicates, blder.forInput.predicates...) if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { return err } // Watches the managed types for _, own := range blder.ownsInput { typeForSrc, err := blder.project(own.object, own.objectProjection) if err != nil { return err } src := &source.Kind{Type: typeForSrc} hdler := &handler.EnqueueRequestForOwner{ OwnerType: blder.forInput.object, IsController: true, } allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, own.predicates...) if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { return err } } // Do the watch requests for _, w := range blder.watchesInput { allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, w.predicates...) // If the source of this watch is of type *source.Kind, project it. if srckind, ok := w.src.(*source.Kind); ok { typeForSrc, err := blder.project(srckind.Type, w.objectProjection) if err != nil { return err } srckind.Type = typeForSrc } if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil { return err } } return nil } watchesInput などのリソースは、対応する機能を実装するために次のようなメソッドを使用して、EventHandler を独自に実装する必要があります。前述の結論によれば、コントローラーで調整されるリソース オブジェクトはキューから取得され、キュー内のデータは EventHandler の Create、Update、Delete などの処理ロジックを通じてキューに入れられます。したがって、この時点でのコントローラーの処理順序は、EventHandler で定義されたロジック -> キュー -> Reconcile となります。 func (r *AppServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { klog.Infof("开始Reconcile逻辑") ... return ctrl.Result{}, nil } // SetupWithManager sets up the controller with the Manager. func (r *AppServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). Named("appServiceController"). Watches( &source.Kind{ Type: &appexamplecomv1.AppService{}, }, handler.Funcs{ CreateFunc: func(createEvent event.CreateEvent, limitingInterface workqueue.RateLimitingInterface) { klog.Infof("createFunc") limitingInterface.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: createEvent.Object.GetName(), Namespace: createEvent.Object.GetNamespace(), }}) }, UpdateFunc: func(updateEvent event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) { klog.Infof("updateFunc") }, DeleteFunc: func(deleteEvent event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) { klog.Infof("deleteFunc") }, }). Complete(r) } 上記のコードは作成時にエンキュー処理のみを実行するため、Reconcile ロジックはリソースの作成時にのみ入力されます。 マネージャー.開始()コントローラーをマネージャーに登録した後、mgr.Start(ctrl.SetupSignalHandler()) を使用してマネージャーを起動する必要があります。前述したように、コントローラーを登録するときに、DoController メソッドで mgr.Add() を呼び出すと、コントローラーが実行可能な形式でマネージャーに追加されます。 Manager.start() は、cm.runnables の start メソッド、つまり controller.start() を呼び出してコントローラーを起動します。 func (cm *controllerManager) Start(ctx context.Context) (err error) { ... if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } // Start and wait for caches. if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } // Start the non-leaderelection Runnables after the cache has synced. if err := cm.runnables.Others.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } ... } |