kata-containers / tests

Kata Containers tests, CI, and metrics
https://katacontainers.io/
Apache License 2.0
139 stars 196 forks source link

CC | the test "create two pods with pulling the image only once" can only be tested with specific containerd versions #5772

Open ChengyuZhu6 opened 1 year ago

ChengyuZhu6 commented 1 year ago

To determine whether the image has been pulled once or multiple times, our only option at present is to inspect the containerd log, as the following check does:

# Count the journalctl output lines for the "containerd" identifier that match
# the "PullImage "<image>" with snapshotter nydus" message for the test image.
pull_times=$(journalctl -t containerd -g "PullImage \"$image_unsigned_protected\" with snapshotter nydus" | wc -l)
        # The check passes only when exactly one matching line was counted,
        # i.e. the image was pulled once even though two pods were created.
        [ ${pull_times} -eq 1 ]

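However, the log message matched above is only emitted by containerd 1.7.x or above (at debug level, via `Debugf`); the 1.6.x implementation of `PullImage` shown below has no equivalent line.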

containerd 1.7.x:

# pkg/cri/server/image_pull.go
// PullImage handles a CRI image pull request (containerd 1.7.x variant).
//
// It normalizes the requested image reference, pulls and unpacks the image
// using the snapshotter returned by snapshotterFromPodSandboxConfig, creates
// image references for the image ID, repo tag and repo digest, refreshes the
// in-memory image store, and returns the image config digest as ImageRef.
//
// The results are named so that the deferred function below can inspect the
// final err and increment either the "failure" or "success" imagePulls counter.
func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (_ *runtime.PullImageResponse, err error) {
    span := tracing.SpanFromContext(ctx)
    defer func() {
        // TODO: add domain label for imagePulls metrics, and we may need to provide a mechanism
        // for the user to configure the set of registries that they are interested in.
        if err != nil {
            imagePulls.WithValues("failure").Inc()
        } else {
            imagePulls.WithValues("success").Inc()
        }
    }()

    // Count this pull as in progress until the function returns.
    inProgressImagePulls.Inc()
    defer inProgressImagePulls.Dec()
    // startTime feeds both the throughput metric and the final log line.
    startTime := time.Now()

    // imageRef is the reference exactly as supplied in the request; ref is its
    // normalized form produced by ParseDockerRef.
    imageRef := r.GetImage().GetImage()
    namedRef, err := distribution.ParseDockerRef(imageRef)
    if err != nil {
        return nil, fmt.Errorf("failed to parse image reference %q: %w", imageRef, err)
    }
    ref := namedRef.String()
    if ref != imageRef {
        log.G(ctx).Debugf("PullImage using normalized image ref: %q", ref)
    }

    imagePullProgressTimeout, err := time.ParseDuration(c.config.ImagePullProgressTimeout)
    if err != nil {
        return nil, fmt.Errorf("failed to parse image_pull_progress_timeout %q: %w", c.config.ImagePullProgressTimeout, err)
    }

    var (
        // pctx is a cancellable child of ctx that is used only for the pull
        // itself; pcancel is also handed to the progress reporter.
        // NOTE(review): presumably the reporter cancels a pull that makes no
        // progress within imagePullProgressTimeout — confirm against
        // newPullProgressReporter.
        pctx, pcancel = context.WithCancel(ctx)

        pullReporter = newPullProgressReporter(ref, pcancel, imagePullProgressTimeout)

        resolver = docker.NewResolver(docker.ResolverOptions{
            Headers: c.config.Registry.Headers,
            Hosts:   c.registryHosts(ctx, r.GetAuth(), pullReporter.optionUpdateClient),
        })
        // isSchema1 is set by imageHandler when a Docker schema 1 manifest is
        // seen during the pull; it is later passed to getRepoDigestAndTag.
        isSchema1    bool
        imageHandler containerdimages.HandlerFunc = func(_ context.Context,
            desc imagespec.Descriptor) ([]imagespec.Descriptor, error) {
            if desc.MediaType == containerdimages.MediaTypeDockerSchema1Manifest {
                isSchema1 = true
            }
            return nil, nil
        }
    )

    // Guarantee pctx is cancelled on every return path, including early errors.
    defer pcancel()
    snapshotter, err := c.snapshotterFromPodSandboxConfig(ctx, ref, r.SandboxConfig)
    if err != nil {
        return nil, err
    }
    // This debug message ("PullImage <ref> with snapshotter <name>") is the one
    // the journalctl check in the test greps for.
    log.G(ctx).Debugf("PullImage %q with snapshotter %s", ref, snapshotter)
    span.SetAttributes(
        tracing.Attribute("image.ref", ref),
        tracing.Attribute("snapshotter.name", snapshotter),
    )

    labels := c.getLabels(ctx, ref)

    pullOpts := []containerd.RemoteOpt{
        containerd.WithSchema1Conversion, //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
        containerd.WithResolver(resolver),
        containerd.WithPullSnapshotter(snapshotter),
        containerd.WithPullUnpack,
        containerd.WithPullLabels(labels),
        containerd.WithMaxConcurrentDownloads(c.config.MaxConcurrentDownloads),
        containerd.WithImageHandler(imageHandler),
        containerd.WithUnpackOpts([]containerd.UnpackOpt{
            containerd.WithUnpackDuplicationSuppressor(c.unpackDuplicationSuppressor),
        }),
    }

    // Optional pull options are appended after the fixed set above.
    pullOpts = append(pullOpts, c.encryptedImagesPullOpts()...)
    if !c.config.ContainerdConfig.DisableSnapshotAnnotations {
        pullOpts = append(pullOpts,
            containerd.WithImageHandlerWrapper(snpkg.AppendInfoHandlerWrapper(ref)))
    }

    if c.config.ContainerdConfig.DiscardUnpackedLayers {
        // Allows GC to clean layers up from the content store after unpacking
        pullOpts = append(pullOpts,
            containerd.WithChildLabelMap(containerdimages.ChildGCLabelsFilterLayers))
    }

    // Start progress reporting on pctx, run the pull on the same context, and
    // cancel pctx as soon as the pull returns, whether it succeeded or failed.
    pullReporter.start(pctx)
    image, err := c.client.Pull(pctx, ref, pullOpts...)
    pcancel()
    if err != nil {
        return nil, fmt.Errorf("failed to pull and unpack image %q: %w", ref, err)
    }
    span.AddEvent("Pull and unpack image complete")

    // The image ID is the digest of the image's config descriptor. From here on
    // the original ctx is used, since pctx has already been cancelled.
    configDesc, err := image.Config(ctx)
    if err != nil {
        return nil, fmt.Errorf("get image config descriptor: %w", err)
    }
    imageID := configDesc.Digest.String()

    // Register every non-empty name (image ID, repo tag, repo digest) as a
    // reference to the pulled image target. Note that the loop variable r
    // shadows the request parameter r inside the loop.
    repoDigest, repoTag := getRepoDigestAndTag(namedRef, image.Target().Digest, isSchema1)
    for _, r := range []string{imageID, repoTag, repoDigest} {
        if r == "" {
            continue
        }
        if err := c.createImageReference(ctx, r, image.Target(), labels); err != nil {
            return nil, fmt.Errorf("failed to create image reference %q: %w", r, err)
        }
        // Update image store to reflect the newest state in containerd.
        // No need to use `updateImage`, because the image reference must
        // have been managed by the cri plugin.
        if err := c.imageStore.Update(ctx, r); err != nil {
            return nil, fmt.Errorf("failed to update image store %q: %w", r, err)
        }
    }

    // The error from image.Size is discarded; size is only used for the
    // throughput observation (size divided by elapsed seconds) and the log line.
    size, _ := image.Size(ctx)
    imagePullingSpeed := float64(size) / time.Since(startTime).Seconds()
    imagePullThroughput.Observe(imagePullingSpeed)

    log.G(ctx).Infof("Pulled image %q with image id %q, repo tag %q, repo digest %q, size %q in %s", imageRef, imageID,
        repoTag, repoDigest, strconv.FormatInt(size, 10), time.Since(startTime))
    // NOTE(random-liu): the actual state in containerd is the source of truth, even we maintain
    // in-memory image store, it's only for in-memory indexing. The image could be removed
    // by someone else anytime, before/during/after we create the metadata. We should always
    // check the actual state in containerd before using the image or returning status of the
    // image.
    return &runtime.PullImageResponse{ImageRef: imageID}, nil
}

containerd 1.6.x:

// PullImage handles a CRI image pull request (containerd 1.6.x variant).
//
// The requested reference is normalized, the image is pulled and unpacked with
// the snapshotter from c.config.ContainerdConfig.Snapshotter, references for
// the image ID, repo tag and repo digest are created and pushed into the image
// store, and the image config digest is returned as ImageRef.
//
// This variant contains no "PullImage %q with snapshotter %s" log statement,
// and its completion message is logged at debug level.
func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (*runtime.PullImageResponse, error) {
    // Keep the reference as requested for error and log messages; everything
    // else works on the normalized form.
    imageRef := r.GetImage().GetImage()
    parsed, err := distribution.ParseDockerRef(imageRef)
    if err != nil {
        return nil, fmt.Errorf("failed to parse image reference %q: %w", imageRef, err)
    }
    ref := parsed.String()
    if ref != imageRef {
        log.G(ctx).Debugf("PullImage using normalized image ref: %q", ref)
    }

    resolver := docker.NewResolver(docker.ResolverOptions{
        Headers: c.config.Registry.Headers,
        Hosts:   c.registryHosts(ctx, r.GetAuth()),
    })

    // sawSchema1 latches to true once the handler observes a Docker schema 1
    // manifest; it is consumed by getRepoDigestAndTag after the pull.
    sawSchema1 := false
    schema1Detector := containerdimages.HandlerFunc(
        func(_ context.Context, desc imagespec.Descriptor) ([]imagespec.Descriptor, error) {
            sawSchema1 = sawSchema1 || desc.MediaType == containerdimages.MediaTypeDockerSchema1Manifest
            return nil, nil
        })

    labels := c.getLabels(ctx, ref)

    // Fixed pull options first, optional ones appended afterwards.
    opts := []containerd.RemoteOpt{
        containerd.WithSchema1Conversion,
        containerd.WithResolver(resolver),
        containerd.WithPullSnapshotter(c.config.ContainerdConfig.Snapshotter),
        containerd.WithPullUnpack,
        containerd.WithPullLabels(labels),
        containerd.WithMaxConcurrentDownloads(c.config.MaxConcurrentDownloads),
        containerd.WithImageHandler(schema1Detector),
        containerd.WithUnpackOpts([]containerd.UnpackOpt{
            containerd.WithUnpackDuplicationSuppressor(c.unpackDuplicationSuppressor),
        }),
    }
    opts = append(opts, c.encryptedImagesPullOpts()...)

    containerdCfg := c.config.ContainerdConfig
    if !containerdCfg.DisableSnapshotAnnotations {
        opts = append(opts, containerd.WithImageHandlerWrapper(snpkg.AppendInfoHandlerWrapper(ref)))
    }
    if containerdCfg.DiscardUnpackedLayers {
        // Lets GC remove layers from the content store once they are unpacked.
        opts = append(opts, containerd.WithChildLabelMap(containerdimages.ChildGCLabelsFilterLayers))
    }

    image, err := c.client.Pull(ctx, ref, opts...)
    if err != nil {
        return nil, fmt.Errorf("failed to pull and unpack image %q: %w", ref, err)
    }

    // The image ID is the digest of the image's config descriptor.
    configDesc, err := image.Config(ctx)
    if err != nil {
        return nil, fmt.Errorf("get image config descriptor: %w", err)
    }
    imageID := configDesc.Digest.String()

    repoDigest, repoTag := getRepoDigestAndTag(parsed, image.Target().Digest, sawSchema1)
    for _, name := range []string{imageID, repoTag, repoDigest} {
        if name != "" {
            err := c.createImageReference(ctx, name, image.Target(), labels)
            if err != nil {
                return nil, fmt.Errorf("failed to create image reference %q: %w", name, err)
            }
            // Refresh the image store so it mirrors containerd's state. Plain
            // Update suffices (no `updateImage`) because the reference was just
            // created by the cri plugin itself.
            err = c.imageStore.Update(ctx, name)
            if err != nil {
                return nil, fmt.Errorf("failed to update image store %q: %w", name, err)
            }
        }
    }

    log.G(ctx).Debugf("Pulled image %q with image id %q, repo tag %q, repo digest %q",
        imageRef, imageID, repoTag, repoDigest)
    // NOTE(random-liu): containerd's actual state is the source of truth. The
    // in-memory image store is only an index, and the image may be removed by
    // someone else before, during or after the metadata is created, so always
    // check containerd's state before using the image or reporting its status.
    return &runtime.PullImageResponse{ImageRef: imageID}, nil
}