K8s Informer はどのようにしてイベントが失われないようにするのでしょうか?

K8s Informer はどのようにしてイベントが失われないようにするのでしょうか?

1. リソースコントローラの主な機能

k8s の重要な概念の 1 つは宣言型 API であることはご存知でしょう。たとえば、kubectl apply は宣言型 API の実装です。

その結果、リソース オブジェクトの実行状態は宣言と一致している必要があります。たとえば、kubectl がデプロイメント yml を適用する場合、必要なステータスはデプロイメントが正常に実行されることです。

そこで疑問になるのが、k8s はどのようにしてリソース オブジェクトを「監視」し、宣言した状態が常に維持されるようにするのかということです。答えは「コントローラー」です。 kube-controller-manager コンポーネントに加えて、カスタム コントローラーとも呼ばれる独自のコントローラーを作成することもできます (便宜上、以下では総称してカスタム コントローラーと呼びます)。

次に、コントローラーの背後にある「秘密」を分析してみましょう

2. プロセスの概要

まず、コミュニティによって提供されたコントローラー アーキテクチャ図を見てみましょう。

主なオブジェクト (構造) には、Reflector、Informer、Indexer などがあります。 ReflectorとIndexerについては、次回以降で順に説明していきます。

この記事では主にInformerについて説明します。

図からわかるように、主に9つのステップがあります。ここでは、9 つ​​のステップを 3 つの主要なステップにまとめます。

(絵がちょっと汚いです-__-!!!)

大きなステップ 1 : Reflector はリソース オブジェクトのイベントを Delta FIFO キューに追加します。

ここでは Delta FIFO キューについて紹介します。いわゆるデルタは変化を意味します。どのような変化ですか?リソースオブジェクトの変更です。

つまり、リソース オブジェクトに対するすべての変更が Delta FIFO キューに追加されます。今では簡単に理解できます。

大きなステップ 2 : Informer は Delta FIFO キュー内のオブジェクト データをローカル キャッシュに追加します。

さらに、このローカル キャッシュには、監視リソース オブジェクトの最新バージョンがキャッシュされます。キャッシュされた現在のクラスター内のリソース情報です。

大きなステップ 3 : ワークキューを使用してビジネス ロジックを処理します。

3. ステップ分析

コミュニティによって書かれたカスタム コントローラーの使用例に基づいてソース コード分析を実行しましょう。ここで使用されているバージョンは client-go v0.20.5 です。

ユースケースでは共通インフォーマーが使用され、紹介もされています。しかし、sharedInformer の方が一般的に使用されています。たとえば、manager と SharedInformerFactory はどちらも通常のインフォーマーを再カプセル化したものなので、本質的な部分は同じです。ご興味がございましたら、後ほどsharedInformerとmanagerを紹介する記事を公開します。

大きなステップ1

アーキテクチャ図の中央に境界線があり、プロセスが 2 つに分割されており、上半分には主に主要なステップ 1 と 2 が含まれていることがわかります。

これら 2 つのステップは実際には相互に関連しています。エントリ コードは次の行です: informer.Run()。今のところは無視して構いません。

まず、ユースケースにおける Informer の初期化エントリ コードを見てみましょう。

NewIndexerInformer のコードは次のとおりです。

実際の Informer の初期化は newInformer です。

381 行目は Delta FIFO の初期化であることに注意してください。アーキテクチャ図の Delta FIFO キューはここでインスタンス化されます。

newInformer は低レベルの Controller インターフェイスを返すことがわかりました。このインターフェースは非常にシンプルで、メソッドは 3 つだけです。

実行(stopCh <-chan 構造体{}):

运行逻辑。

HasSynced() ブール値:

数据同步完成与否

LastSyncResourceVersion() 文字列:

资源最近一次的ResourceVersion

次に、3 つのメソッドを見て、これがコントローラーでどのように実装されているかを確認しましょう。

419 行目の低レベル コントローラーの初期化のコードに直接ジャンプすると、Run メソッドの実装を簡単に確認できます。

コードの大部分は Reflector の初期化です。

152 行目はコルーチンを開始します。 *r.Run* は、Reflector の実行ロジックです。リソース オブジェクトを一覧表示および監視し、オブジェクトを Delta FIFO キューに追加します。

「ジャンプ」をクリックして、ListAndWatch メソッドに直接ジャンプしてみましょう。このコードは長くて乱雑ですが (文句を言わずにはいられません)、実行する必要があるのは非常に単純で、たった 4 つのことだけです。ここでキーコードをコピーします。

まず最初に

初期化した cache.ListWatch オブジェクトの ListFunc を使用してリソース オブジェクトを取得し、オブジェクトを Delta FIFO キューに同期します。

 func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { ...... ...... list, paginatedResult, err = pager.List(context.Background(), options) if isExpiredError(err) || isTooLargeResourceVersionError(err) { r.setIsLastSyncResourceVersionUnavailable(true) // 拉取资源列表list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) } ..... resourceVersion = listMetaInterface.GetResourceVersion() ..... items, err := meta.ExtractList(list) // 转换成对象...... if err := r.syncWith(items, resourceVersion); err != nil { // 将拉取到资源对象都添加到Delta FIFO queue return fmt.Errorf("unable to sync list result: %v", err) } ...... r.setLastSyncResourceVersion(resourceVersion) // 设置最近一次的版本...... }

簡単に説明します。 r.syncWith(items, resourceVersion) は主に Delta FIFO キューの Replace() を通じてリソースを同期します。重要なロジックの 1 つは次のとおりです。

 if !f.populated { f.populated = true f.initialPopulationCount = len(list) + queuedDeletions }

f.populated = true は、リソース オブジェクトがキューに入るアクションが発生したことを確認します。 f.initialPopulationCount は、キュー内にすでに存在するオブジェクトの数を決定します。

次に、インフォーマー HasSynced() の基礎となるロジックを見てみましょう。

 func (f *DeltaFIFO) HasSynced() bool { f.lock.Lock() defer f.lock.Unlock() return f.populated && f.initialPopulationCount == 0 }

そして、f.initialPopulationCount-- は以下のポップで発生します。

LastSyncResourceVersion() 文字列 返されるバージョンは、r.setLastSyncResourceVersion(resourceVersion) によって設定されたバージョンです。

2番目

リソースを再度同期します。

 go func() { resyncCh, cleanup := r.resyncChan() defer func() { cleanup() // Call the last one written into cleanup }() for { select { case <-resyncCh: case <-stopCh: return case <-cancelCh: return } if r.ShouldResync == nil || r.ShouldResync() { klog.V(4).Infof("%s: forcing resync", r.name) if err := r.store.Resync(); err != nil { resyncerrc <- err return } } cleanup() resyncCh, cleanup = r.resyncChan() } }()

ユースケース コードでは、cache.NewIndexerInformer() が resyncPeriod パラメータを設定し、これがここで機能します。

0 に設定されているため、このコルーチンは case<-resyncCh の場合は永久にブロックされます。

この詳細なロジックについては、後ほど Delta FIFO キューについて説明するときに説明します。簡単に言えば、インデクサーによってキャッシュされたデータを Delta FIFO キューに同期します。

3つ目は

初期化した cache.ListWatch オブジェクトの WatchFunc ウォッチ オブジェクトを使用します。

ここでの監視機能は、etcd の基盤となる監視機能です。興味のある学生は自分で学ぶことができるので、ここでは詳しく説明しません。

 w, err := r.listerWatcher.Watch(options) if err != nil { if utilnet.IsConnectionRefused(err) { <-r.initConnBackoffManager.Backoff().C() continue } return err } if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil { ...... ...... }

4番目

監視対象オブジェクトを Delta FIFO キューに追加します。

 // watchHandler watches w and keeps *resourceVersion up to date. func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { .... .... loop: for { select { case <-stopCh: return errorStopRequested case err := <-errc: return err case event, ok := <-w.ResultChan(): .... .... switch event.Type { case watch.Added: err := r.store.Add(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) } case watch.Modified: err := r.store.Update(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) } case watch.Deleted: // TODO: Will any consumers need access to the "last known // state", which is passed in event.Object? If so, may need // to change this. err := r.store.Delete(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err)) } } ...... r.setLastSyncResourceVersion(newResourceVersion) // 设置最近一次的版本...... ...... }

簡単にまとめると、次の 2 つがあります。

  • 最初に、リソースリストを取得してデルタFIFOキューに追加します。
  • リソースオブジェクトの変更を監視し、Delta FIFOキューに追加します。

ビッグステップ2

Indexerer は、実際にはメモリ内データベースの抽象インターフェースです。このうち、Store は当然ストレージを表し、その他はインデックスに関連しています。

 // client-go/tools/cache/store.go type cache struct { // cacheStorage bears the burden of thread safety for the cache cacheStorage ThreadSafeStore // keyFunc is used to make the key for objects stored in and retrieved from items, and // should be deterministic. keyFunc KeyFunc }

キャッシュはインターフェースの実装、つまりキャッシュです。インデックスは検索に必ず使用され、その使用法については以下の最適化のセクションで確認できます。

次に、Run メソッドのスクリーンショットの 154 行目に戻り、2 番目の主要なステップのロジックを確認します。

wait.Until はタイマーであり、次のコードに簡略化できます。

 func Util(stopCh <-chan struct{}) { dur := 1 * time.Second timer := time.NewTimer(dur) defer timer.Stop() for { select { case <-stopCh: return case <-tC(): f() timer.Reset(dur) } } }

実行ロジックは c.processLoop です。

実際、コードは理解しやすいです。キュー (デルタ FIFO キュー) からアイテムをポップし、処理関数を呼び出して ResourceEventHandler 内のメソッドを実行します。

まず、ポップコードを見てみましょう。

 func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { ...... ...... id := f.queue[0] f.queue = f.queue[1:] if f.initialPopulationCount > 0 { f.initialPopulationCount-- // 同步数据减1 } ...... item, ok := f.items[id] ...... err := process(item) ...... }

その内容は実は非常にシンプルです。ここでは主なロジックのみを記載します。誰でも理解できると信じています。また、同期を決定する上で前述した重要なロジックf.initialPopulationCountも確認しました。

つまり、Delta FIFO キュー内のすべてのデータがインデクサーに同期された場合にのみ、インフォーマーのデータ同期が完了します。

次に、newInformer スクリーンショットの 393 行目のプロセスを見てみましょう。拡張された方法は次のとおりです。

 Process: func(obj interface{}) error { for _, d := range obj.(Deltas) { switch d.Type { case Sync, Replaced, Added, Updated: if old, exists, err := clientState.Get(d.Object); err == nil && exists { if err := clientState.Update(d.Object); err != nil { return err } h.OnUpdate(old, d.Object) } else { if err := clientState.Add(d.Object); err != nil { return err } h.OnAdd(d.Object) } case Deleted: if err := clientState.Delete(d.Object); err != nil { return err } h.OnDelete(d.Object) } } return nil }

Process 匿名関数の行パラメータを見てみましょう。 obj はポップアウトされるオブジェクトです。 Delta タイプ d.Type に基づいてオブジェクトの処理方法を決定します。 clientState は Indexer であり、h は ResourceEventHandler です。

したがって、ポップアウトされたオブジェクトはすぐに Indexer に入り、ResourceEventHandler の対応するメソッドを呼び出します。ここで、オブジェクトのキーをワークキューに追加します。

さまざまなメソッドの対応する操作がこのコードです。

ビッグステップ3

最後に、さまざまなリソース イベント (追加、更新、削除) を処理するのは独自のアプリケーションです。 Workqueue の存在により、キュー内の要素の処理が簡素化されます。

processNextItem 関数を直接見ることができます。

55行目、キュー内のデータを取得します。

65 行目は、処理オブジェクトのビジネス ロジックです。 syncToStdout はいくつかのログを出力するだけですが、obj, exists, err := c.indexer.GetByKey(key) の行は、インデクサーからリソース オブジェクトを取得するためのキーです。これにより、さまざまなビジネスロジックを処理できます。たとえば、私の仕事は一般的に、ResourceEventHandler によって定義された変更 (AddFunc、UpdateFunc、DeleteFunc、AddFunc のみ可能) を含むオブジェクトを独自のクラウド プラットフォームに書き戻すことです。

同様のコードは次のとおりです (syncToStdout はアクションに置き換えられます)。

 func (d *Deployment) action(key string) error { obj, exists, err := d.indexer.GetByKey(key) if err != nil { return fmt.Errorf("fetching object with key %s from store failed with %w", key, err) } ns, deploymentName, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err } if exists { deployment, ok := obj.(*apps_v1.Deployment) // 一定要断言资源类型,这里类型要同list & watch 方法中的一致。github的例子是pod,这里用的是deployment if !ok { return fmt.Errorf("type asset fault") } post(deployment) // 将资源传回的伪代码} return nil }

この時点で、3 つの主要なステップが完了します。

4. 知識を追加します:

3 番目のステップは、主にワークキューを呼び出すことです。ワークキューには 3 つの種類があります。

  • 通常キュー
  • 遅延キュー
  • レート制限キュー

遅延キューは通常のキューをカプセル化したものです。速度制限キューは、追加の速度リミッターを備えた遅延キューをカプセル化したものです。

通常、エラーを処理するときに再試行できるように、レート制限のあるキューを使用します。

処理後は、処理中のキーをキューから削除することを忘れないでください。

 defer c.queue.Done(key)

再試行と削除はユースケース コードに明確に記述されています。これら 2 つの重要なロジックを見逃さないようにしてください。

死の「最適化」

ワークキューが少し冗長であることがわかるかもしれません。 ResourceEventHandler でビジネス ロジックを直接処理できます。コードは次のとおりです。

 func NewPodWithOutWorkQueue(ctx context.Context, clientset *kubernetes.Clientset) { //workQueue := workqueue.NewDelayingQueue() namespace := meta_v1.NamespaceAll listWatcher := &cache.ListWatch{ ListFunc: func(options meta_v1.ListOptions) (runtime2.Object, error) { //options.LabelSelector = requireLabel.String() return clientset.CoreV1().Pods(namespace).List(ctx, options) }, WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) { //options.LabelSelector = requireLabel.String() return clientset.CoreV1().Pods(namespace).Watch(ctx, options) }, } indexer, informer := cache.NewIndexerInformer(listWatcher, &core_v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err == nil { fmt.Println("add: ", key) } }, DeleteFunc: func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err == nil { fmt.Println("delete: ", key) } }, }, cache.Indexers{ cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }) go informer.Run(ctx.Done()) go GetIndexer(indexer) } func GetIndexer(idx cache.Indexer) { for { time.Sleep( 3 * time.Second) fmt.Println("GetIndexers:", idx.ListIndexFuncValues(cache.NamespaceIndex)) } }

ここで、Indexer の情報も確認してみることにしました。

GetIndexer は名前空間ごとに集約されたデータを出力します。これは、次の SQL ステートメント select namespace from xx_table として簡単に理解できます。

なぜ死を招くと言うのですか?私たちの友人の中には、コンポーネントに依存しないことが「最適化」であると考えてこのように書いた人もいましたが、公式のユースケースやマネージャーでワークキューが使用される理由については考えていませんでした。

そこで疑問が生じます。なぜワークキューを使用するのでしょうか?理由は次のとおりです。

  • Delta FIFO キューに依存せずにリソース イベントを順序付けます。
  • ワークキューはキャッシュとして表示することもできます。処理されるイベントは、まずキーの形式でワークキューにキャッシュされます。
缓存的作用相信很多人都清楚:解决两个组件处理速度不匹配的问题,如cpu 和硬盘之间经常是用内存做缓存。我们的业务处理逻辑大概率肯定是慢于事件的生成的,而且还延迟队列类型做选择

失敗後の再試行に便利です。

目玉焼きを加える

これはサイドストーリーシリーズと考えることができます、そして興味がない友人はそれを直接スキップすることができます。

実際、多くのキュー (Delta FIFO キュー、ワークキュー) がないと実行できないことに気づいた学生もおり、小さなデータベース (インデクサー) を使用することさえありました。

オブジェクトを直接観察できますか?これは、etcd watch API を直接呼び出すのと同じです。答えはイエスです。

ここからいくつかコードを借りてみましょう。

ポッドウォッチを実装するためのコードは次のようになります。

 func NewPodOnlyWithWatch(ctx context.Context, clientset *kubernetes.Clientset) { onlyWatch := &cache.ListWatch{ WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) { //options.LabelSelector = requireLabel.String() //options.ResourceVersion = "" return clientset.CoreV1().Pods("devops").Watch(ctx, meta_v1.ListOptions{}) }, } watcher, err := watch2.NewRetryWatcher("1", onlyWatch) if err != nil { panic(err) } // Give the watcher a chance to get to sending events (blocking) time.Sleep(10 * time.Millisecond) for { select { case event, ok := <-watcher.ResultChan(): if !ok { fmt.Println("ResultChan closed") return } //fmt.Println("get event") if pod, ok := event.Object.(*core_v1.Pod); ok { switch event.Type { case watch.Added: fmt.Printf("新增事件:%s/%s\n", pod.Namespace, pod.Name) case watch.Deleted: fmt.Printf("删除事件:%s/%s\n", pod.Namespace, pod.Name) case watch.Modified: fmt.Printf("更新事件:%s/%s\n", pod.Namespace, pod.Name) default: fmt.Printf("%s事件:%s\n", event.Type, pod.Name) } } case <-watcher.Done(): fmt.Println("watcher down") return } } }

ただし、直接の視聴は推奨されません。その 1 つは、ビジネスの観点から見た定期的なイベントです。つまり、リソース オブジェクトの更新アクションは複数のイベントを受信します。

5. まとめ

私たちがよく話題にするコントローラーのコア機能は、リソースの変更を監視することです。これは、宣言型の概念であるインフォーマーで状態を保証するための重要なテクノロジです。プロセスは次のとおりです。

  • Reflector はオブジェクトを Delta FIFO キューに追加します。
  • 次に、インフォーマーはそれをポップアウトし、インデクサーとリソース イベント ハンドラーに追加します。
  • 最後の部分は独自のビジネス ロジックです。つまり、最初にワークキューに移動してキーを取得し、次にキーを使用してインデクサーでオブジェクトを交換し、最後にオブジェクトを処理します。

次に、誤った*最適化*の例を使用して、ワークキューの重要性を説明しました。

もう少しマニアックになって、オブジェクトの変更イベントを直接監視することもできますが、個人的にはこれをお勧めしません。

この記事では、主に、リソース イベントがインフォーマーから ResourceEventHandler に転送される一般的なプロセスを紹介しますが、詳細についてはあまり触れません。

デルタFIFOキュー、インデクサー、ワークキューなど、いくつかの重要なコンポーネントも習得する必要があるためです。

これらが明確になれば、プロセスの詳細を理解するのは非常に簡単になります。

もちろん、上記の内容を知ることに加えて、sharedInformer と、Controller を記述するための「アーティファクト」(controller-runtime によってカプセル化されたマネージャー) も習得する必要があります。

ご興味がございましたら、以下の記事で詳しくご紹介させていただきます。これらを理解すれば、コントローラーの技術的な詳細は難しくなくなると思います。

<<:  ハイブリッドクラウドプロジェクトの成功率を向上させる方法

>>:  手遅れになる前に企業がツールを活用してクラウドコストを管理する方法

推薦する

dedipath: シアトルのデータセンターVPSの簡単なレビュー。データからdedipathの優秀さがわかる

今月、米国西海岸のシアトルに Dedipath の新しいデータセンターが開設され、専用サーバーと V...

Google 補足資料の詳細

Googleの補足資料問題は、今年さらに注目を集めたSEO問題です。以前に、どの問題が補足資料になる...

中小企業がネットワークマーケティングを体系的に行うには(第3回) - コンテンツ

コンテンツとは、インターネット上で公開されるすべての情報を指します。コンテンツは、ユーザーが読んでニ...

豆班の不安は解消が難しい

Doubanは3月29日に非公開グループの閉鎖を発表したばかりで、3月30日にはWeiboに「不正競...

国内のウェブサイトは深刻なユーザーセキュリティ問題を抱えており、出会い系サイトの保護は最も弱い

中国ソフトウェアテストセンターが29日に発表した「ウェブサイト利用者のパスワード処理の安全性に関する...

最も安いアメリカの VPS 販売業者である racknerd が、最も安い VPS をリストアップしてみんなと共有します。

格安 VPS 販売業者の Racknerd は、設立以来、低価格で安価な VPS 路線を追求してきま...

クラウドコンピューティングの PAAS と SAAS の違いを 1 つの記事で理解する

クラウド コンピューティングが非常に普及している今日、クラウド ホストをレンタルするだけでクラウド ...

HUAWEI CLOUDはイノベーションと開発を加速し、金融業界のインテリジェンス化を支援します

フィンテックは金融業界を変革しています。デジタル時代において、金融業界は変化をさらに迅速に受け入れる...

IoTとエッジコンピューティングの連携方法

より多くのテクノロジー企業がソリューションを最適化し、運用コストを削減するより簡単な方法を模索するに...

失敗した地域コミュニティウェブサイト所有者の声

2009 年 9 月 1 日は私にとって特別な日でした。私の最初の Web サイトである Jiang...

ハイブリッド クラウドの災害復旧の課題を克服するためのヒントは何ですか?

クラウド コンピューティング環境を管理する場合は、「ホット スタンバイ」や「パイロット ライト」など...

ウェブサイトを構築することは人間になることと同じであり、あなたは常に最初の人になります

私は2009年にインターネットに触れ始めました。当時、彼氏は私に、あなたは文章を書くのが上手だから、...

クラウドコンピューティングの構成エラーによって生じる脆弱性に対処する方法

大規模なハッキングやエクスプロイトを準備する際、サイバー攻撃者は自身のスキルや狡猾さよりも、被害者の...

Baidu に必要なのはユーザー エクスペリエンスだけなので、これらの SEO 要素は依然として意味があるのでしょうか?

Baidu が発表した声明を見ると、ほとんどすべての重み調整と不正行為防止メカニズムはユーザー エク...

WeChat に公開する時間を選択するにはどうすればよいですか?

多くの企業のWeChatパブリックアカウント編集者にとって、WeChatに投稿する適切な時間を選択す...