当前位置: 首页>大数据>正文

Pilot配置分发机制

Pilot配置分发机制,第1张

Pilot负责网格中数据平面相关的配置信息的获取、生成及分发,它通过用户配置及服务注册表获取网格配置信息并将其转换为xDS接口的标准数据格式,而后经gPRC分发至相关的Envoy;

  • Service Registry:服务注册表中存储有相关平台上注册的各Service的相关信息,例如kubernetes的services等;
  • Config Storage:配置存储,例如Kubernetes的API Server,配置信息通常是由用户提供,对于Kubernetes来说,它们以kubernetes CRD格式提供并存储于API Server中;

事实上,基于适配器机制,Pilot还可以从Mesos,、Cloud Foundry和Consul等平台中获取服务信息,而必要时用户也可以开发适配器将其他提供服务发现的组件集成到Pilot中;

Pilot流量管理的相关组件

Pilot工作架构的相关组件包括:pilot-discovery、k8s apiserver、Envoy(istio-proxy)、pilot-agent,以及命令工具istioctl和kubectl;
事实上,Pilot项目自身的组件也是由工作于控制平面的polit-discovery和工作于数据平面的pilot-agent共同组成;

控制平面相关组件

pilot-discovery

即图中的Discovery Services,它主要完成以下功能

  • 从Service Registry中获取服务信息;
  • 从API Server中获取配置信息;
  • 将服务信息和配置信息转化为Envoy的配置格式,并通过xDS API完成分发;

Kubernetes API Server

配置存储系统,负责存储用户以kubernetes crd格式(例如VirtualService和DestinationRule等)提供的配置信息;

数据平面相关组件:Istio proxyv2镜像启动的容器中会运行pilot-agent和envoy两个进程;

pilot-agent

  • 基于k8s api server为envoy初始化出可用的boostrap配置文件并启动envoy;
  • 监控并管理envoy的运行状态,包括envoy出错时重启envoy,以及envoy配置变更后将其重载等;

envoy

envoy由pilot-agent进程基于生成bootstrap配置进行启动,而后根据配置中指定的pilot地址,通过xDS API获取动态配置信息;
Sidecar形式的Envoy通过流量拦截机制为应用程序实现入站和出站代理功能;

Pilot配置分发机制,第2张

pilot-discovery

Istio Pilot的代码分为Pilot-Discovery和Pilot-Agent,其中Pilot-Agent用于在数据面负责Envoy的生命周期管理,Pilot-Discovery才是控制面进行流量管理的组件,本文将重点分析控制面部分,即Pilot-Discovery的代码。

工作机制

Istio源码分支: release-1.13

初始化

func newDiscoveryCommand() *cobra.Command {
    return &cobra.Command{
        Use:   "discovery",
        Short: "Start Istio proxy discovery service.",
        // ...
        RunE: func(c *cobra.Command, args []string) error {
            // ...
            // 创建xDS服务器
            discoveryServer, err := bootstrap.NewServer(serverArgs)
            if err != nil {
                return fmt.Errorf("failed to create discovery service: %v", err)
            }

            // 启动xDS服务器
            if err := discoveryServer.Start(stop); err != nil {
                return fmt.Errorf("failed to start discovery service: %v", err)
            }

            cmd.WaitSignal(stop)
            // Wait until we shut down. In theory this could block forever; in practice we will get
            // forcibly shut down after 30s in Kubernetes.
            discoveryServer.WaitUntilCompletion()
            return nil
        },
    }
}

其中bootstrap.NewServer创建服务发现服务器数据结构,

// NewServer creates a new Server instance based on the provided arguments.
func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) {
    // ...
    ac := aggregate.NewController(aggregate.Options{
        MeshHolder: e,
    })
    e.ServiceDiscovery = ac

    s := &Server{
        // ...
        fileWatcher:             filewatcher.NewWatcher(),
        // ...
        server:                  server.New(),
        // ...
    }
    // ...
    
    // 创建DiscoveryServer
    // DiscoveryServer is Pilot's gRPC implementation for Envoy's xds APIs
    s.XDSServer = xds.NewDiscoveryServer(e, args.Plugins, args.PodName, args.Namespace, args.RegistryOptions.KubeOptions.ClusterAliases)

    // ...

    // 初始化各种控制器
    if err := s.initControllers(args); err != nil {
        return nil, err
    }

    // ...

    // This should be called only after controllers are initialized.
    // 初始化注册事件处理器,在其中注册了SE与配置变更时全量更新的处理器;
    // 添加了ServiceEntry类型资源的配置更新处理函数,即当外部资源配置发生更改时的响应处理流程。
    s.initRegistryEventHandlers()

    // 初始化发现服务,添加了xDS Server启动函数
    s.initDiscoveryService(args)

    s.initSDSServer()

    // ...

    return s, nil
}

