PodWorker Module
Overview
The podWorkers submodule is responsible for handling update events for each individual pod, such as pod creation, deletion, and updates. Its basic approach is: for every pod it creates a dedicated goroutine and an update-event channel; the goroutine blocks waiting for events on that channel and processes whatever it receives, while the podWorkers object itself is mainly responsible for dispatching update events onto those channels.
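To make this dispatch pattern concrete, here is a minimal, self-contained Go sketch of the same idea: one goroutine plus one small buffered channel per pod, with the dispatcher only forwarding updates. All of the names here (dispatcher, podUpdate, and so on) are illustrative stand-ins, not taken from the kubelet source.
package main

import (
	"fmt"
	"sync"
	"time"
)

// podUpdate is a stand-in for the kubelet's UpdatePodOptions.
type podUpdate struct {
	uid  string
	kind string // e.g. "create", "update", "kill"
}

// dispatcher mirrors the role of podWorkers: it owns one channel and one
// goroutine per pod UID and only forwards updates onto the right channel.
type dispatcher struct {
	mu      sync.Mutex
	updates map[string]chan podUpdate
}

func newDispatcher() *dispatcher {
	return &dispatcher{updates: make(map[string]chan podUpdate)}
}

// UpdatePod forwards an update to the pod's worker, starting the worker
// goroutine on first use (this is the overall shape of podWorkers.UpdatePod).
func (d *dispatcher) UpdatePod(u podUpdate) {
	d.mu.Lock()
	ch, ok := d.updates[u.uid]
	if !ok {
		ch = make(chan podUpdate, 1) // small buffer, as in the real code
		d.updates[u.uid] = ch
		go d.managePodLoop(u.uid, ch) // one goroutine per pod
	}
	d.mu.Unlock()
	// This send can block if the worker is busy; the real podWorkers instead
	// parks the newest update in lastUndeliveredWorkUpdate and delivers it
	// after the current sync finishes.
	ch <- u
}

// managePodLoop blocks on the channel and handles updates one at a time,
// just like podWorkers.managePodLoop does with syncPodFn.
func (d *dispatcher) managePodLoop(uid string, ch <-chan podUpdate) {
	for u := range ch {
		fmt.Printf("pod %s: handling %q update\n", uid, u.kind)
	}
}

func main() {
	d := newDispatcher()
	d.UpdatePod(podUpdate{uid: "pod-a", kind: "create"})
	d.UpdatePod(podUpdate{uid: "pod-b", kind: "create"})
	d.UpdatePod(podUpdate{uid: "pod-a", kind: "update"})
	time.Sleep(100 * time.Millisecond) // give the workers time to print before exiting
}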
Preparing to Run a Pod
podWorkers.UpdatePod() - the update-event channel
UpdatePod carries a configuration change or termination state to a pod. A pod is either runnable, terminating, or terminated, and it transitions to terminating if it is deleted on the apiserver, discovered to have a terminal phase (Succeeded or Failed), or evicted by the kubelet.
As described in the overview, each pod gets its own goroutine and update-event channel; the goroutine blocks on the channel and processes the events it receives, and UpdatePod is the entry point that dispatches updates onto that channel.
Source: pkg/kubelet/pod_workers.go
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
// Handle the case where the pod is an orphan (has no config) and we only have runtime status, by running only the terminating part of the lifecycle
pod := options.Pod
var isRuntimePod bool
if options.RunningPod != nil {
if options.Pod == nil {
pod = options.RunningPod.ToAPIPod()
if options.UpdateType != kubetypes.SyncPodKill {
klog.InfoS("Pod update is ignored, runtime pods can only be killed", "pod", klog.KObj(pod), "podUID", pod.UID)
return
}
options.Pod = pod
isRuntimePod = true
} else {
options.RunningPod = nil
klog.InfoS("Pod update included RunningPod which is only valid when Pod is not specified", "pod", klog.KObj(options.Pod), "podUID", options.Pod.UID)
}
}
uid := pod.UID
p.podLock.Lock()
defer p.podLock.Unlock()
// decide what to do with this pod - we are either setting it up, tearing it down, or ignoring it
now := time.Now()
status, ok := p.podSyncStatuses[uid]
if !ok {
klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KObj(pod), "podUID", pod.UID)
status = &podSyncStatus{
syncedAt: now,
fullname: kubecontainer.GetPodFullName(pod),
}
// if this pod is being synced for the first time, we need to make sure it is an active pod
if !isRuntimePod && (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) {
// check to see if the pod is not running and the pod is terminal.
// If this succeeds then record in the podWorker that it is terminated.
if statusCache, err := p.podCache.Get(pod.UID); err == nil {
if isPodStatusCacheTerminal(statusCache) {
status = &podSyncStatus{
terminatedAt: now,
terminatingAt: now,
syncedAt: now,
startedTerminating: true,
finished: true,
fullname: kubecontainer.GetPodFullName(pod),
}
}
}
}
p.podSyncStatuses[uid] = status
}
// if an update is received that implies the pod should be running, but we are already terminating a pod by
// that UID, assume that two pods with the same UID were created in close temporal proximity (usually static
// pod but it's possible for an apiserver to extremely rarely do something similar) - flag the sync status
// to indicate that after the pod terminates it should be reset to "not running" to allow a subsequent add/update
// to start the pod worker again
if status.IsTerminationRequested() {
if options.UpdateType == kubetypes.SyncPodCreate {
status.restartRequested = true
klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KObj(pod), "podUID", pod.UID)
return
}
}
// once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping)
if status.IsFinished() {
klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KObj(pod), "podUID", pod.UID)
return
}
// check for a transition to terminating
var becameTerminating bool
if !status.IsTerminationRequested() {
switch {
case isRuntimePod:
klog.V(4).InfoS("Pod is orphaned and must be torn down", "pod", klog.KObj(pod), "podUID", pod.UID)
status.deleted = true
status.terminatingAt = now
becameTerminating = true
case pod.DeletionTimestamp != nil:
klog.V(4).InfoS("Pod is marked for graceful deletion, begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
status.deleted = true
status.terminatingAt = now
becameTerminating = true
case pod.Status.Phase == v1.PodFailed, pod.Status.Phase == v1.PodSucceeded:
klog.V(4).InfoS("Pod is in a terminal phase (success/failed), begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
status.terminatingAt = now
becameTerminating = true
case options.UpdateType == kubetypes.SyncPodKill:
if options.KillPodOptions != nil && options.KillPodOptions.Evict {
klog.V(4).InfoS("Pod is being evicted by the kubelet, begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
status.evicted = true
} else {
klog.V(4).InfoS("Pod is being removed by the kubelet, begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
}
status.terminatingAt = now
becameTerminating = true
}
}
// once a pod is terminating, all updates are kills and the grace period can only decrease
var workType PodWorkType
var wasGracePeriodShortened bool
switch {
case status.IsTerminated():
// A terminated pod may still be waiting for cleanup - if we receive a runtime pod kill request
// due to housekeeping seeing an older cached version of the runtime pod simply ignore it until
// after the pod worker completes.
if isRuntimePod {
klog.V(3).InfoS("Pod is waiting for termination, ignoring runtime-only kill until after pod worker is fully terminated", "pod", klog.KObj(pod), "podUID", pod.UID)
return
}
workType = TerminatedPodWork
if options.KillPodOptions != nil {
if ch := options.KillPodOptions.CompletedCh; ch != nil {
close(ch)
}
}
options.KillPodOptions = nil
case status.IsTerminationRequested():
workType = TerminatingPodWork
if options.KillPodOptions == nil {
options.KillPodOptions = &KillPodOptions{}
}
if ch := options.KillPodOptions.CompletedCh; ch != nil {
status.notifyPostTerminating = append(status.notifyPostTerminating, ch)
}
if fn := options.KillPodOptions.PodStatusFunc; fn != nil {
status.statusPostTerminating = append(status.statusPostTerminating, fn)
}
gracePeriod, gracePeriodShortened := calculateEffectiveGracePeriod(status, pod, options.KillPodOptions)
wasGracePeriodShortened = gracePeriodShortened
status.gracePeriod = gracePeriod
// always set the grace period for syncTerminatingPod so we don't have to recalculate,
// will never be zero.
options.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &gracePeriod
default:
workType = SyncPodWork
// KillPodOptions is not valid for sync actions outside of the terminating phase
if options.KillPodOptions != nil {
if ch := options.KillPodOptions.CompletedCh; ch != nil {
close(ch)
}
options.KillPodOptions = nil
}
}
// the desired work we want to be performing
work := podWork{
WorkType: workType,
Options: options,
}
// start the pod worker goroutine if it doesn't exist
podUpdates, exists := p.podUpdates[uid]
if !exists {
// create the channel
// We need a buffer here because checkForUpdates(), which puts updates onto the channel, is called from the same goroutine that consumes the channel. In that case the channel is guaranteed to be empty, so a buffer of size 1 is sufficient.
podUpdates = make(chan podWork, 1)
p.podUpdates[uid] = podUpdates
// ensure that static pods start in the order they are received by UpdatePod
if kubetypes.IsStaticPod(pod) {
p.waitingToStartStaticPodsByFullname[status.fullname] =
append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
}
// allow testing of delays in the pod update channel
var outCh <-chan podWork
if p.workerChannelFn != nil {
outCh = p.workerChannelFn(uid, podUpdates)
} else {
outCh = podUpdates
}
// start the goroutine
// Creating a new pod worker either means this is a new pod, or that the kubelet just restarted.
// In either case the kubelet is willing to believe the status of the pod for the first pod worker sync. See the corresponding comment in syncPod.
go func() {
defer runtime.HandleCrash()
p.managePodLoop(outCh)
}()
}
// dispatch a request to the pod worker if none is currently running
if !status.IsWorking() {
status.working = true
podUpdates <- work
return
}
// capture the maximum latency between a requested update and when the pod worker observes it
if undelivered, ok := p.lastUndeliveredWorkUpdate[pod.UID]; ok {
// track the max latency between when a config change is requested and when it is realized
// NOTE: this undercounts the latency when multiple requests are queued, but captures max latency
if !undelivered.Options.StartTime.IsZero() && undelivered.Options.StartTime.Before(work.Options.StartTime) {
work.Options.StartTime = undelivered.Options.StartTime
}
}
// always sync the most recent data
p.lastUndeliveredWorkUpdate[pod.UID] = work
if (becameTerminating || wasGracePeriodShortened) && status.cancelFn != nil {
klog.V(3).InfoS("Cancelling current pod sync", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", work.WorkType)
status.cancelFn()
return
}
}
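For orientation, this is roughly how the kubelet hands pods to the pod workers. The snippet is abridged from the dispatch path in pkg/kubelet/kubelet.go and the exact surrounding code varies by version, so treat it as a sketch rather than the verbatim source; the fields shown are the commonly used subset of UpdatePodOptions.
// Abridged sketch: the kubelet never syncs a pod directly, it always goes
// through podWorkers.UpdatePod, which decides whether the request becomes a
// sync, terminating, or terminated work item.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		StartTime:  start,
	})
}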
podWorkers.managePodLoop() - calls podWorkers.syncPodFn() to sync the pod
managePodLoop calls podWorkers.syncPodFn() to sync the pod. After each sync completes, a wrap-up step runs, which does two things:
- it puts the pod back into the kubelet's workQueue, so the pod's state is synced again in the next periodic cycle;
- it delivers the most recent update that accumulated (and could not be handled) during this sync onto the goroutine's event channel, so it is processed immediately.
Source: pkg/kubelet/pod_workers.go
func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
for update := range podUpdates {
err := func() error {
// Take the appropriate action (illegal phases are prevented by UpdatePod)
switch {
case update.WorkType == TerminatedPodWork:
case update.WorkType == TerminatingPodWork:
default:
// podWorkers.syncPodFn() here is actually the kubelet.syncPod() method
err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
}
}()
}
}
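The wrap-up step described above is not visible in the abridged loop. A simplified sketch of what it does is shown below; the names (completeWork, workQueue, resyncInterval, workerResyncIntervalJitterFactor) follow the upstream pod_workers.go, but the body is condensed here and should be read as an approximation, not the exact source.
// Simplified sketch: after syncPodFn returns, the worker re-enqueues the pod for
// the next periodic sync, then either delivers the most recent queued update or
// marks the worker as idle.
func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) {
	// Put the pod back into the kubelet's work queue so it is synced again after
	// the resync interval (in the real code, errors shorten this delay via backoff).
	p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
	p.podLock.Lock()
	defer p.podLock.Unlock()
	if update, exists := p.lastUndeliveredWorkUpdate[pod.UID]; exists {
		// An update arrived while this sync was running: hand it to the worker goroutine now.
		p.podUpdates[pod.UID] <- update
		delete(p.lastUndeliveredWorkUpdate, pod.UID)
	} else {
		// Nothing is pending; the worker goes back to blocking on its channel.
		p.podSyncStatuses[pod.UID].working = false
	}
}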
Reference note
p.syncPodFn() here refers to the kubelet.syncPod() method, wired up as follows:
- Source: pkg/kubelet/kubelet.go
func NewMainKubelet(......) (*Kubelet, error) {
klet := &Kubelet{
......
}
klet.podWorkers = newPodWorkers(klet.syncPod,......)
}
- Source: pkg/kubelet/pod_workers.go
func newPodWorkers(syncPodFn syncPodFnType,......) PodWorkers {
return &podWorkers{
syncPodFn: syncPodFn,
......
}
}
kubelet.syncPod() - preparation before creating containers
This method mainly does the following:
- If the pod is being deleted, perform the deletion immediately and return.
- Sync the podStatus into kubelet.statusManager.
- Check whether the pod can run on this node; this is mainly a permissions check (whether it may use host networking, whether it may run privileged, etc.). If it is not admitted, delete the old local pod and return an error.
- Create a podContainerManager object, create the pod-level cgroup, and update the QoS-level cgroups.
- If this is a static pod, create or update the corresponding mirror pod.
- Create the pod's data directories, which hold volume and plugin data; if PVs are defined, wait for all volume mounts to complete (the volumeManager does this in the background); if image pull secrets are configured, fetch them from the apiserver.
- Call into the kubelet.volumeManager component and wait until all external volumes the pod needs are ready.
- Call the container runtime's SyncPod method, which implements the actual container-creation logic.
None of this touches the actual containers yet; the method is the preparation work that must finish before the pod's containers are created.
Source: pkg/kubelet/kubelet.go
func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
// Latency measurements for the main workflow are relative to the first time the pod was seen by the kubelet.
var firstSeenTime time.Time
if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
}
// Record pod worker start latency if the pod is being created
// TODO: make pod workers record their own latencies
if updateType == kubetypes.SyncPodCreate {
if !firstSeenTime.IsZero() {
// This is the first time we are syncing the pod. Record the latency since the kubelet first saw the pod if firstSeenTime is set.
metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
} else {
klog.V(3).InfoS("First seen time not recorded for pod",
"podUID", pod.UID,
"pod", klog.KObj(pod))
}
}
// Generate final API pod status with pod and status manager status
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
// set pod IP to hostIP directly in runtime.GetPodStatus
podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs))
for _, ipInfo := range apiPodStatus.PodIPs {
podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
}
if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
podStatus.IPs = []string{apiPodStatus.PodIP}
}
// Check whether the pod is runnable on this node. If it should not be running, stop the pod's containers; this is different from termination (we want to stop the pod, but soft admission may allow it to start again later)
// Set the status and phase appropriately
runnable := kl.canRunPod(pod)
if !runnable.Admit {
}
// If firstSeenTime is set, record how long it took for the pod to go from first being seen by the kubelet to running.
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
}
// Update the pod status
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// A pod that cannot run must be stopped, and an error is returned to the pod worker
if !runnable.Admit {
}
// If the network plugin is not ready, only start the pod if it uses host networking
if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
}
// Ensure the kubelet is aware of the secrets and configmaps the pod references
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
}
// Create cgroups for the pod and apply resource parameters to them if the cgroups-per-qos flag is enabled
pcm := kl.containerManager.NewPodContainerManager()
// Create a mirror pod for the static pod
if kubetypes.IsStaticPod(pod) {
}
// Create the pod's data directories
if err := kl.makePodDataDirs(pod); err != nil {
}
// Mount volumes
// The volume manager does not mount volumes for terminating pods
// TODO: this check can be removed once context cancellation is added
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
// Wait for volumes to attach/mount
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
}
}
// Fetch the image pull secrets for the pod
pullSecrets := kl.getPullSecretsForPod(pod)
// Call the container runtime's SyncPod() method to start creating the containers
// kl.containerRuntime.SyncPod() here is actually the kubeGenericRuntimeManager.SyncPod() method
result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
}
Reference note
kl.containerRuntime.SyncPod() here refers to the kubeGenericRuntimeManager.SyncPod() method, wired up as follows:
Source: pkg/kubelet/kubelet.go
- NewMainKubelet()
func NewMainKubelet() (*Kubelet, error) {
klet := &Kubelet{......}
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(......)
klet.containerRuntime = runtime
}
func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
}
Starting the Pod (this is where CRI comes in)
The SyncPod() method of the KubeRuntimeManager submodule (pkg/kubelet/kuberuntime) is where the pod's containers are actually created.
kubeGenericRuntimeManager.runtimeService is the CRI: it is implemented by third parties such as containerd or Docker, and the concrete implementation logic lives on that side. Many methods on this struct call into the runtimeService interface to control pods and containers.
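For reference, the slice of the CRI surface that this flow ultimately drives looks roughly like the sketch below. This is a hand-written, abbreviated subset of the RuntimeService RPCs defined in k8s.io/cri-api; the real signatures take request/response protobuf messages and a context, and the placeholder types here are stand-ins only.
// Placeholder types; the real ones are protobuf messages in k8s.io/cri-api.
type (
	PodSandboxConfig struct{}
	PodSandboxStatus struct{}
	ContainerConfig  struct{}
	ContainerStatus  struct{}
)

// Abbreviated, simplified view of the CRI RuntimeService RPCs relevant here.
type runtimeServiceSubset interface {
	// Sandbox lifecycle: one sandbox per pod.
	RunPodSandbox(config *PodSandboxConfig) (sandboxID string, err error)
	StopPodSandbox(sandboxID string) error
	RemovePodSandbox(sandboxID string) error
	PodSandboxStatus(sandboxID string) (*PodSandboxStatus, error)

	// Container lifecycle: containers are always created inside a sandbox.
	CreateContainer(sandboxID string, config *ContainerConfig, sandboxConfig *PodSandboxConfig) (containerID string, err error)
	StartContainer(containerID string) error
	StopContainer(containerID string, timeout int64) error
	RemoveContainer(containerID string) error
	ContainerStatus(containerID string) (*ContainerStatus, error)
}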
kubeGenericRuntimeManager.SyncPod() - creating the containers
SyncPod mainly performs the following steps:
- Compute whether the sandbox and containers have changed.
- Kill the pod sandbox if necessary.
- Kill any containers that should not be running.
- Create the sandbox if necessary.
- Start ephemeral containers.
- Start init containers.
- Start the regular (application) containers.
- When creating a container, kubeGenericRuntimeManager.startContainer() is called to start it.
There can be multiple init containers; they are started strictly in order, and the next one only starts after the previous one has exited.
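The podContainerChanges value used throughout the code below is the result of computePodActions. Its shape is roughly the following; the field names follow pkg/kubelet/kuberuntime/kuberuntime_manager.go, but the comments are added here and the exact set of fields is version-dependent.
// Rough shape of the value returned by computePodActions (version-dependent).
type podActions struct {
	KillPod       bool   // kill the whole pod (sandbox and containers) before doing anything else
	CreateSandbox bool   // a (new) sandbox must be created
	SandboxID     string // ID of the currently running sandbox, if any
	Attempt       uint32 // attempt number for the sandbox to be created

	// The next init container to start, if any (init containers run one at a time, in order).
	NextInitContainerToStart *v1.Container
	// Indexes into pod.Spec.Containers of the regular containers to start.
	ContainersToStart []int
	// Running containers that must be killed, keyed by container ID.
	ContainersToKill map[kubecontainer.ContainerID]containerToKillInfo
	// Indexes into pod.Spec.EphemeralContainers of the ephemeral containers to start.
	EphemeralContainersToStart []int
}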
Source: pkg/kubelet/kuberuntime/kuberuntime_manager.go
- containerRuntime.SyncPod()
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// Step 1: Compute whether the sandbox and containers have changed
podContainerChanges := m.computePodActions(pod, podStatus)
klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
if podContainerChanges.CreateSandbox {
ref, err := ref.GetReference(legacyscheme.Scheme, pod)
if err != nil {
klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
}
if podContainerChanges.SandboxID != "" {
m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
} else {
klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))
}
}
// Step 2: Kill the pod if the sandbox has changed
if podContainerChanges.KillPod {
if podContainerChanges.CreateSandbox {
klog.V(4).InfoS("Stopping PodSandbox for pod, will start new one", "pod", klog.KObj(pod))
} else {
klog.V(4).InfoS("Stopping PodSandbox for pod, because all other containers are dead", "pod", klog.KObj(pod))
}
killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
result.AddPodSyncResult(killResult)
if killResult.Error() != nil {
klog.ErrorS(killResult.Error(), "killPodWithSyncResult failed")
return
}
if podContainerChanges.CreateSandbox {
m.purgeInitContainers(pod, podStatus)
}
} else {
// Step 3: Kill any running containers that should not be kept
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
klog.V(3).InfoS("Killing unwanted container for pod", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
result.AddSyncResult(killContainerResult)
if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
klog.ErrorS(err, "killContainer for pod failed", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
return
}
}
}
// Step 4: Create a sandbox for the pod if necessary
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
// ConvertPodSysctlsVariableToDotsSeparator converts sysctl variable
// in the Pod.Spec.SecurityContext.Sysctls slice into a dot as a separator.
// runc uses the dot as the separator to verify whether the sysctl variable
// is correct in a separate namespace, so when using the slash as the sysctl
// variable separator, runc returns an error: "sysctl is not in a separate kernel namespace"
// and the podSandBox cannot be successfully created. Therefore, before calling runc,
// we need to convert the sysctl variable, the dot is used as a separator to separate the kernel namespace.
// When runc supports slash as sysctl separator, this function can no longer be used.
sysctl.ConvertPodSysctlsVariableToDotsSeparator(pod.Spec.SecurityContext)
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
resp, err := m.runtimeService.PodSandboxStatus(podSandboxID, false)
// If the pod uses host networking, so do its containers; otherwise the containers use the None network mode and the kubelet's network plugin performs the network setup itself
if !kubecontainer.IsHostNetworkPod(pod) {
podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, resp.GetStatus())
klog.V(4).InfoS("Determined the ip for pod after sandbox changed", "IPs", podIPs, "pod", klog.KObj(pod))
}
}
// Fetch the sandbox config for the containers (metadata, cluster DNS, the containers' port mappings, and so on)
configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
result.AddSyncResult(configPodSandboxResult)
podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
// 'start' starts a container of any type: container (regular container), init_container (init container), or ephemeral_container (ephemeral container).
// These type names show up in log messages and in metric labels, where they are used to identify the container.
// All of the container starts below go through this 'start' closure, i.e. func(typeName, metricLabel string, spec *startSpec) error {}.
// The core of starting a container is the m.startContainer() method.
start := func(typeName, metricLabel string, spec *startSpec) error {
// Final step: call m.startContainer() to start the container
// NOTE (aramase): podIPs is populated for both single-stack and dual-stack clusters; only podIPs is passed on
m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs)
}
// Step 5: Start the ephemeral containers, using the 'start' closure defined above
for _, idx := range podContainerChanges.EphemeralContainersToStart {
start("ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
}
// Step 6: Start the next init container, using the 'start' closure defined above
if container := podContainerChanges.NextInitContainerToStart; container != nil {
start("init container", metrics.InitContainer, containerStartSpec(container))
}
// Step 7: Start the regular containers, using the 'start' closure defined above
for _, idx := range podContainerChanges.ContainersToStart {
start("container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
}
return
}
kubeGenericRuntimeManager.startContainer() - starting a container
Container startup is finally carried out by kubeGenericRuntimeManager.startContainer(), which has the following main steps:
- Pull the image
- Create the container
- Start the container
- Run the post-start lifecycle hooks
Source: pkg/kubelet/kuberuntime/kuberuntime_container.go
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
container := spec.container
// Step 1: Pull the image
imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
if err != nil {
s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
return msg, err
}
// Step 2: Create the container
// For a brand-new container, RestartCount should be 0
restartCount := 0
containerStatus := podStatus.FindContainerStatusByName(container.Name)
target, err := spec.getTargetID(podStatus)
m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, podIPs, target)
m.internalLifecycle.PreCreateContainer(pod, container, containerConfig)
m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
m.internalLifecycle.PreStartContainer(pod, container, containerID)
// Step 3: Start the container
err = m.runtimeService.StartContainer(containerID)
// Symlink container logs to the legacy container log location for cluster logging
// support.
// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
containerMeta := containerConfig.GetMetadata()
sandboxMeta := podSandboxConfig.GetMetadata()
legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
sandboxMeta.Namespace)
containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
// only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
// Because if containerLog path does not exist, only dangling legacySymlink is created.
// This dangling legacySymlink is later removed by container gc, so it does not make sense
// to create it in the first place. it happens when journald logging driver is used with docker.
if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
klog.ErrorS(err, "Failed to create legacy symbolic link", "path", legacySymlink,
"containerID", containerID, "containerLogPath", containerLog)
}
}
// Step 4: Run the post-start hook. These are checks run right after startup; if they fail, the container ends up in a failed state and the restart policy decides whether it is restarted
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
kubeContainerID := kubecontainer.ContainerID{
Type: m.runtimeName,
ID: containerID,
}
// runner.Run executes a container hook (PostStart or PreStop) when the application container comes up,
// doing whatever preparatory work the pod spec declares.
// The actual workload only keeps running if the hook succeeds; otherwise the container is treated as failed.
msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
if handlerErr != nil {
klog.ErrorS(handlerErr, "Failed to execute PostStartHook", "pod", klog.KObj(pod),
"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, nil); err != nil {
klog.ErrorS(err, "Failed to kill container", "pod", klog.KObj(pod),
"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
}
return msg, ErrPostStartHook
}
}
return "", nil
}
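As a usage-side illustration of the PostStart handling above, this is how a container would declare such a hook in its spec. The sketch uses the k8s.io/api types; note that the handler type is named v1.LifecycleHandler in recent Kubernetes versions (it was previously v1.Handler), and the command itself is purely hypothetical.
package example

import (
	v1 "k8s.io/api/core/v1"
)

// buildContainerWithPostStart returns a container spec whose PostStart hook must
// succeed before the container is considered healthy; if the hook fails, the
// kubelet kills the container (the FailedPostStartHook path above) and the pod's
// restart policy decides whether it is started again.
func buildContainerWithPostStart() v1.Container {
	return v1.Container{
		Name:  "app",
		Image: "nginx:1.25",
		Lifecycle: &v1.Lifecycle{
			PostStart: &v1.LifecycleHandler{
				Exec: &v1.ExecAction{
					// Hypothetical warm-up command, purely illustrative.
					Command: []string{"/bin/sh", "-c", "echo warmed-up > /tmp/ready"},
				},
			},
		},
	}
}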