11月23, 2017

Kubelet源码分析

本篇文章主要介绍kubelet服务如何启动及创建pod的流程,也对想要看kubelet源码的同学作为一个参考。

(kubelet版本: 1.7.4)

Kubelet介绍

在Kubernetes集群中,在每个Node节点上都会启动一个kubelet服务进程。该进程用于处理Master节点下发到本节点的任务,管理Pod及Pod中的容器。每个Kubelet进程会在APIServer上注册节点自身信息,定期向Master节点汇报节点资源的使用情况,并通过cAdvise监控容器和节点资源。

Kubelet功能

  • Pod管理
  • 容器的健康检测
  • 容器监控

Kubelet 代码结构

systemd
  • cmd/kubelet/kubelet.go kubelet服务的入口(main)。
  • cmd/kubelet/app/server.go 主要负责校验参数,创建和api-server交互的client及对运行kubelet权限检测,启动Kubelet等等。

除了入口,kubelet其它的主要功能实现在pkg/kubelet下。这里就不一一介绍了,在下面的时序图中,会标记pkg中用到了哪些文件,并主要实现了什么功能。

Kubelet服务启动流程

systemd

上面的时序图就是整个kubelet的启动流程。

  • validateConfig 主要对kubelet的NewKubeletServer结构体进行参数校验。
  • CreateAPIServerClientConfig 创建与控制节点api-server交互的client(kubeClient, eventClient)。
  • checkPermission 对运行kubelet进程的用户的权限验证(是否为root用户)。

RunKubelet中主要做CreateAndInitKubeletstartKubelet两件事。

func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, crOptions *options.ContainerRuntimeOptions, standaloneMode bool, hostnameOverride, nodeIP, providerID string) (k kubelet.KubeletBootstrap, err error) {
    // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
    // up into "per source" synchronizations

    k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, crOptions, standaloneMode, hostnameOverride, nodeIP, providerID)
    if err != nil {
        return nil, err
    }

    k.BirthCry()

    k.StartGarbageCollection()

    return k, nil
}
  • NewMainKubelet 实例化一个kubelet对象,并对kubelet内部各个component进行初始化工作,如:

    • makePodSourceConfig pod元数据的来源(FILE, URL, api-server).
    • diskSpaceManager 磁盘空间的管理
    • secretManager secret资源的管理
    • configMapManager configMap资源的管理
    • InitNetworkPlugin 网络插件的初始化
    • PodCache Pod缓存的管理
    • PodManager 对pod的管理(如: 增删改等)
    • ContainerRuntime 容器运行时的选择(docker or rkt)
    • containerGC 容器的垃圾回收
    • imageManager 镜像的管路
    • statusManager pod状态的管理
    • probeManager 容器健康检测
    • gpuManager 对GPU的支持
  • BirthCry 通知api-server服务kubelet启动了
  • StartGarbageCollection 开启垃圾回收服务

当之前所有的预处理工作处理完成之后,准备启动我们的kubelet服务startKubelet

func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps) {
    // start the kubelet
    go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)

    // start the kubelet server
    if kubeCfg.EnableServer {
        go wait.Until(func() {
            k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
        }, 0, wait.NeverStop)
    }
    if kubeCfg.ReadOnlyPort > 0 {
        go wait.Until(func() {
            k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
        }, 0, wait.NeverStop)
    }
}

startKubelet内的第一个goroutine负责启动kubelet,而后面则是创建一个kubelet http server。通过该server获取pod及node的相关信息。

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    if kl.logServer == nil {
        kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
    }
    if kl.kubeClient == nil {
        glog.Warning("No api server defined - no node status update will be sent.")
    }

    if err := kl.initializeModules(); err != nil {
        kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
        glog.Error(err)
        kl.runtimeState.setInitError(err)
    }

    // Start volume manager
    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

    if kl.kubeClient != nil {
        // Start syncing node status immediately, this may set up things the runtime needs to run.
        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
    }
    go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

    // Start loop to sync iptables util rules
    if kl.makeIPTablesUtilChains {
        go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
    }

    // Start a goroutine responsible for killing pods (that are not properly
    // handled by pod workers).
    go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

    // Start gorouting responsible for checking limits in resolv.conf
    if kl.resolverConfig != "" {
        go wait.Until(func() { kl.checkLimitsForResolvConf() }, 30*time.Second, wait.NeverStop)
    }

    // Start component sync loops.
    kl.statusManager.Start()
    kl.probeManager.Start()

    // Start the pod lifecycle event generator.
    kl.pleg.Start()
    kl.syncLoop(updates, kl)
}

上面的这段代码就是对上面介绍的各个component组件的启动,每个组件都是以goroutine运行的。这里就不细说了。创建pod的入口在syncLoop这里开始。

Pod的创建流程

systemd

syncLoop 是kubelet的主循环方法,它从不同的管道(FILE,URL, API-SERVER)监听到pod的变化,并把它们汇聚起来。当有新的变化发生时,它会调用对应的函数,保证Pod处于期望的状态。

func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    glog.Info("Starting kubelet main sync loop.")
    // The resyncTicker wakes up kubelet to checks if there are any pod workers
    // that need to be sync'd. A one-second period is sufficient because the
    // sync interval is defaulted to 10s.
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    housekeepingTicker := time.NewTicker(housekeepingPeriod)
    defer housekeepingTicker.Stop()
    plegCh := kl.pleg.Watch()
    for {
        if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
            glog.Infof("skipping pod synchronization - %v", rs)
            time.Sleep(5 * time.Second)
            continue
        }

        kl.syncLoopMonitor.Store(kl.clock.Now())
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        kl.syncLoopMonitor.Store(kl.clock.Now())
    }
}