xDS指的是Envoy里定义的一系列服务发现协议,如cds, eds, lds, rds等,

  • Overview见xDS configuration API overview
  • Proto见Envoy Proto。

配置控制器与服务控制器对各类资源的监控实际上实现了K8S的List/Watch机制

// initControllers initializes the controllers.
func (s *Server) initControllers(args *PilotArgs) error {
    s.initMulticluster(args)
    // ...
    // 配置相关
    if err := s.initConfigController(args); err != nil {
        return fmt.Errorf("error initializing config controller: %v", err)
    }
    if err := s.initServiceControllers(args); err != nil {
        return fmt.Errorf("error initializing service controllers: %v", err)
    }
    return nil
}

初始化Config控制器

// initConfigController creates the config controller in the pilotConfig.
// 监控istio的各类资源,如VirtualService/DestinationRule/ServiceEntry/Gateway等的创建与更新
func (s *Server) initConfigController(args *PilotArgs) error {
    s.initStatusController(args, features.EnableStatus)
    meshConfig := s.environment.Mesh()
    if len(meshConfig.ConfigSources) > 0 {
        // Using MCP for config.
        if err := s.initConfigSources(args); err != nil {
            return err
        }
    } else if args.RegistryOptions.FileDir != "" {
        // Local files - should be added even if other options are specified
        store := memory.Make(collections.Pilot)
        configController := memory.NewController(store)

        err := s.makeFileMonitor(args.RegistryOptions.FileDir, args.RegistryOptions.KubeOptions.DomainSuffix, configController)
        // ...
    } else {
        err2 := s.initK8SConfigStore(args)
        // ...
    }

    // If running in ingress mode (requires k8s), wrap the config controller.
    if hasKubeRegistry(args.RegistryOptions.Registries) && meshConfig.IngressControllerMode != meshconfig.MeshConfig_OFF {
        // ...
    }

    // Wrap the config controller with a cache.
    aggregateConfigController, err := configaggregate.MakeCache(s.ConfigStores)
    // ...
    s.configController = aggregateConfigController

    // Create the config store.
    s.environment.IstioConfigStore = model.MakeIstioStore(s.configController)

    // Defer starting the controller until after the service is created.
    s.addStartFunc(func(stop <-chan struct{}) error {
        go s.configController.Run(stop)
        return nil
    })

    return nil
}

初始化Service控制器

Pilot中跟服务发现的相关的逻辑在ServiceController中,处理来自例如kubernetes注册中心的服务变更信息,用于监控k8s中各类服务资源,如Pod/Service等的创建与更新;

// initServiceControllers creates and initializes the service controllers
func (s *Server) initServiceControllers(args *PilotArgs) error {
    serviceControllers := s.ServiceController()

    s.serviceEntryStore = serviceentry.NewServiceDiscovery(
        s.configController, s.environment.IstioConfigStore, s.XDSServer,
        serviceentry.WithClusterID(s.clusterID),
    )
    serviceControllers.AddRegistry(s.serviceEntryStore)

    registered := make(map[provider.ID]bool)
    for _, r := range args.RegistryOptions.Registries {
        serviceRegistry := provider.ID(r)
        if _, exists := registered[serviceRegistry]; exists {
            log.Warnf("%s registry specified multiple times.", r)
            continue
        }
        registered[serviceRegistry] = true
        log.Infof("Adding %s registry adapter", serviceRegistry)
        switch serviceRegistry {
        case provider.Kubernetes:
            // 初始化K8S服务控制器
            if err := s.initKubeRegistry(args); err != nil {
                return err
            }
        case provider.Mock:
            s.initMockRegistry()
        default:
            return fmt.Errorf("service registry %s is not supported", r)
        }
    }

    // Defer running of the service controllers.
    s.addStartFunc(func(stop <-chan struct{}) error {
        go serviceControllers.Run(stop)
        return nil
    })

    return nil
}

controller初始化

NewController方法里面首先是初始化Controller,然后获取informer和lister后分别注册Namespaces Handler、Services Handler、Nodes Handler、Pods Handler。

核心功能就是监听k8s相关资源(Namespaces、Service、Pod、Node)的更新事件,执行相应的事件处理回调函数。

