当前位置: 首页>编程语言>正文

k8s设置的memory 是jvm内存吗 k8s informer cache

简介

之前介绍过sigs.k8s.io controller-runtime系列之四 client分析sigs.k8s.io controller-runtime-client 。
本文主要介绍pkg/cache的源码分析。

目录结构

  1. cache_suite_test.go 注册测试GVK 校验k8s环境 获取client config
  • 依赖ginkgo做集成测试,表示该文件夹内的测试例执行之前执行,
  • BeforeSuite和AfterSuite,会在所有测试例执行之前和之后执行
  • 如果BeforeSuite执行失败,则这个测试集都不会被执行
  1. cache.go
  • Cache接口 提供了充当client.Reader的实例,帮助驱动基于事件处理的Kubernetes对象并添加对象的filed索引
// Cache充当一个在缓存中存储对象的client
type Cache interface {
	client.Reader

	// 用于Cache加载Informers并添加filed 索引(用于过滤和查找).
	Informers
}
  • Informers接口 为不同的gvk创建和fetch Informers
type Informers interface {
    // 为给定对应一个kind和resource的obj获取或者构造一个informer
	GetInformer(ctx context.Context, obj client.Object) (Informer, error)

	// 类似于 GetInformer,通过gvk获取
	GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)

	// 启动该cache中所包含的所有的informers
	Start(ctx context.Context) error

    // 等待所有的cache同步
	WaitForCacheSync(ctx context.Context) bool

    // 用于增加filed索引
	client.FieldIndexer
}
  • Informer接口
type Informer interface {
    // 使用共享informer的重新同步周期将事件处理程序添加到共享informer。
    // 单个handler的事件按顺序传递,但不同handler之间没有协调。
	AddEventHandler(handler toolscache.ResourceEventHandler)

    // 使用特定的重新同步周期将事件处理程序添加到共享informer。
	// 单个handler的事件按顺序传递,但不同handler之间没有协调。
	AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration)

    // 添加索引, 如果在store中已经存在,在调用该方法,则undefined
	AddIndexers(indexers toolscache.Indexers) error

    // informers下的store已经同步,则返回true
	HasSynced() bool
}
  • Options接口 在创建新的InformersMap的可选择的参数
type Options struct {
 // 用于将对象映射到 GroupVersionKinds
	Scheme *runtime.Scheme

	// Mapper is the RESTMapper to use for mapping GroupVersionKinds to Resources
	// 用于将 GroupVersionKinds映射到Resource
	Mapper meta.RESTMapper

	// informer重新同步的基本频率,默认是 defaultResyncTime.
 // informer之间的重新同步期间将增加 10% 的抖动,所以所有informer不会同时发送list请求。
	Resync *time.Duration

	// 将缓存的 ListWatch 限制为所需的命名空间
	// 默认t watches all namespaces
	Namespace string
}
  • New函数 初始化并返回新的cache
func New(config *rest.Config, opts Options) (Cache, error) {
	opts, err := defaultOpts(config, opts)
	if err != nil {
		return nil, err
	}
    // 下文会讲到,主要是内部实现的创建structured and unstructured objects的informer
	im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
	return &informerCache{InformersMap: im}, nil
}

// 检测opt,赋值默认属性
func defaultOpts(config *rest.Config, opts Options) (Options, error) {
	// 没有设置Scheme,使用默认的
	if opts.Scheme == nil {
		opts.Scheme = scheme.Scheme
	}

	// 设置新的restmapper
	if opts.Mapper == nil {
		var err error
		opts.Mapper, err = apiutil.NewDiscoveryRESTMapper(config)
		if err != nil {
			log.WithName("setup").Error(err, "Failed to get API Group-Resources")
			return opts, fmt.Errorf("could not create RESTMapper from config")
		}
	}

	// 没有设置同步周期  默认为10 hours
	if opts.Resync == nil {
		opts.Resync = &defaultResyncTime
	}
	return opts, nil
}
  1. multi_namespace_cache.go
  • MultiNamespacedCacheBuilder函数 顾名思义,根据多个命名空间创建一个multicache(以一个map来包含多个cache)
// 将该cache的作用范围限定到一个命名空间list。
// 注意在大量命名空间中使用它时可能会遇到性能问题。
func MultiNamespacedCacheBuilder(namespaces []string) NewCacheFunc {
	return func(config *rest.Config, opts Options) (Cache, error) {
		opts, err := defaultOpts(config, opts)
		if err != nil {
			return nil, err
		}
		caches := map[string]Cache{}
		for _, ns := range namespaces {
			opts.Namespace = ns
			c, err := New(config, opts)
			if err != nil {
				return nil, err
			}
			caches[ns] = c
		}
		return &multiNamespaceCache{namespaceToCache: caches, Scheme: opts.Scheme}, nil
	}
}
  1. internal/cache_reader.go 实现了client.Reader
  • CacheReader结构体 内部使用cache.Index来实现了单一类型的client.CacheReader接口
