简介
之前介绍过sigs.k8s.io controller-runtime系列之四 client分析sigs.k8s.io controller-runtime-client 。
本文主要介绍pkg/cache的源码分析。
目录结构
- cache_suite_test.go 注册测试GVK 校验k8s环境 获取client config
- 依赖ginkgo做集成测试,表示该文件夹内的测试例执行之前执行,
- BeforeSuite和AfterSuite,会在所有测试例执行之前和之后执行
- 如果BeforeSuite执行失败,则这个测试集都不会被执行
- cache.go
- Cache接口 提供了充当client.Reader的实例,帮助驱动基于事件处理的Kubernetes对象并添加对象的filed索引
// Cache充当一个在缓存中存储对象的client
type Cache interface {
client.Reader
// 用于Cache加载Informers并添加filed 索引(用于过滤和查找).
Informers
}
- Informers接口 为不同的gvk创建和fetch Informers
type Informers interface {
// 为给定对应一个kind和resource的obj获取或者构造一个informer
GetInformer(ctx context.Context, obj client.Object) (Informer, error)
// 类似于 GetInformer,通过gvk获取
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)
// 启动该cache中所包含的所有的informers
Start(ctx context.Context) error
// 等待所有的cache同步
WaitForCacheSync(ctx context.Context) bool
// 用于增加filed索引
client.FieldIndexer
}
- Informer接口
type Informer interface {
// 使用共享informer的重新同步周期将事件处理程序添加到共享informer。
// 单个handler的事件按顺序传递,但不同handler之间没有协调。
AddEventHandler(handler toolscache.ResourceEventHandler)
// 使用特定的重新同步周期将事件处理程序添加到共享informer。
// 单个handler的事件按顺序传递,但不同handler之间没有协调。
AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration)
// 添加索引, 如果在store中已经存在,在调用该方法,则undefined
AddIndexers(indexers toolscache.Indexers) error
// informers下的store已经同步,则返回true
HasSynced() bool
}
- Options接口 在创建新的InformersMap的可选择的参数
type Options struct {
// 用于将对象映射到 GroupVersionKinds
Scheme *runtime.Scheme
// Mapper is the RESTMapper to use for mapping GroupVersionKinds to Resources
// 用于将 GroupVersionKinds映射到Resource
Mapper meta.RESTMapper
// informer重新同步的基本频率,默认是 defaultResyncTime.
// informer之间的重新同步期间将增加 10% 的抖动,所以所有informer不会同时发送list请求。
Resync *time.Duration
// 将缓存的 ListWatch 限制为所需的命名空间
// 默认t watches all namespaces
Namespace string
}
- New函数 初始化并返回新的cache
func New(config *rest.Config, opts Options) (Cache, error) {
opts, err := defaultOpts(config, opts)
if err != nil {
return nil, err
}
// 下文会讲到,主要是内部实现的创建structured and unstructured objects的informer
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
return &informerCache{InformersMap: im}, nil
}
// 检测opt,赋值默认属性
func defaultOpts(config *rest.Config, opts Options) (Options, error) {
// 没有设置Scheme,使用默认的
if opts.Scheme == nil {
opts.Scheme = scheme.Scheme
}
// 设置新的restmapper
if opts.Mapper == nil {
var err error
opts.Mapper, err = apiutil.NewDiscoveryRESTMapper(config)
if err != nil {
log.WithName("setup").Error(err, "Failed to get API Group-Resources")
return opts, fmt.Errorf("could not create RESTMapper from config")
}
}
// 没有设置同步周期 默认为10 hours
if opts.Resync == nil {
opts.Resync = &defaultResyncTime
}
return opts, nil
}
- multi_namespace_cache.go
- MultiNamespacedCacheBuilder函数 顾名思义,根据多个命名空间创建一个multicache(以一个map来包含多个cache)
// 将该cache的作用范围限定到一个命名空间list。
// 注意在大量命名空间中使用它时可能会遇到性能问题。
func MultiNamespacedCacheBuilder(namespaces []string) NewCacheFunc {
return func(config *rest.Config, opts Options) (Cache, error) {
opts, err := defaultOpts(config, opts)
if err != nil {
return nil, err
}
caches := map[string]Cache{}
for _, ns := range namespaces {
opts.Namespace = ns
c, err := New(config, opts)
if err != nil {
return nil, err
}
caches[ns] = c
}
return &multiNamespaceCache{namespaceToCache: caches, Scheme: opts.Scheme}, nil
}
}
- internal/cache_reader.go 实现了client.Reader
- CacheReader结构体 内部使用cache.Index来实现了单一类型的client.CacheReader接口
type CacheReader struct {
// 修饰cache的索引器
indexer cache.Indexer
// resource的gvk
groupVersionKind schema.GroupVersionKind
// resource的范围 (namespaced or cluster-scoped).
scopeName apimeta.RESTScopeName
}
- Get函数 检查对象的索引器并在找到数据的副本写入参数out
func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Object) error {
if c.scopeName == apimeta.RESTScopeNameRoot {
key.Namespace = ""
}
// 获取obj store的key
storeKey := objectKeyToStoreKey(key)
// 从索引器缓存中查找对象
obj, exists, err := c.indexer.GetByKey(storeKey)
if err != nil {
return err
}
// 如果没有get,则抛异常
if !exists {
// Resource gets transformed into Kind in the error anyway, so this is fine
return errors.NewNotFound(schema.GroupResource{
Group: c.groupVersionKind.Group,
Resource: c.groupVersionKind.Kind,
}, key.Name)
}
// 验证结果是一个 runtime.Object
if _, isObj := obj.(runtime.Object); !isObj {
// 通常 这是不会发生的
return fmt.Errorf("cache contained %T, which is not an Object", obj)
}
// 深拷贝来避免污染缓存
obj = obj.(runtime.Object).DeepCopyObject()
// 拷贝在cache中的item的值到返回值中
outVal := reflect.ValueOf(out)
objVal := reflect.ValueOf(obj)
// 判断获取到的val和参数返回的type是否相同
if !objVal.Type().AssignableTo(outVal.Type()) {
return fmt.Errorf("cache had type %s, but %s was asked for", objVal.Type(), outVal.Type())
}
reflect.Indirect(outVal).Set(reflect.Indirect(objVal))
out.GetObjectKind().SetGroupVersionKind(c.groupVersionKind)
return nil
}
- List函数 列出索引器中的项目并将它们写出到参数out
func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...client.ListOption) error {
var objs []interface{}
var err error
listOpts := client.ListOptions{}
listOpts.ApplyOptions(opts)
if listOpts.FieldSelector != nil {
// 目前只支持通过单一的FieldSelector,TODO通过组合多个索引、GetIndexers 等来支持更复杂的字段选择器
field, val, requiresExact := requiresExactMatch(listOpts.FieldSelector)
if !requiresExact {
return fmt.Errorf("non-exact field matches are not supported by the cache")
}
// 通过字段选择器列出所有对象.如果有且仅有一个namespace,则以其作为namespace的key. 否则使用 "all namespaces"作为namespace key.
// indexname的表示形式"field:" + field,indexkey的表示形式是ns + "/" + baseKey
objs, err = c.indexer.ByIndex(FieldIndexName(field), KeyToNamespacedKey(listOpts.Namespace, val))
} else if listOpts.Namespace != "" {
// indexname的表示形式namespace,indexkey的表示形式是ns
objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace)
} else {
objs = c.indexer.List()
}
if err != nil {
return err
}
// 标签选择器
var labelSel labels.Selector
if listOpts.LabelSelector != nil {
labelSel = listOpts.LabelSelector
}
runtimeObjs := make([]runtime.Object, 0, len(objs))
// 遍历通过indexer获取的items,过滤掉不符合label的item
for _, item := range objs {
obj, isObj := item.(runtime.Object)
if !isObj {
return fmt.Errorf("cache contained %T, which is not an Object", obj)
}
meta, err := apimeta.Accessor(obj)
if err != nil {
return err
}
if labelSel != nil {
lbls := labels.Set(meta.GetLabels())
if !labelSel.Matches(lbls) {
continue
}
}
outObj := obj.DeepCopyObject()
outObj.GetObjectKind().SetGroupVersionKind(c.groupVersionKind)
runtimeObjs = append(runtimeObjs, outObj)
}
return apimeta.SetList(out, runtimeObjs)
}
- internal包中的informers_map.go
- newSpecificInformersMap函数
// new一个新的specificInformersMap (和普通的InformersMap一样,但是没有实现WaitForCacheSync).
func newSpecificInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string,
createListWatcher createListWatcherFunc) *specificInformersMap {
ip := &specificInformersMap{
config: config,
Scheme: scheme,
mapper: mapper,
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
codecs: serializer.NewCodecFactory(scheme),
paramCodec: runtime.NewParameterCodec(scheme),
resync: resync,
startWait: make(chan struct{}),
createListWatcher: createListWatcher,
namespace: namespace,
}
return ip
}
- MapEntry 结构体 包含了一个Informer的cached 数据
type MapEntry struct {
// cached informer
Informer cache.SharedIndexInformer
// CacheReader 包装了 Informer 并为单一类型实现了 CacheReader 接口
Reader CacheReader
}
- specificInformersMap结构体
// 创建和缓存基于runtime.Object和schema.GroupVersionKind的Informers.
// 使用基于给定生成方案构建的标准参数编解码器。
type specificInformersMap struct {
Scheme *runtime.Scheme
// 和apiserver交互使用
config *rest.Config
// GroupVersionKinds to Resources
mapper meta.RESTMapper
// 以map形式缓存 informers, key是 groupVersionKind
informersByGVK map[schema.GroupVersionKind]*MapEntry
// 用于创建一个新的 REST client
codecs serializer.CodecFactory
// 用于list and watch
paramCodec runtime.ParameterCodec
// 用于 stop informers
stop <-chan struct{}
resync time.Duration
// 锁 用于访问informersByGVK使用
mu sync.RWMutex
// 标识 informers是否已启动
started bool
// 在informer开始后,该chan将被关闭
startWait chan struct{}
// 创建listWatch的函数
createListWatcher createListWatcherFunc
// namespace 是所有 ListWatches 被限制到的命名空间
// 如果为空,则代表 all namespaces
namespace string
}
- Start函数 开启informersByGVK中的所有Informers,并设置属性started为true。该方法是阻塞的
func (ip *specificInformersMap) Start(ctx context.Context) {
func() {
ip.mu.Lock()
defer ip.mu.Unlock()
ip.stop = ctx.Done()
// 开启没有informer
for _, informer := range ip.informersByGVK {
go informer.Informer.Run(ctx.Done())
}
ip.started = true
close(ip.startWait)
}()
<-ctx.Done()
}
- Get函数 如果gvk对应的specificInformer不存在,那么创建一个新的并添加到map中。返回map中对应gvk对应的informer
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
// 存在即返回
i, started, ok := func() (*MapEntry, bool, bool) {
ip.mu.RLock()
defer ip.mu.RUnlock()
i, ok := ip.informersByGVK[gvk]
return i, ip.started, ok
}()
if !ok {
var err error
// 不存在就添加一个新的
if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
return started, nil, err
}
}
// 如果已经开始了,但是还未同步结束
if started && !i.Informer.HasSynced() {
// 知道同步完成,才返回informer,如此不会从旧的缓存中读取
if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
}
}
return started, i, nil
}
- addInformerToMap函数 根据gvk添加informer到map中
func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) {
ip.mu.Lock()
defer ip.mu.Unlock()
// 检查map中是否已经存在. 如果存在直接返回。
if i, ok := ip.informersByGVK[gvk]; ok {
return i, ip.started, nil
}
// 创建一个listWatch函数
var lw *cache.ListWatch
lw, err := ip.createListWatcher(gvk, ip)
if err != nil {
return nil, false, err
}
// 创建一个IndexInformer
ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})
rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, false, err
}
// 封装一个MapEntry实体,包含了Informer和Reader
i := &MapEntry{
Informer: ni,
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()},
}
ip.informersByGVK[gvk] = i
// 如果调用者已经start,那么开启新加入的informer
if ip.started {
go i.Informer.Run(ip.stop)
}
return i, ip.started, nil
}
- createStructuredListWatch函数 创建listWatch结构化对象的ListWatch函数
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
// 获取映射obj和gvk
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
// 根据gvk获取对应的client
client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
if err != nil {
return nil, err
}
listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
listObj, err := ip.Scheme.New(listGVK)
if err != nil {
return nil, err
}
// TODO: 这里可以使用用户自定义的额context
ctx := context.TODO()
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
res := listObj.DeepCopyObject()
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
return res, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx)
},
}, nil
}
- internal包中的deleg_map.go
- InformersMap结构体
type InformersMap struct {
structured *specificInformersMap
unstructured *specificInformersMap
metadata *specificInformersMap
Scheme *runtime.Scheme
}
- NewInformersMap函数
// 创建一个InformersMap
func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string) *InformersMap {
return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace),
Scheme: scheme,
}
}