// NewController creates a new Kubernetes controller
// Created by bootstrap and multicluster (see multicluster.Controller).
func NewController(kubeClient kubelib.Client, options Options) *Controller {
    c := &Controller{
        opts:                       options,
        client:                     kubeClient,
        // 控制器任务队列
        queue:                      queue.NewQueueWithID(1*time.Second, string(options.ClusterID)),
        // ...
        multinetwork: initMultinetwork(),
    }
    // ...
    c.nsInformer = kubeClient.KubeInformer().Core().V1().Namespaces().Informer()
    c.nsLister = kubeClient.KubeInformer().Core().V1().Namespaces().Lister()
    if c.opts.SystemNamespace != "" {
        // ...
        c.registerHandlers(nsInformer, "Namespaces", c.onSystemNamespaceEvent, nil)
    }
    
    // ...
    c.registerHandlers(c.serviceInformer, "Services", c.onServiceEvent, nil)

    switch options.EndpointMode {
    case EndpointsOnly:
        c.endpoints = newEndpointsController(c)
    case EndpointSliceOnly:
        c.endpoints = newEndpointSliceController(c)
    }

    // ...
    c.registerHandlers(c.nodeInformer, "Nodes", c.onNodeEvent, nil)

    // ...
    c.registerHandlers(c.pods.informer, "Pods", c.pods.onEvent, nil)

    c.exports = newServiceExportCache(c)
    c.imports = newServiceImportCache(c)

    return c
}

上边的Controller实现了接口:

type Controller interface {
    // AppendServiceHandler notifies about changes to the service catalog.
    AppendServiceHandler(f func(*Service, Event))

    // AppendWorkloadHandler notifies about changes to workloads. This differs from InstanceHandler,
    // which deals with service instances (the result of a merge of Service and Workload)
    AppendWorkloadHandler(f func(*WorkloadInstance, Event))

    // Run until a signal is received
    Run(stop <-chan struct{})

    // HasSynced returns true after initial cache synchronization is complete
    HasSynced() bool
}

ServiceController为四种资源分别创建了一个监听器,用于监听K8s的资源更新,并注册EventHandler。

Namespace

func (c *Controller) onSystemNamespaceEvent(obj interface{}, ev model.Event) error {
    // ...
    // network changed, rarely happen
    if oldDefaultNetwork != c.network {
        // refresh pods/endpoints/services
        c.onDefaultNetworkChange()
    }
    return nil
}

Service

Service事件处理器会将根据事件的类型更新缓存,然后调用serviceHandlers的事件处理器进行回调。

func (c *Controller) onServiceEvent(curr interface{}, event model.Event) error {
    svc, err := convertToService(curr)
    if err != nil {
        log.Errorf(err)
        return nil
    }

    log.Debugf("Handle event %s for service %s in namespace %s", event, svc.Name, svc.Namespace)

    // Create the standard (cluster.local) service.
    svcConv := kube.ConvertService(*svc, c.opts.DomainSuffix, c.Cluster())
    switch event {
    case model.EventDelete:
        c.deleteService(svcConv)
    default:
        c.addOrUpdateService(svc, svcConv, event, false)
    }

    return nil
}

Node

func (c *Controller) onNodeEvent(obj interface{}, event model.Event) error {
    // ...
    // update all related services
    if updatedNeeded && c.updateServiceNodePortAddresses() {
        c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
            Full:   true,
            Reason: []model.TriggerReason{model.ServiceUpdate},
        })
    }
    return nil
}

Pod

// onEvent updates the IP-based index (pc.podsByIP).
func (pc *PodCache) onEvent(curr interface{}, ev model.Event) error {
    // ...
    switch ev {
    case model.EventAdd:
        // can happen when istiod just starts
        if pod.DeletionTimestamp != nil || !IsPodReady(pod) {
            return nil
        } else if shouldPodBeInEndpoints(pod) {
            pc.update(ip, key)
        } else {
            return nil
        }
    case model.EventUpdate:
        if pod.DeletionTimestamp != nil || !IsPodReady(pod) {
            // delete only if this pod was in the cache
            pc.deleteIP(ip, key)
            ev = model.EventDelete
        } else if shouldPodBeInEndpoints(pod) {
            pc.update(ip, key)
        } else {
            return nil
        }
    case model.EventDelete:
        // delete only if this pod was in the cache,
        // in most case it has already been deleted in `UPDATE` with `DeletionTimestamp` set.
        if !pc.deleteIP(ip, key) {
            return nil
        }
    }
    pc.notifyWorkloadHandlers(pod, ev)
    return nil
}

Pilot Discovery各组件启动流程

Pilot配置分发机制,第3张

DiscoveryServer接收Envoy的gRPC连接请求流程

Pilot配置分发机制,第4张

Config变化后向Envoy推送更新的流程

Pilot配置分发机制,第5张

总结

Pilot-Discovery服务发现是通过k8s的Informer和Lister来注册监听Namespace、Service、nodes、pods等资源的更新事件,然后通过事件驱动模型执行回调函数,再调用xDS的ConfigUpdate来执行异步更新配置的操作


https://www.xamrdz.com/bigdata/7mk1994058.html

相关文章: