Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support estargz compression type #2246

Merged
merged 1 commit into from
Aug 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ Keys supported by image output:
* `unpack=true`: unpack image after creation (for use with containerd)
* `dangling-name-prefix=[value]`: name image with `prefix@<digest>` , used for anonymous images
* `name-canonical=true`: add additional canonical name `name@<digest>`
* `compression=[uncompressed,gzip]`: choose compression type for layers newly created and cached, gzip is default value
* `compression=[uncompressed,gzip,estargz]`: choose compression type for layers newly created and cached, gzip is default value. estargz should be used with `oci-mediatypes=true`.
* `force-compression=true`: forcefully apply `compression` option to all layers (including already existing layers).

If credentials are required, `buildctl` will attempt to read Docker configuration file `$DOCKER_CONFIG/config.json`.
Expand Down
62 changes: 43 additions & 19 deletions cache/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package cache
import (
"context"
"fmt"
"io"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/mount"
Expand Down Expand Up @@ -44,6 +46,8 @@ func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded boo
return computeBlobChain(ctx, sr, createIfNeeded, compressionType, forceCompression, s)
}

type compressor func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error)

func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) error {
eg, ctx := errgroup.WithContext(ctx)
if sr.parent != nil {
Expand All @@ -53,20 +57,25 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
}

eg.Go(func() error {
v, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (interface{}, error) {
_, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (interface{}, error) {
if getBlob(sr.md) != "" {
return sr.ociDesc()
return nil, nil
}
if !createIfNeeded {
return nil, errors.WithStack(ErrNoBlobs)
}

var mediaType string
var compressorFunc compressor
var finalize func(context.Context, content.Store) (map[string]string, error)
switch compressionType {
case compression.Uncompressed:
mediaType = ocispecs.MediaTypeImageLayer
case compression.Gzip:
mediaType = ocispecs.MediaTypeImageLayerGzip
case compression.EStargz:
compressorFunc, finalize = writeEStargz()
mediaType = ocispecs.MediaTypeImageLayerGzip
default:
return nil, errors.Errorf("unknown layer compression type: %q", compressionType)
}
Expand Down Expand Up @@ -100,6 +109,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
desc, err := sr.cm.Differ.Compare(ctx, lower, upper,
diff.WithMediaType(mediaType),
diff.WithReference(sr.ID()),
diff.WithCompressor(compressorFunc),
)
if err != nil {
return nil, err
Expand All @@ -108,6 +118,15 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if desc.Annotations == nil {
desc.Annotations = map[string]string{}
}
if finalize != nil {
a, err := finalize(ctx, sr.cm.ContentStore)
if err != nil {
return nil, errors.Wrapf(err, "failed to finalize compression")
}
for k, v := range a {
desc.Annotations[k] = v
}
}

info, err := sr.cm.ContentStore.Info(ctx, desc.Digest)
if err != nil {
Expand All @@ -122,23 +141,18 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
return nil, errors.Errorf("unknown layer compression type")
}

if err := sr.setBlob(ctx, desc); err != nil {
if err := sr.setBlob(ctx, compressionType, desc); err != nil {
return nil, err
}

return desc, nil
return nil, nil
})
if err != nil {
return err
}
descr, ok := v.(ocispecs.Descriptor)
if !ok {
return fmt.Errorf("invalid descriptor returned by differ while computing blob for %s", sr.ID())
}

if forceCompression {
if err := ensureCompression(ctx, sr, descr, compressionType, s); err != nil {
return err
if err := ensureCompression(ctx, sr, compressionType, s); err != nil {
return errors.Wrapf(err, "failed to ensure compression type of %q", compressionType)
}
}
return nil
Expand All @@ -153,7 +167,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
// setBlob associates a blob with the cache record.
// A lease must be held for the blob when calling this function
// Caller should call Info() for knowing what current values are actually set
func (sr *immutableRef) setBlob(ctx context.Context, desc ocispecs.Descriptor) error {
func (sr *immutableRef) setBlob(ctx context.Context, compressionType compression.Type, desc ocispecs.Descriptor) error {
if _, ok := leases.FromContext(ctx); !ok {
return errors.Errorf("missing lease requirement for setBlob")
}
Expand All @@ -166,7 +180,6 @@ func (sr *immutableRef) setBlob(ctx context.Context, desc ocispecs.Descriptor) e
return err
}

compressionType := compression.FromMediaType(desc.MediaType)
if compressionType == compression.UnknownCompression {
return errors.Errorf("unhandled layer media type: %q", desc.MediaType)
}
Expand Down Expand Up @@ -197,7 +210,7 @@ func (sr *immutableRef) setBlob(ctx context.Context, desc ocispecs.Descriptor) e
return err
}

if err := sr.addCompressionBlob(ctx, desc.Digest, compressionType); err != nil {
if err := sr.addCompressionBlob(ctx, desc, compressionType); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -247,14 +260,25 @@ func isTypeWindows(sr *immutableRef) bool {
}

// ensureCompression ensures the specified ref has the blob of the specified compression Type.
func ensureCompression(ctx context.Context, ref *immutableRef, desc ocispecs.Descriptor, compressionType compression.Type, s session.Group) error {
_, err := g.Do(ctx, fmt.Sprintf("%s-%d", desc.Digest, compressionType), func(ctx context.Context) (interface{}, error) {
func ensureCompression(ctx context.Context, ref *immutableRef, compressionType compression.Type, s session.Group) error {
_, err := g.Do(ctx, fmt.Sprintf("%s-%d", ref.ID(), compressionType), func(ctx context.Context) (interface{}, error) {
desc, err := ref.ociDesc(ctx, ref.descHandlers)
if err != nil {
return nil, err
}

// Resolve converters
layerConvertFunc, _, err := getConverters(desc, compressionType)
layerConvertFunc, err := getConverter(desc, compressionType)
if err != nil {
return nil, err
} else if layerConvertFunc == nil {
return nil, nil // no need to convert
if isLazy, err := ref.isLazy(ctx); err != nil {
return nil, err
} else if isLazy {
// This ref can be used as the specified compressionType. Keep it lazy.
return nil, nil
}
return nil, ref.addCompressionBlob(ctx, desc, compressionType)
}

// First, lookup local content store
Expand All @@ -277,7 +301,7 @@ func ensureCompression(ctx context.Context, ref *immutableRef, desc ocispecs.Des
}

// Start to track converted layer
if err := ref.addCompressionBlob(ctx, newDesc.Digest, compressionType); err != nil {
if err := ref.addCompressionBlob(ctx, *newDesc, compressionType); err != nil {
return nil, err
}
return nil, nil
Expand Down
179 changes: 101 additions & 78 deletions cache/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,86 +15,124 @@ import (
"github.com/moby/buildkit/util/compression"
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

// getConverters returns converter functions according to the specified compression type.
// If no conversion is needed, this returns nil without error.
func getConverters(desc ocispecs.Descriptor, compressionType compression.Type) (converter.ConvertFunc, func(string) string, error) {
// needsConversion indicates whether a conversion is needed for the specified mediatype to
// be the compressionType.
func needsConversion(mediaType string, compressionType compression.Type) (bool, error) {
switch compressionType {
case compression.Uncompressed:
if !images.IsLayerType(desc.MediaType) || uncompress.IsUncompressedType(desc.MediaType) {
// No conversion. No need to return an error here.
return nil, nil, nil
if !images.IsLayerType(mediaType) || uncompress.IsUncompressedType(mediaType) {
return false, nil
}
return uncompress.LayerConvertFunc, convertMediaTypeToUncompress, nil
case compression.Gzip:
if !images.IsLayerType(desc.MediaType) || isGzipCompressedType(desc.MediaType) {
// No conversion. No need to return an error here.
return nil, nil, nil
if !images.IsLayerType(mediaType) || isGzipCompressedType(mediaType) {
return false, nil
}
case compression.EStargz:
if !images.IsLayerType(mediaType) {
return false, nil
}
return gzipLayerConvertFunc, convertMediaTypeToGzip, nil
default:
return nil, nil, fmt.Errorf("unknown compression type during conversion: %q", compressionType)
return false, fmt.Errorf("unknown compression type during conversion: %q", compressionType)
}
return true, nil
}

func gzipLayerConvertFunc(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (*ocispecs.Descriptor, error) {
if !images.IsLayerType(desc.MediaType) || isGzipCompressedType(desc.MediaType) {
// getConverter returns converter function according to the specified compression type.
// If no conversion is needed, this returns nil without error.
func getConverter(desc ocispecs.Descriptor, compressionType compression.Type) (converter.ConvertFunc, error) {
if needs, err := needsConversion(desc.MediaType, compressionType); err != nil {
return nil, err
} else if !needs {
// No conversion. No need to return an error here.
return nil, nil
}

// prepare the source and destination
info, err := cs.Info(ctx, desc.Digest)
if err != nil {
return nil, err
}
labelz := info.Labels
if labelz == nil {
labelz = make(map[string]string)
}
ra, err := cs.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
defer ra.Close()
ref := fmt.Sprintf("convert-gzip-from-%s", desc.Digest)
w, err := cs.Writer(ctx, content.WithRef(ref))
if err != nil {
return nil, err
}
defer w.Close()
if err := w.Truncate(0); err != nil { // Old written data possibly remains
return nil, err
switch compressionType {
case compression.Uncompressed:
return uncompress.LayerConvertFunc, nil
case compression.Gzip:
convertFunc := func(w io.Writer) (io.WriteCloser, error) { return gzip.NewWriter(w), nil }
return gzipLayerConvertFunc(compressionType, convertFunc, nil), nil
case compression.EStargz:
compressorFunc, finalize := writeEStargz()
convertFunc := func(w io.Writer) (io.WriteCloser, error) { return compressorFunc(w, ocispecs.MediaTypeImageLayerGzip) }
return gzipLayerConvertFunc(compressionType, convertFunc, finalize), nil
default:
return nil, fmt.Errorf("unknown compression type during conversion: %q", compressionType)
}
zw := gzip.NewWriter(w)
defer zw.Close()
}

// convert this layer
diffID := digest.Canonical.Digester()
if _, err := io.Copy(zw, io.TeeReader(io.NewSectionReader(ra, 0, ra.Size()), diffID.Hash())); err != nil {
return nil, err
}
if err := zw.Close(); err != nil { // Flush the writer
return nil, err
}
labelz[labels.LabelUncompressed] = diffID.Digest().String() // update diffID label
if err = w.Commit(ctx, 0, "", content.WithLabels(labelz)); err != nil && !errdefs.IsAlreadyExists(err) {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
info, err = cs.Info(ctx, w.Digest())
if err != nil {
return nil, err
}
func gzipLayerConvertFunc(compressionType compression.Type, convertFunc func(w io.Writer) (io.WriteCloser, error), finalize func(context.Context, content.Store) (map[string]string, error)) converter.ConvertFunc {
return func(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (*ocispecs.Descriptor, error) {
// prepare the source and destination
info, err := cs.Info(ctx, desc.Digest)
if err != nil {
return nil, err
}
labelz := info.Labels
if labelz == nil {
labelz = make(map[string]string)
}
ra, err := cs.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
defer ra.Close()
ref := fmt.Sprintf("convert-from-%s-to-%s", desc.Digest, compressionType.String())
w, err := cs.Writer(ctx, content.WithRef(ref))
if err != nil {
return nil, err
}
defer w.Close()
if err := w.Truncate(0); err != nil { // Old written data possibly remains
return nil, err
}
zw, err := convertFunc(w)
if err != nil {
return nil, err
}
defer zw.Close()

// convert this layer
diffID := digest.Canonical.Digester()
if _, err := io.Copy(zw, io.TeeReader(io.NewSectionReader(ra, 0, ra.Size()), diffID.Hash())); err != nil {
return nil, err
}
if err := zw.Close(); err != nil { // Flush the writer
return nil, err
}
labelz[labels.LabelUncompressed] = diffID.Digest().String() // update diffID label
if err = w.Commit(ctx, 0, "", content.WithLabels(labelz)); err != nil && !errdefs.IsAlreadyExists(err) {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
info, err = cs.Info(ctx, w.Digest())
if err != nil {
return nil, err
}

newDesc := desc
newDesc.MediaType = convertMediaTypeToGzip(newDesc.MediaType)
newDesc.Digest = info.Digest
newDesc.Size = info.Size
return &newDesc, nil
newDesc := desc
newDesc.MediaType = convertMediaTypeToGzip(desc.MediaType)
newDesc.Digest = info.Digest
newDesc.Size = info.Size
if finalize != nil {
a, err := finalize(ctx, cs)
if err != nil {
return nil, errors.Wrapf(err, "failed finalize compression")
}
for k, v := range a {
if newDesc.Annotations == nil {
newDesc.Annotations = make(map[string]string)
}
newDesc.Annotations[k] = v
}
}
return &newDesc, nil
}
}

func isGzipCompressedType(mt string) bool {
Expand All @@ -110,21 +148,6 @@ func isGzipCompressedType(mt string) bool {
}
}

func convertMediaTypeToUncompress(mt string) string {
switch mt {
case images.MediaTypeDockerSchema2LayerGzip:
return images.MediaTypeDockerSchema2Layer
case images.MediaTypeDockerSchema2LayerForeignGzip:
return images.MediaTypeDockerSchema2LayerForeign
case ocispecs.MediaTypeImageLayerGzip:
return ocispecs.MediaTypeImageLayer
case ocispecs.MediaTypeImageLayerNonDistributableGzip:
return ocispecs.MediaTypeImageLayerNonDistributable
default:
return mt
}
}

func convertMediaTypeToGzip(mt string) string {
if uncompress.IsUncompressedType(mt) {
if images.IsDockerType(mt) {
Expand Down
Loading