kl.syncLoopIteration这个方法会对多个管道进行遍历,如果有pod动作,则会调用相应的Handler。下面是对应的Interface

// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
    HandlePodAdditions(pods []*v1.Pod)
    HandlePodUpdates(pods []*v1.Pod)
    HandlePodRemoves(pods []*v1.Pod)
    HandlePodReconcile(pods []*v1.Pod)
    HandlePodSyncs(pods []*v1.Pod)
    HandlePodCleanups() error
}

我们以创建Pod为例,它会调用对应的HandlePodAdditionsHandler进行处理。HandlePodAdditions做的任务就是通过canAdmitPod方法校验Pod能否在该计算节点创建(如:磁盘空间)。之后把创建Pod的事交给dispatchWorkdispatchWork主要工作就是把接收到的参数封装成UpdatePodOptions,调用 UpdatePod 方法.

syncPod是创建Pod的核心逻辑。其中有几个主要的方法:

  • computePodContainerChanges 根据最新拿到的pod配置与当前运行的容器配置进行对比,计算其中的变化(一个具体的hash值),得到需要重启的容器的信息。
  • createPodSandBox 创建一个PodSandBox。
// createPodSandbox creates a pod sandbox and returns (podSandBoxID, message, error).
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
    if err != nil {
         //......
    }

    // Create pod logs directory
    err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
    if err != nil {
        //......
    }

    podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig)
    if err != nil {
        //......
    }

    return podSandBoxID, "", nil
}
  • generatePodSandboxConfig 获取PodSandbox的配置(如:metadata,clusterDNS,容器的端口映射等)

  • RunPodSandbox 创建并开启一个Pod级别的sandbox。

func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id string, err error) {
    // Step 1: Pull the image for the sandbox.
    image := defaultSandboxImage
    podSandboxImage := ds.podSandboxImage
    if len(podSandboxImage) != 0 {
        image = podSandboxImage
    }
    if err := ensureSandboxImageExists(ds.client, image); err != nil {
        return "", err
    }

    // Step 2: Create the sandbox container.
    createConfig, err := ds.makeSandboxDockerConfig(config, image)
    createResp, err := ds.client.CreateContainer(*createConfig)

    // Step 3: Create Sandbox Checkpoint.
    if err = ds.checkpointHandler.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
        return createResp.ID, err
    }

    // Step 4: Start the sandbox container.
    err = ds.client.StartContainer(createResp.ID)

    // Step 5: Setup networking for the sandbox.
    cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
    err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations)

    return createResp.ID
}
  • ensureSandboxImageExists 检测用户是否设置了自己的pause镜像,如果没有设置则使用默认的gcr.io/google_containers/pause-amd64:3.0镜像。
  • makeSandboxDockerConfig 生成创建pause容器的配置信息。
  • CreateContainer 创建容器
  • StartContainer 启动容器
  • network.SetUpPod 设置容器的网络(kubelet加载cni插件对容器的网络进行设置等)

上面的这些操作就把我们Pod中的第一个pause容器创建并启动了。之后要做的就是把该Pod中的其它业务容器逐一的启动。但是在启动真正的业务容器之前,首先会检查用户是否设置了init_container。如果设置了,则会按init_container设置的顺序依次的执行init_container(注意:当其中的init_container执行失败了,则Pod会异常,并且业务容器不会被创建)。当init_container执行完成之后,我们真正的业务容器才会被逐一的启动。

业务容器启动的逻辑和Pod的初始化pause容器的启动的流程基本一致。下面的代码是循环的启动业务容器。

for idx := range podContainerChanges.ContainersToStart {
        container := &pod.Spec.Containers[idx]

        //.....

        glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
            //.....
            continue
        }
}
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string) (string, error) {
    // Step 1: pull the image.
    imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)

    // Step 2: create the container.    
    containerConfig, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef)
    containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)

    // Step 3: start the container.
    err = m.runtimeService.StartContainer(containerID)

    // Step 4: execute the post start hook.
    if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
        kubeContainerID := kubecontainer.ContainerID{
            Type: m.runtimeName,
            ID:   containerID,
        }
        msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
        if handlerErr != nil {
            m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
            m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", nil)
            return "PostStart Hook Failed", err
        }
    }

    return "", nil
}
  • EnsureImageExists 检查业务镜像是否存在,不存在则到Docker Registry或是Private Registry拉取镜像。
  • generateContainerConfig 生成业务容器的配置信息
  • CreateContainer 通过client.CreateContainer调用docker engine-api创建业务容器。
  • StartContainer 启动业务容器
  • runner.Run 这个方法的主要作用就是在业务容器起来的时候,首先会执行一个container hook(PostStart和PreStop),做一些预处理工作。只有container kook执行成功才会运行具体的业务服务,否则容器异常。

这样Pod大体的启动流程就描述完了,但是对于kubelet中其它的中间服务,如: volumeManager,diskSpaceManager,secretManager,configMapManager等等就需要更深层的了解了。

本文链接:https://www.opsdev.cn/post/kubelet.html

-- EOF --

Comments

评论加载中...

注:如果长时间无法加载,请针对 disq.us | disquscdn.com | disqus.com 启用代理。