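Pilot is responsible for obtaining, generating, and distributing the data-plane configuration of the mesh. It builds the mesh configuration from user-supplied config and the service registry, converts it into the standard xDS data format, and then distributes it over gRPC to the relevant Envoy proxies.
- Service Registry: stores information about every Service registered on the underlying platform, e.g. Kubernetes Services.
- Config Storage: the configuration store, e.g. the Kubernetes API Server. Configuration is usually supplied by users; on Kubernetes it is provided as Kubernetes CRDs and stored in the API Server.
Through its adapter mechanism, Pilot has also been able to obtain service information from platforms such as Mesos, Cloud Foundry, and Consul, and users can write their own adapters to integrate other service-discovery components into Pilot when needed (the release-1.13 code analyzed below only registers the Kubernetes and Mock adapters).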
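Pilot traffic-management components
The components of Pilot's working architecture are pilot-discovery, the k8s API server, Envoy (istio-proxy), pilot-agent, and the command-line tools istioctl and kubectl.
In fact, the Pilot project itself consists of pilot-discovery, which runs in the control plane, and pilot-agent, which runs in the data plane.
Control-plane components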
pilot-discovery
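This is the Discovery Services component in the figure. Its main functions are:
- fetching service information from the Service Registry;
- fetching configuration from the API Server;
- converting the service information and configuration into Envoy's configuration format and distributing it through the xDS API.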
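To make the conversion step concrete, here is a minimal sketch of how registry data and user config might be combined into Envoy cluster names. The types `Service` and `DestinationRule` and the function `buildClusters` are hypothetical simplifications rather than Istio's `model` package; the only convention borrowed from Istio is the outbound cluster name format `outbound|<port>|<subset>|<hostname>`.

```go
package main

import "fmt"

// Service is a hypothetical, simplified entry from the service registry.
type Service struct {
	Hostname string
	Ports    []int
}

// DestinationRule is a hypothetical, simplified piece of user config that declares subsets for a host.
type DestinationRule struct {
	Host    string
	Subsets []string
}

// buildClusters merges registry data (services) with user config (rules) into
// outbound cluster names: one default cluster per port plus one per subset.
func buildClusters(services []Service, rules []DestinationRule) []string {
	subsetsByHost := make(map[string][]string)
	for _, r := range rules {
		subsetsByHost[r.Host] = append(subsetsByHost[r.Host], r.Subsets...)
	}
	var clusters []string
	for _, svc := range services {
		for _, port := range svc.Ports {
			// The default cluster leaves the subset field empty.
			clusters = append(clusters, fmt.Sprintf("outbound|%d||%s", port, svc.Hostname))
			for _, subset := range subsetsByHost[svc.Hostname] {
				clusters = append(clusters, fmt.Sprintf("outbound|%d|%s|%s", port, subset, svc.Hostname))
			}
		}
	}
	return clusters
}

func main() {
	services := []Service{{Hostname: "reviews.default.svc.cluster.local", Ports: []int{9080}}}
	rules := []DestinationRule{{Host: "reviews.default.svc.cluster.local", Subsets: []string{"v1", "v2"}}}
	for _, c := range buildClusters(services, rules) {
		fmt.Println(c)
	}
}
```

The real generator emits full Envoy `Cluster` protos with load-balancing and TLS settings; the sketch only shows how the two inputs meet.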
Kubernetes API Server
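The configuration store, responsible for storing user configuration supplied as Kubernetes CRDs (such as VirtualService and DestinationRule).
Data-plane components: a container started from the Istio proxyv2 image runs two processes, pilot-agent and envoy.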
pilot-agent
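- Generates a usable bootstrap configuration file for Envoy, based on information from the k8s API server, and starts Envoy.
- Monitors and manages Envoy's running state, e.g. restarting Envoy when it fails and reloading it after its configuration changes.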
envoy
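Envoy is started by the pilot-agent process from the generated bootstrap configuration; it then fetches its dynamic configuration over the xDS API from the Pilot address specified in that configuration.
The sidecar Envoy provides inbound and outbound proxying for the application through traffic interception.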
pilot-discovery
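The Istio Pilot code is split into Pilot-Discovery and Pilot-Agent. Pilot-Agent manages Envoy's lifecycle in the data plane, while Pilot-Discovery is the control-plane component that performs traffic management. This article focuses on the control plane, i.e. the Pilot-Discovery code.
How it works
Istio source branch: release-1.13
Initialization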
func newDiscoveryCommand() *cobra.Command {
	return &cobra.Command{
		Use:   "discovery",
		Short: "Start Istio proxy discovery service.",
		// ...
		RunE: func(c *cobra.Command, args []string) error {
			// ...
			// Create the xDS server
			discoveryServer, err := bootstrap.NewServer(serverArgs)
			if err != nil {
				return fmt.Errorf("failed to create discovery service: %v", err)
			}
			// Start the xDS server
			if err := discoveryServer.Start(stop); err != nil {
				return fmt.Errorf("failed to start discovery service: %v", err)
			}
			cmd.WaitSignal(stop)
			// Wait until we shut down. In theory this could block forever; in practice we will get
			// forcibly shut down after 30s in Kubernetes.
			discoveryServer.WaitUntilCompletion()
			return nil
		},
	}
}
Here, bootstrap.NewServer builds the discovery server's data structures:
// NewServer creates a new Server instance based on the provided arguments.
func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) {
	// ...
	ac := aggregate.NewController(aggregate.Options{
		MeshHolder: e,
	})
	e.ServiceDiscovery = ac
	s := &Server{
		// ...
		fileWatcher: filewatcher.NewWatcher(),
		// ...
		server: server.New(),
		// ...
	}
	// ...
	// Create the DiscoveryServer
	// DiscoveryServer is Pilot's gRPC implementation for Envoy's xds APIs
	s.XDSServer = xds.NewDiscoveryServer(e, args.Plugins, args.PodName, args.Namespace, args.RegistryOptions.KubeOptions.ClusterAliases)
	// ...
	// Initialize the various controllers
	if err := s.initControllers(args); err != nil {
		return nil, err
	}
	// ...
	// This should be called only after controllers are initialized.
	// Initialize the registry event handlers, which trigger a full push when ServiceEntries
	// or configs change. This is where the update handler for ServiceEntry resources is added,
	// i.e. how Pilot reacts when the configuration of external services changes.
	s.initRegistryEventHandlers()
	// Initialize the discovery service; this registers the xDS server's start function
	s.initDiscoveryService(args)
	s.initSDSServer()
	// ...
	return s, nil
}
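xDS refers to the family of discovery-service protocols defined by Envoy, such as CDS, EDS, LDS, and RDS:
- for an overview, see xDS configuration API overview;
- for the protocol definitions, see Envoy Proto.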
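On the wire, each of these APIs is identified by the resource type URL carried in a `DiscoveryRequest`. The short table below lists the v3 type URLs for the four APIs named above; `xdsTypeURLs` and `pushOrder` are illustrative names, and the ordering follows the update sequence that Envoy's xDS protocol documentation recommends for avoiding dropped traffic.

```go
package main

import "fmt"

// xdsTypeURLs maps short xDS API names to their v3 resource type URLs.
var xdsTypeURLs = map[string]string{
	"cds": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
	"eds": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
	"lds": "type.googleapis.com/envoy.config.listener.v3.Listener",
	"rds": "type.googleapis.com/envoy.config.route.v3.RouteConfiguration",
}

// pushOrder is the recommended update order: clusters, endpoints, listeners, routes.
var pushOrder = []string{"cds", "eds", "lds", "rds"}

func main() {
	for _, name := range pushOrder {
		fmt.Printf("%s -> %s\n", name, xdsTypeURLs[name])
	}
}
```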
The config controller and the service controllers watch their resources using the Kubernetes List/Watch mechanism:
// initControllers initializes the controllers.
func (s *Server) initControllers(args *PilotArgs) error {
	s.initMulticluster(args)
	// ...
	// Config-related controllers
	if err := s.initConfigController(args); err != nil {
		return fmt.Errorf("error initializing config controller: %v", err)
	}
	if err := s.initServiceControllers(args); err != nil {
		return fmt.Errorf("error initializing service controllers: %v", err)
	}
	return nil
}
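The List/Watch idea itself is simple: take a full snapshot once, then apply incremental events to a local cache and notify handlers. The sketch below simulates it with plain channels instead of client-go, so `WatchEvent`, `listAndWatch`, and the `namespace/name` keys are hypothetical; only the event names `ADDED`/`MODIFIED`/`DELETED` mirror the Kubernetes watch API.

```go
package main

import "fmt"

// EventType uses the same names as Kubernetes watch events.
type EventType string

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
)

// WatchEvent is a hypothetical, simplified watch notification keyed by "namespace/name".
type WatchEvent struct {
	Type  EventType
	Key   string
	Value string
}

// listAndWatch seeds a local cache from a full List, then applies Watch events
// until the channel closes, calling handler for every change.
func listAndWatch(list map[string]string, watch <-chan WatchEvent, handler func(WatchEvent)) map[string]string {
	cache := make(map[string]string, len(list))
	for k, v := range list {
		cache[k] = v
		handler(WatchEvent{Type: Added, Key: k, Value: v})
	}
	for ev := range watch {
		switch ev.Type {
		case Added, Modified:
			cache[ev.Key] = ev.Value
		case Deleted:
			delete(cache, ev.Key)
		}
		handler(ev)
	}
	return cache
}

func main() {
	watch := make(chan WatchEvent, 2)
	watch <- WatchEvent{Type: Modified, Key: "default/reviews", Value: "v2"}
	watch <- WatchEvent{Type: Deleted, Key: "default/ratings"}
	close(watch)
	initial := map[string]string{"default/reviews": "v1", "default/ratings": "v1"}
	cache := listAndWatch(initial, watch, func(ev WatchEvent) { fmt.Println(ev.Type, ev.Key) })
	fmt.Println(cache)
}
```

Real informers add resync, resourceVersion tracking, and reconnection on watch errors, none of which is modeled here.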
Initializing the Config controller
// initConfigController creates the config controller in the pilotConfig.
// Watches the creation and update of Istio resources such as VirtualService/DestinationRule/ServiceEntry/Gateway
func (s *Server) initConfigController(args *PilotArgs) error {
	s.initStatusController(args, features.EnableStatus)
	meshConfig := s.environment.Mesh()
	if len(meshConfig.ConfigSources) > 0 {
		// Using MCP for config.
		if err := s.initConfigSources(args); err != nil {
			return err
		}
	} else if args.RegistryOptions.FileDir != "" {
		// Local files - should be added even if other options are specified
		store := memory.Make(collections.Pilot)
		configController := memory.NewController(store)
		err := s.makeFileMonitor(args.RegistryOptions.FileDir, args.RegistryOptions.KubeOptions.DomainSuffix, configController)
		// ...
	} else {
		err2 := s.initK8SConfigStore(args)
		// ...
	}
	// If running in ingress mode (requires k8s), wrap the config controller.
	if hasKubeRegistry(args.RegistryOptions.Registries) && meshConfig.IngressControllerMode != meshconfig.MeshConfig_OFF {
		// ...
	}
	// Wrap the config controller with a cache.
	aggregateConfigController, err := configaggregate.MakeCache(s.ConfigStores)
	// ...
	s.configController = aggregateConfigController
	// Create the config store.
	s.environment.IstioConfigStore = model.MakeIstioStore(s.configController)
	// Defer starting the controller until after the service is created.
	s.addStartFunc(func(stop <-chan struct{}) error {
		go s.configController.Run(stop)
		return nil
	})
	return nil
}
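`configaggregate.MakeCache` puts every configured store (MCP sources, local files, or Kubernetes CRDs) behind a single interface. A minimal sketch of that aggregation follows. `Config`, `Store`, `staticStore`, and `aggregateStore` are hypothetical types, and the rule that the first store wins when two stores hold the same kind/namespace/name is an assumption of this sketch.

```go
package main

import "fmt"

// Config is a hypothetical, simplified Istio config object.
type Config struct {
	Kind, Namespace, Name, Source string
}

// Store is a hypothetical minimal read-only config store.
type Store interface {
	List(kind, namespace string) []Config
}

// staticStore is an in-memory Store; an empty namespace means "all namespaces".
type staticStore []Config

func (s staticStore) List(kind, namespace string) []Config {
	var out []Config
	for _, c := range s {
		if c.Kind == kind && (namespace == "" || c.Namespace == namespace) {
			out = append(out, c)
		}
	}
	return out
}

// aggregateStore fans a List call out to every underlying store and drops
// duplicates, keeping the object from the first store that has it.
type aggregateStore []Store

func (a aggregateStore) List(kind, namespace string) []Config {
	seen := make(map[string]bool)
	var out []Config
	for _, store := range a {
		for _, c := range store.List(kind, namespace) {
			key := c.Kind + "/" + c.Namespace + "/" + c.Name
			if seen[key] {
				continue
			}
			seen[key] = true
			out = append(out, c)
		}
	}
	return out
}

func main() {
	kube := staticStore{{"VirtualService", "default", "reviews", "kube"}}
	files := staticStore{
		{"VirtualService", "default", "reviews", "file"},
		{"VirtualService", "default", "ratings", "file"},
	}
	agg := aggregateStore{kube, files}
	for _, c := range agg.List("VirtualService", "") {
		fmt.Println(c.Name, "from", c.Source)
	}
}
```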
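Initializing the Service controllers
Pilot's service-discovery logic lives in the ServiceController. It handles service changes coming from registries such as Kubernetes and watches the creation and update of Kubernetes resources such as Pods and Services.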
// initServiceControllers creates and initializes the service controllers
func (s *Server) initServiceControllers(args *PilotArgs) error {
	serviceControllers := s.ServiceController()
	s.serviceEntryStore = serviceentry.NewServiceDiscovery(
		s.configController, s.environment.IstioConfigStore, s.XDSServer,
		serviceentry.WithClusterID(s.clusterID),
	)
	serviceControllers.AddRegistry(s.serviceEntryStore)
	registered := make(map[provider.ID]bool)
	for _, r := range args.RegistryOptions.Registries {
		serviceRegistry := provider.ID(r)
		if _, exists := registered[serviceRegistry]; exists {
			log.Warnf("%s registry specified multiple times.", r)
			continue
		}
		registered[serviceRegistry] = true
		log.Infof("Adding %s registry adapter", serviceRegistry)
		switch serviceRegistry {
		case provider.Kubernetes:
			// Initialize the Kubernetes service controller
			if err := s.initKubeRegistry(args); err != nil {
				return err
			}
		case provider.Mock:
			s.initMockRegistry()
		default:
			return fmt.Errorf("service registry %s is not supported", r)
		}
	}
	// Defer running of the service controllers.
	s.addStartFunc(func(stop <-chan struct{}) error {
		go serviceControllers.Run(stop)
		return nil
	})
	return nil
}
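The aggregate controller behind `serviceControllers` keeps the registries in the order `AddRegistry` was called (the ServiceEntry store first, then Kubernetes) and answers queries by consulting each of them. The sketch below shows only that shape: `Registry`, `staticRegistry`, and `aggregateRegistry` are hypothetical, and the plain concatenation does not attempt to model how Istio merges services that appear in several registries.

```go
package main

import "fmt"

// Registry is a hypothetical, minimal service-registry adapter.
type Registry interface {
	Provider() string
	Services() []string // service hostnames
}

// staticRegistry is a fixed in-memory Registry used for the example.
type staticRegistry struct {
	provider string
	hosts    []string
}

func (r staticRegistry) Provider() string   { return r.provider }
func (r staticRegistry) Services() []string { return r.hosts }

// aggregateRegistry keeps registries in insertion order.
type aggregateRegistry struct {
	registries []Registry
}

func (a *aggregateRegistry) AddRegistry(r Registry) {
	a.registries = append(a.registries, r)
}

// Services concatenates every registry's services, tagging each with its provider.
func (a *aggregateRegistry) Services() []string {
	var out []string
	for _, r := range a.registries {
		for _, h := range r.Services() {
			out = append(out, r.Provider()+":"+h)
		}
	}
	return out
}

func main() {
	agg := &aggregateRegistry{}
	// Mirrors the excerpt: the ServiceEntry store is added first, then Kubernetes.
	agg.AddRegistry(staticRegistry{"External", []string{"api.example.com"}})
	agg.AddRegistry(staticRegistry{"Kubernetes", []string{"reviews.default.svc.cluster.local"}})
	fmt.Println(agg.Services())
}
```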
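Controller initialization
NewController first initializes the Controller, then obtains the informers and listers and registers the Namespaces, Services, Nodes, and Pods handlers.
Its core job is to watch for update events on the relevant Kubernetes resources (Namespaces, Services, Pods, and Nodes) and run the corresponding event-handling callbacks.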
// NewController creates a new Kubernetes controller
// Created by bootstrap and multicluster (see multicluster.Controller).
func NewController(kubeClient kubelib.Client, options Options) *Controller {
	c := &Controller{
		opts:   options,
		client: kubeClient,
		// Controller task queue
		queue: queue.NewQueueWithID(1*time.Second, string(options.ClusterID)),
		// ...
		multinetwork: initMultinetwork(),
	}
	// ...
	c.nsInformer = kubeClient.KubeInformer().Core().V1().Namespaces().Informer()
	c.nsLister = kubeClient.KubeInformer().Core().V1().Namespaces().Lister()
	if c.opts.SystemNamespace != "" {
		// ...
		c.registerHandlers(nsInformer, "Namespaces", c.onSystemNamespaceEvent, nil)
	}
	// ...
	c.registerHandlers(c.serviceInformer, "Services", c.onServiceEvent, nil)
	switch options.EndpointMode {
	case EndpointsOnly:
		c.endpoints = newEndpointsController(c)
	case EndpointSliceOnly:
		c.endpoints = newEndpointSliceController(c)
	}
	// ...
	c.registerHandlers(c.nodeInformer, "Nodes", c.onNodeEvent, nil)
	// ...
	c.registerHandlers(c.pods.informer, "Pods", c.pods.onEvent, nil)
	c.exports = newServiceExportCache(c)
	c.imports = newServiceImportCache(c)
	return c
}
The Controller above implements the following interface:
type Controller interface {
	// AppendServiceHandler notifies about changes to the service catalog.
	AppendServiceHandler(f func(*Service, Event))
	// AppendWorkloadHandler notifies about changes to workloads. This differs from InstanceHandler,
	// which deals with service instances (the result of a merge of Service and Workload)
	AppendWorkloadHandler(f func(*WorkloadInstance, Event))
	// Run until a signal is received
	Run(stop <-chan struct{})
	// HasSynced returns true after initial cache synchronization is complete
	HasSynced() bool
}
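`AppendServiceHandler` is how consumers (for example the code that triggers xDS pushes) subscribe to service changes. A minimal sketch of the underlying fan-out follows; `handlerRegistry`, `notifyService`, and the trimmed `Service`/`Event` types are hypothetical simplifications of the Istio model types.

```go
package main

import "fmt"

// Event is a simplified stand-in for model.Event.
type Event string

const (
	EventAdd    Event = "add"
	EventUpdate Event = "update"
	EventDelete Event = "delete"
)

// Service is a hypothetical, trimmed-down model.Service.
type Service struct{ Hostname string }

// handlerRegistry stores the callbacks registered via AppendServiceHandler.
type handlerRegistry struct {
	serviceHandlers []func(*Service, Event)
}

func (h *handlerRegistry) AppendServiceHandler(f func(*Service, Event)) {
	h.serviceHandlers = append(h.serviceHandlers, f)
}

// notifyService calls every registered handler in registration order.
func (h *handlerRegistry) notifyService(svc *Service, ev Event) {
	for _, f := range h.serviceHandlers {
		f(svc, ev)
	}
}

func main() {
	var h handlerRegistry
	h.AppendServiceHandler(func(s *Service, ev Event) { fmt.Println("xDS handler:", ev, s.Hostname) })
	h.AppendServiceHandler(func(s *Service, ev Event) { fmt.Println("audit handler:", ev, s.Hostname) })
	h.notifyService(&Service{Hostname: "reviews.default.svc.cluster.local"}, EventUpdate)
}
```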
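ServiceController creates an informer for each of these four resource types to watch for Kubernetes resource updates, and registers an event handler for each one.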
Namespace
func (c *Controller) onSystemNamespaceEvent(obj interface{}, ev model.Event) error {
	// ...
	// network changed, rarely happen
	if oldDefaultNetwork != c.network {
		// refresh pods/endpoints/services
		c.onDefaultNetworkChange()
	}
	return nil
}
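The handler only does expensive work when the system namespace's network actually changes. The sketch below isolates that check. It assumes the network is read from the `topology.istio.io/network` label on the namespace, and `networkWatcher` with its refresh counter is a hypothetical stand-in for `c.onDefaultNetworkChange()`.

```go
package main

import "fmt"

// networkLabel is assumed to be the label that carries the network name.
const networkLabel = "topology.istio.io/network"

// networkWatcher remembers the default network and counts refreshes.
type networkWatcher struct {
	network   string
	refreshes int
}

// onSystemNamespace triggers a refresh only when the network label changes.
func (w *networkWatcher) onSystemNamespace(labels map[string]string) {
	old := w.network
	w.network = labels[networkLabel]
	if old != w.network {
		w.refreshes++ // stands in for c.onDefaultNetworkChange()
		fmt.Printf("network changed %q -> %q, refreshing pods/endpoints/services\n", old, w.network)
	}
}

func main() {
	w := &networkWatcher{}
	w.onSystemNamespace(map[string]string{networkLabel: "network1"})
	w.onSystemNamespace(map[string]string{networkLabel: "network1"}) // unchanged: no refresh
	w.onSystemNamespace(map[string]string{networkLabel: "network2"})
	fmt.Println("refreshes:", w.refreshes)
}
```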
Service
The Service event handler updates the cache according to the event type, then invokes the registered serviceHandlers callbacks.
func (c *Controller) onServiceEvent(curr interface{}, event model.Event) error {
	svc, err := convertToService(curr)
	if err != nil {
		log.Errorf(err)
		return nil
	}
	log.Debugf("Handle event %s for service %s in namespace %s", event, svc.Name, svc.Namespace)
	// Create the standard (cluster.local) service.
	svcConv := kube.ConvertService(*svc, c.opts.DomainSuffix, c.Cluster())
	switch event {
	case model.EventDelete:
		c.deleteService(svcConv)
	default:
		c.addOrUpdateService(svc, svcConv, event, false)
	}
	return nil
}
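The "standard (cluster.local) service" created by `kube.ConvertService` is keyed by the Kubernetes service FQDN built from the name, the namespace, and `c.opts.DomainSuffix`. A tiny sketch of that naming rule, with `serviceHostname` as a hypothetical helper name:

```go
package main

import "fmt"

// serviceHostname builds <name>.<namespace>.svc.<domainSuffix>;
// domainSuffix is usually "cluster.local".
func serviceHostname(name, namespace, domainSuffix string) string {
	return fmt.Sprintf("%s.%s.svc.%s", name, namespace, domainSuffix)
}

func main() {
	fmt.Println(serviceHostname("reviews", "default", "cluster.local"))
}
```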
Node
func (c *Controller) onNodeEvent(obj interface{}, event model.Event) error {
	// ...
	// update all related services
	if updatedNeeded && c.updateServiceNodePortAddresses() {
		c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
			Full:   true,
			Reason: []model.TriggerReason{model.ServiceUpdate},
		})
	}
	return nil
}
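The point of gating the push on `updateServiceNodePortAddresses()` is that a full push is only requested when the node address set really changes. The sketch below illustrates that idea with a hypothetical `nodeAddressTracker`; it is not the Istio function, which also deals with NodePort services and their node selectors.

```go
package main

import (
	"fmt"
	"sort"
)

// nodeAddressTracker recomputes node addresses and reports whether they changed.
type nodeAddressTracker struct {
	nodes map[string]string // node name -> external IP
	last  []string
}

func (t *nodeAddressTracker) setNode(name, ip string) { t.nodes[name] = ip }
func (t *nodeAddressTracker) deleteNode(name string)  { delete(t.nodes, name) }

func equalStrings(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

// refresh returns true when the sorted address list differs from the previous one.
func (t *nodeAddressTracker) refresh() bool {
	addrs := make([]string, 0, len(t.nodes))
	for _, ip := range t.nodes {
		addrs = append(addrs, ip)
	}
	sort.Strings(addrs)
	changed := !equalStrings(addrs, t.last)
	t.last = addrs
	return changed
}

func main() {
	tracker := &nodeAddressTracker{nodes: map[string]string{}}
	tracker.setNode("node-a", "203.0.113.10")
	fmt.Println("full push needed:", tracker.refresh()) // true: new address
	tracker.setNode("node-a", "203.0.113.10")
	fmt.Println("full push needed:", tracker.refresh()) // false: nothing changed
	tracker.deleteNode("node-a")
	fmt.Println("full push needed:", tracker.refresh()) // true: address removed
}
```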
Pod
// onEvent updates the IP-based index (pc.podsByIP).
func (pc *PodCache) onEvent(curr interface{}, ev model.Event) error {
	// ...
	switch ev {
	case model.EventAdd:
		// can happen when istiod just starts
		if pod.DeletionTimestamp != nil || !IsPodReady(pod) {
			return nil
		} else if shouldPodBeInEndpoints(pod) {
			pc.update(ip, key)
		} else {
			return nil
		}
	case model.EventUpdate:
		if pod.DeletionTimestamp != nil || !IsPodReady(pod) {
			// delete only if this pod was in the cache
			pc.deleteIP(ip, key)
			ev = model.EventDelete
		} else if shouldPodBeInEndpoints(pod) {
			pc.update(ip, key)
		} else {
			return nil
		}
	case model.EventDelete:
		// delete only if this pod was in the cache,
		// in most case it has already been deleted in `UPDATE` with `DeletionTimestamp` set.
		if !pc.deleteIP(ip, key) {
			return nil
		}
	}
	pc.notifyWorkloadHandlers(pod, ev)
	return nil
}
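The Pod handler is effectively a small decision table: an update for a terminating or unready pod is turned into a delete, and a delete for a pod that is no longer cached is swallowed. The sketch below reproduces that table on simplified, hypothetical types; the three boolean fields stand in for `DeletionTimestamp != nil`, `IsPodReady(pod)`, and `shouldPodBeInEndpoints(pod)`.

```go
package main

import "fmt"

type Event string

const (
	EventAdd    Event = "add"
	EventUpdate Event = "update"
	EventDelete Event = "delete"
)

// Pod is a hypothetical pod with only the fields the decision needs.
type Pod struct {
	Key, IP      string
	Deleting     bool // DeletionTimestamp != nil
	Ready        bool // IsPodReady(pod)
	WantEndpoint bool // shouldPodBeInEndpoints(pod)
}

// podCache keeps an IP -> pod key index, like pc.podsByIP.
type podCache struct {
	podsByIP map[string]string
}

// deleteIP removes the entry only if it still belongs to key and reports whether it did.
func (pc *podCache) deleteIP(ip, key string) bool {
	if pc.podsByIP[ip] == key {
		delete(pc.podsByIP, ip)
		return true
	}
	return false
}

// onEvent returns the event to forward to workload handlers and whether to forward it.
func (pc *podCache) onEvent(pod Pod, ev Event) (Event, bool) {
	switch ev {
	case EventAdd:
		if pod.Deleting || !pod.Ready || !pod.WantEndpoint {
			return ev, false
		}
		pc.podsByIP[pod.IP] = pod.Key
	case EventUpdate:
		if pod.Deleting || !pod.Ready {
			pc.deleteIP(pod.IP, pod.Key)
			ev = EventDelete // an unready or terminating pod is reported as deleted
		} else if pod.WantEndpoint {
			pc.podsByIP[pod.IP] = pod.Key
		} else {
			return ev, false
		}
	case EventDelete:
		if !pc.deleteIP(pod.IP, pod.Key) {
			return ev, false // usually already removed by an earlier update
		}
	}
	return ev, true
}

func main() {
	pc := &podCache{podsByIP: map[string]string{}}
	p := Pod{Key: "default/reviews-v1", IP: "10.1.0.5", Ready: true, WantEndpoint: true}
	fmt.Println(pc.onEvent(p, EventAdd))
	p.Deleting = true
	fmt.Println(pc.onEvent(p, EventUpdate))
	fmt.Println(pc.onEvent(p, EventDelete))
}
```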
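Startup flow of the Pilot Discovery components
How DiscoveryServer accepts gRPC connections from Envoy
How updates are pushed to Envoy after a config change
Summary
Pilot-Discovery implements service discovery by using Kubernetes informers and listers to watch for update events on resources such as Namespaces, Services, Nodes, and Pods. These events drive the registered callbacks, which in turn call the xDS ConfigUpdate to update configuration asynchronously.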