type CacheReader struct {
	// 修饰cache的索引器
	indexer cache.Indexer

	// resource的gvk
	groupVersionKind schema.GroupVersionKind

	// resource的范围 (namespaced or cluster-scoped).
	scopeName apimeta.RESTScopeName
}
  • Get函数 检查对象的索引器并在找到数据的副本写入参数out
func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Object) error {
	if c.scopeName == apimeta.RESTScopeNameRoot {
		key.Namespace = ""
	}
 // 获取obj store的key
	storeKey := objectKeyToStoreKey(key)

	// 从索引器缓存中查找对象
	obj, exists, err := c.indexer.GetByKey(storeKey)
	if err != nil {
		return err
	}

	// 如果没有get,则抛异常
	if !exists {
		// Resource gets transformed into Kind in the error anyway, so this is fine
		return errors.NewNotFound(schema.GroupResource{
			Group:    c.groupVersionKind.Group,
			Resource: c.groupVersionKind.Kind,
		}, key.Name)
	}

	// 验证结果是一个 runtime.Object
	if _, isObj := obj.(runtime.Object); !isObj {
		// 通常 这是不会发生的
		return fmt.Errorf("cache contained %T, which is not an Object", obj)
	}

	// 深拷贝来避免污染缓存
	obj = obj.(runtime.Object).DeepCopyObject()

	// 拷贝在cache中的item的值到返回值中
	outVal := reflect.ValueOf(out)
	objVal := reflect.ValueOf(obj)
 // 判断获取到的val和参数返回的type是否相同
	if !objVal.Type().AssignableTo(outVal.Type()) {
		return fmt.Errorf("cache had type %s, but %s was asked for", objVal.Type(), outVal.Type())
	}
	reflect.Indirect(outVal).Set(reflect.Indirect(objVal))
	out.GetObjectKind().SetGroupVersionKind(c.groupVersionKind)

	return nil
}
  • List函数 列出索引器中的项目并将它们写出到参数out
func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...client.ListOption) error {
	var objs []interface{}
	var err error

	listOpts := client.ListOptions{}
	listOpts.ApplyOptions(opts)

	if listOpts.FieldSelector != nil {
		// 目前只支持通过单一的FieldSelector,TODO通过组合多个索引、GetIndexers 等来支持更复杂的字段选择器
		field, val, requiresExact := requiresExactMatch(listOpts.FieldSelector)
		if !requiresExact {
			return fmt.Errorf("non-exact field matches are not supported by the cache")
		}
		// 通过字段选择器列出所有对象.如果有且仅有一个namespace,则以其作为namespace的key.  否则使用 "all namespaces"作为namespace key.
     // indexname的表示形式"field:" + field,indexkey的表示形式是ns + "/" + baseKey
		objs, err = c.indexer.ByIndex(FieldIndexName(field), KeyToNamespacedKey(listOpts.Namespace, val))
	} else if listOpts.Namespace != "" {
     // indexname的表示形式namespace,indexkey的表示形式是ns
		objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace)
	} else {
		objs = c.indexer.List()
	}
	if err != nil {
		return err
	}

 // 标签选择器
	var labelSel labels.Selector
	if listOpts.LabelSelector != nil {
		labelSel = listOpts.LabelSelector
	}

	runtimeObjs := make([]runtime.Object, 0, len(objs))
 // 遍历通过indexer获取的items,过滤掉不符合label的item
	for _, item := range objs {
		obj, isObj := item.(runtime.Object)
		if !isObj {
			return fmt.Errorf("cache contained %T, which is not an Object", obj)
		}
		meta, err := apimeta.Accessor(obj)
		if err != nil {
			return err
		}
		if labelSel != nil {
			lbls := labels.Set(meta.GetLabels())
			if !labelSel.Matches(lbls) {
				continue
			}
		}

		outObj := obj.DeepCopyObject()
		outObj.GetObjectKind().SetGroupVersionKind(c.groupVersionKind)
		runtimeObjs = append(runtimeObjs, outObj)
	}
	return apimeta.SetList(out, runtimeObjs)
}
  1. internal包中的informers_map.go
  • newSpecificInformersMap函数
// new一个新的specificInformersMap (和普通的InformersMap一样,但是没有实现WaitForCacheSync).
func newSpecificInformersMap(config *rest.Config,
	scheme *runtime.Scheme,
	mapper meta.RESTMapper,
	resync time.Duration,
	namespace string,
	createListWatcher createListWatcherFunc) *specificInformersMap {
	ip := &specificInformersMap{
		config:            config,
		Scheme:            scheme,
		mapper:            mapper,
		informersByGVK:    make(map[schema.GroupVersionKind]*MapEntry),
		codecs:            serializer.NewCodecFactory(scheme),
		paramCodec:        runtime.NewParameterCodec(scheme),
		resync:            resync,
		startWait:         make(chan struct{}),
		createListWatcher: createListWatcher,
		namespace:         namespace,
	}
	return ip
}
  • MapEntry 结构体 包含了一个Informer的cached 数据
type MapEntry struct {
	// cached informer
	Informer cache.SharedIndexInformer

	// CacheReader 包装了 Informer 并为单一类型实现了 CacheReader 接口
	Reader CacheReader
}
  • specificInformersMap结构体
// 创建和缓存基于runtime.Object和schema.GroupVersionKind的Informers.
// 使用基于给定生成方案构建的标准参数编解码器。
type specificInformersMap struct {

	Scheme *runtime.Scheme

	// 和apiserver交互使用
	config *rest.Config

	// GroupVersionKinds to Resources
	mapper meta.RESTMapper

	// 以map形式缓存 informers, key是 groupVersionKind
	informersByGVK map[schema.GroupVersionKind]*MapEntry

	// 用于创建一个新的 REST client
	codecs serializer.CodecFactory

	// 用于list and watch
	paramCodec runtime.ParameterCodec

	// 用于 stop informers
	stop <-chan struct{}

	resync time.Duration

	// 锁 用于访问informersByGVK使用
	mu sync.RWMutex

	// 标识 informers是否已启动
	started bool

	// 在informer开始后,该chan将被关闭
	startWait chan struct{}

	// 创建listWatch的函数
	createListWatcher createListWatcherFunc

	// namespace 是所有 ListWatches 被限制到的命名空间
	// 如果为空,则代表 all namespaces
	namespace string
}
  • Start函数 开启informersByGVK中的所有Informers,并设置属性started为true。该方法是阻塞的
func (ip *specificInformersMap) Start(ctx context.Context) {
	func() {
		ip.mu.Lock()
		defer ip.mu.Unlock()

		ip.stop = ctx.Done()

		// 开启没有informer
		for _, informer := range ip.informersByGVK {
			go informer.Informer.Run(ctx.Done())
		}

		ip.started = true
		close(ip.startWait)
	}()
	<-ctx.Done()
}
  • Get函数 如果gvk对应的specificInformer不存在,那么创建一个新的并添加到map中。返回map中对应gvk对应的informer
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
	// 存在即返回
	i, started, ok := func() (*MapEntry, bool, bool) {
		ip.mu.RLock()
		defer ip.mu.RUnlock()
		i, ok := ip.informersByGVK[gvk]
		return i, ip.started, ok
	}()

	if !ok {
		var err error
       // 不存在就添加一个新的
		if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
			return started, nil, err
		}
	}

   // 如果已经开始了,但是还未同步结束
	if started && !i.Informer.HasSynced() {
		// 知道同步完成,才返回informer,如此不会从旧的缓存中读取
		if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
			return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
		}
	}

	return started, i, nil
}
  • addInformerToMap函数 根据gvk添加informer到map中
func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) {
	ip.mu.Lock()
	defer ip.mu.Unlock()

	// 检查map中是否已经存在.  如果存在直接返回。
	if i, ok := ip.informersByGVK[gvk]; ok {
		return i, ip.started, nil
	}

	// 创建一个listWatch函数
	var lw *cache.ListWatch
	lw, err := ip.createListWatcher(gvk, ip)
	if err != nil {
		return nil, false, err
	}
  // 创建一个IndexInformer
	ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{
		cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
	})
	rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		return nil, false, err
	}
  // 封装一个MapEntry实体,包含了Informer和Reader
	i := &MapEntry{
		Informer: ni,
		Reader:   CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()},
	}
	ip.informersByGVK[gvk] = i

	// 如果调用者已经start,那么开启新加入的informer
	if ip.started {
		go i.Informer.Run(ip.stop)
	}
	return i, ip.started, nil
}
  • createStructuredListWatch函数 创建listWatch结构化对象的ListWatch函数
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
	// 获取映射obj和gvk
	mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		return nil, err
	}
  // 根据gvk获取对应的client
	client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
	if err != nil {
		return nil, err
	}
	listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
	listObj, err := ip.Scheme.New(listGVK)
	if err != nil {
		return nil, err
	}

	// TODO: 这里可以使用用户自定义的额context
	ctx := context.TODO()

	return &cache.ListWatch{
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			res := listObj.DeepCopyObject()
			isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
			err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
			return res, err
		},
		// Setup the watch function
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			// Watch needs to be set to true separately
			opts.Watch = true
			isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
			return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx)
		},
	}, nil
}
  1. internal包中的deleg_map.go
  • InformersMap结构体
type InformersMap struct {

  structured   *specificInformersMap
  unstructured *specificInformersMap
  metadata     *specificInformersMap

  Scheme *runtime.Scheme
}
  • NewInformersMap函数
// 创建一个InformersMap
func NewInformersMap(config *rest.Config,
	scheme *runtime.Scheme,
	mapper meta.RESTMapper,
	resync time.Duration,
	namespace string) *InformersMap {

	return &InformersMap{
		structured:   newStructuredInformersMap(config, scheme, mapper, resync, namespace),
		unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
		metadata:     newMetadataInformersMap(config, scheme, mapper, resync, namespace),

		Scheme: scheme,
	}
}



https://www.xamrdz.com/lan/5bj1962500.html

相关文章: