Vendor buildah after merging mtrmac/blob-info-caching-on-top-of-contents-caching

Signed-off-by: Miloslav Trmač <mitr@redhat.com>
Author: Miloslav Trmač
Date: 2018-12-06 16:06:44 +01:00
Parent: 79583c82ee
Commit: d9b5c29b3f
10 changed files with 764 additions and 55 deletions


@@ -92,7 +92,7 @@ k8s.io/kube-openapi 275e2ce91dec4c05a4094a7b1daee5560b555ac9 https://github.com/
k8s.io/utils 258e2a2fa64568210fbd6267cf1d8fd87c3cb86e https://github.com/kubernetes/utils
github.com/mrunalp/fileutils master
github.com/varlink/go master
github.com/containers/buildah 9c65e5699cfa486531b3f123d9ce74873f0e18aa
github.com/containers/buildah dd0f4f1b1eb49b841179049ac498e4b0f874b462
github.com/Nvveen/Gotty master
github.com/fsouza/go-dockerclient master
github.com/openshift/imagebuilder master


@@ -317,6 +317,10 @@ type BuilderOptions struct {
// the registry together, can not be resolved to a reference to a
// source image. No separator is implicitly added.
Transport string
// PullBlobDirectory is the name of a directory in which we'll attempt
// to store copies of layer blobs that we pull down, if any. It should
// already exist.
PullBlobDirectory string
// Mount signals to NewBuilder() that the container should be mounted
// immediately.
Mount bool

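The new PullBlobDirectory field is threaded through pullAndFindImage() into pull's BlobDirectory option (see below). A minimal caller-side sketch, assuming buildah.NewBuilder's usual (ctx, store, options) signature and a cache directory that already exists; the FromImage value is only illustrative:

package main

import (
	"context"

	"github.com/containers/buildah"
	"github.com/containers/storage"
)

// newBuilderWithPullCache pulls the base image if needed; with
// PullBlobDirectory set, copies of the pulled layer blobs are also
// written into dir for later reuse.
func newBuilderWithPullCache(ctx context.Context, store storage.Store, dir string) (*buildah.Builder, error) {
	options := buildah.BuilderOptions{
		FromImage:         "docker.io/library/alpine", // illustrative base image
		PullBlobDirectory: dir,                        // must already exist
	}
	return buildah.NewBuilder(ctx, store, options)
}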

@@ -7,6 +7,7 @@ import (
"io/ioutil"
"time"
"github.com/containers/buildah/pkg/blobcache"
"github.com/containers/buildah/util"
cp "github.com/containers/image/copy"
"github.com/containers/image/docker/reference"
@@ -55,6 +56,12 @@ type CommitOptions struct {
// Squash tells the builder to produce an image with a single layer
// instead of with possibly more than one layer.
Squash bool
// BlobDirectory is the name of a directory in which we'll look for
// prebuilt copies of layer blobs that we might otherwise need to
// regenerate from on-disk layers. If blobs are available, the
// manifest of the new image will reference the blobs rather than
// on-disk layers.
BlobDirectory string
// OnBuild is a list of commands to be run by images based on this image
OnBuild []string
@@ -85,6 +92,11 @@ type PushOptions struct {
// ManifestType is the format to use when saving the image using the 'dir' transport
// possible options are oci, v2s1, and v2s2
ManifestType string
// BlobDirectory is the name of a directory in which we'll look for
// prebuilt copies of layer blobs that we might otherwise need to
// regenerate from on-disk layers, substituting them in the list of
// blobs to copy whenever possible.
BlobDirectory string
}
// Commit writes the contents of the container, along with its updated
@@ -128,13 +140,37 @@ func (b *Builder) Commit(ctx context.Context, dest types.ImageReference, options
}
}
}
src, err := b.makeImageRef(options.PreferredManifestType, options.Parent, exportBaseLayers, options.Squash, options.Compression, options.HistoryTimestamp)
src, err := b.makeImageRef(options.PreferredManifestType, options.Parent, exportBaseLayers, options.Squash, options.BlobDirectory, options.Compression, options.HistoryTimestamp)
if err != nil {
return imgID, nil, "", errors.Wrapf(err, "error computing layer digests and building metadata for container %q", b.ContainerID)
}
var maybeCachedSrc types.ImageReference = src
var maybeCachedDest types.ImageReference = dest
if options.BlobDirectory != "" {
compress := types.PreserveOriginal
if options.Compression != archive.Uncompressed {
compress = types.Compress
}
cache, err := blobcache.NewBlobCache(src, options.BlobDirectory, compress)
if err != nil {
return imgID, nil, "", errors.Wrapf(err, "error wrapping image reference %q in blob cache at %q", transports.ImageName(src), options.BlobDirectory)
}
maybeCachedSrc = cache
cache, err = blobcache.NewBlobCache(dest, options.BlobDirectory, compress)
if err != nil {
return imgID, nil, "", errors.Wrapf(err, "error wrapping image reference %q in blob cache at %q", transports.ImageName(dest), options.BlobDirectory)
}
maybeCachedDest = cache
}
// "Copy" our image to where it needs to be.
switch options.Compression {
case archive.Uncompressed:
systemContext.OCIAcceptUncompressedLayers = true
case archive.Gzip:
systemContext.DirForceCompress = true
}
var manifestBytes []byte
if manifestBytes, err = cp.Image(ctx, policyContext, dest, src, getCopyOptions(options.ReportWriter, src, nil, dest, systemContext, "")); err != nil {
if manifestBytes, err = cp.Image(ctx, policyContext, maybeCachedDest, maybeCachedSrc, getCopyOptions(options.ReportWriter, maybeCachedSrc, nil, maybeCachedDest, systemContext, "")); err != nil {
return imgID, nil, "", errors.Wrapf(err, "error copying layers and metadata for container %q", b.ContainerID)
}
if len(options.AdditionalTags) > 0 {
@@ -209,10 +245,28 @@ func Push(ctx context.Context, image string, dest types.ImageReference, options
if err != nil {
return nil, "", err
}
var maybeCachedSrc types.ImageReference = src
if options.BlobDirectory != "" {
compress := types.PreserveOriginal
if options.Compression != archive.Uncompressed {
compress = types.Compress
}
cache, err := blobcache.NewBlobCache(src, options.BlobDirectory, compress)
if err != nil {
return nil, "", errors.Wrapf(err, "error wrapping image reference %q in blob cache at %q", transports.ImageName(src), options.BlobDirectory)
}
maybeCachedSrc = cache
}
// Copy everything.
switch options.Compression {
case archive.Uncompressed:
systemContext.OCIAcceptUncompressedLayers = true
case archive.Gzip:
systemContext.DirForceCompress = true
}
var manifestBytes []byte
if manifestBytes, err = cp.Image(ctx, policyContext, dest, src, getCopyOptions(options.ReportWriter, src, nil, dest, systemContext, options.ManifestType)); err != nil {
return nil, "", errors.Wrapf(err, "error copying layers and metadata from %q to %q", transports.ImageName(src), transports.ImageName(dest))
if manifestBytes, err = cp.Image(ctx, policyContext, dest, maybeCachedSrc, getCopyOptions(options.ReportWriter, maybeCachedSrc, nil, dest, systemContext, options.ManifestType)); err != nil {
return nil, "", errors.Wrapf(err, "error copying layers and metadata from %q to %q", transports.ImageName(maybeCachedSrc), transports.ImageName(dest))
}
if options.ReportWriter != nil {
fmt.Fprintf(options.ReportWriter, "")

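A hedged sketch of driving the new CommitOptions knob from caller code; the builder, destination reference, and context are assumed to be set up elsewhere, and the directory must already exist. Commit wraps both ends of the copy in the blob cache, so the generated blobs stay in dir, where a later Push with the same BlobDirectory can substitute them instead of regenerating layers:

package main

import (
	"context"
	"os"

	"github.com/containers/buildah"
	"github.com/containers/image/types"
	"github.com/containers/storage/pkg/archive"
)

func commitWithCache(ctx context.Context, b *buildah.Builder, dest types.ImageReference, dir string) (string, error) {
	imgID, _, _, err := b.Commit(ctx, dest, buildah.CommitOptions{
		Compression:   archive.Gzip, // the cache keeps notes pairing compressed/decompressed digests
		BlobDirectory: dir,          // must already exist
		ReportWriter:  os.Stderr,
	})
	return imgID, err
}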

@@ -57,22 +57,24 @@ type containerImageRef struct {
squash bool
tarPath func(path string) (io.ReadCloser, error)
parent string
blobDirectory string
}
type containerImageSource struct {
path string
ref *containerImageRef
store storage.Store
containerID string
mountLabel string
layerID string
names []string
compression archive.Compression
config []byte
configDigest digest.Digest
manifest []byte
manifestType string
exporting bool
path string
ref *containerImageRef
store storage.Store
containerID string
mountLabel string
layerID string
names []string
compression archive.Compression
config []byte
configDigest digest.Digest
manifest []byte
manifestType string
exporting bool
blobDirectory string
}
func (i *containerImageRef) NewImage(ctx context.Context, sc *types.SystemContext) (types.ImageCloser, error) {
@@ -105,11 +107,11 @@ func expectedDockerDiffIDs(image docker.V2Image) int {
// Compute the media types which we need to attach to a layer, given the type of
// compression that we'll be applying.
func (i *containerImageRef) computeLayerMIMEType(what string) (omediaType, dmediaType string, err error) {
func computeLayerMIMEType(what string, layerCompression archive.Compression) (omediaType, dmediaType string, err error) {
omediaType = v1.MediaTypeImageLayer
dmediaType = docker.V2S2MediaTypeUncompressedLayer
if i.compression != archive.Uncompressed {
switch i.compression {
if layerCompression != archive.Uncompressed {
switch layerCompression {
case archive.Gzip:
omediaType = v1.MediaTypeImageLayerGzip
dmediaType = manifest.DockerV2Schema2LayerMediaType
@@ -280,19 +282,21 @@ func (i *containerImageRef) NewImageSource(ctx context.Context, sc *types.System
// The default layer media type assumes no compression.
omediaType := v1.MediaTypeImageLayer
dmediaType := docker.V2S2MediaTypeUncompressedLayer
// Look up this layer.
layer, err := i.store.Layer(layerID)
if err != nil {
return nil, errors.Wrapf(err, "unable to locate layer %q", layerID)
}
// If we're not re-exporting the data, and we're reusing layers individually, reuse
// the blobsum and diff IDs.
if !i.exporting && !i.squash && layerID != i.layerID {
layer, err2 := i.store.Layer(layerID)
if err2 != nil {
return nil, errors.Wrapf(err, "unable to locate layer %q", layerID)
}
if layer.UncompressedDigest == "" {
return nil, errors.Errorf("unable to look up size of layer %q", layerID)
}
layerBlobSum := layer.UncompressedDigest
layerBlobSize := layer.UncompressedSize
// Note this layer in the manifest, using the uncompressed blobsum.
diffID := layer.UncompressedDigest
// Note this layer in the manifest, using the appropriate blobsum.
olayerDescriptor := v1.Descriptor{
MediaType: omediaType,
Digest: layerBlobSum,
@@ -305,13 +309,13 @@ func (i *containerImageRef) NewImageSource(ctx context.Context, sc *types.System
Size: layerBlobSize,
}
dmanifest.Layers = append(dmanifest.Layers, dlayerDescriptor)
// Note this layer in the list of diffIDs, again using the uncompressed blobsum.
oimage.RootFS.DiffIDs = append(oimage.RootFS.DiffIDs, layerBlobSum)
dimage.RootFS.DiffIDs = append(dimage.RootFS.DiffIDs, layerBlobSum)
// Note this layer in the list of diffIDs, again using the uncompressed digest.
oimage.RootFS.DiffIDs = append(oimage.RootFS.DiffIDs, diffID)
dimage.RootFS.DiffIDs = append(dimage.RootFS.DiffIDs, diffID)
continue
}
// Figure out if we need to change the media type, in case we're using compression.
omediaType, dmediaType, err = i.computeLayerMIMEType(what)
// Figure out if we need to change the media type, in case we've changed the compression.
omediaType, dmediaType, err = computeLayerMIMEType(what, i.compression)
if err != nil {
return nil, err
}
@@ -368,8 +372,9 @@ func (i *containerImageRef) NewImageSource(ctx context.Context, sc *types.System
}
logrus.Debugf("%s size is %d bytes", what, size)
// Rename the layer so that we can more easily find it by digest later.
if err = os.Rename(filepath.Join(path, "layer"), filepath.Join(path, destHasher.Digest().String())); err != nil {
return nil, errors.Wrapf(err, "error storing %s to file while renaming %q to %q", what, filepath.Join(path, "layer"), filepath.Join(path, destHasher.Digest().String()))
finalBlobName := filepath.Join(path, destHasher.Digest().String())
if err = os.Rename(filepath.Join(path, "layer"), finalBlobName); err != nil {
return nil, errors.Wrapf(err, "error storing %s to file while renaming %q to %q", what, filepath.Join(path, "layer"), finalBlobName)
}
// Add a note in the manifest about the layer. The blobs are identified by their possibly-
// compressed blob digests.
@@ -472,19 +477,20 @@ func (i *containerImageRef) NewImageSource(ctx context.Context, sc *types.System
panic("unreachable code: unsupported manifest type")
}
src = &containerImageSource{
path: path,
ref: i,
store: i.store,
containerID: i.containerID,
mountLabel: i.mountLabel,
layerID: i.layerID,
names: i.names,
compression: i.compression,
config: config,
configDigest: digest.Canonical.FromBytes(config),
manifest: imageManifest,
manifestType: manifestType,
exporting: i.exporting,
path: path,
ref: i,
store: i.store,
containerID: i.containerID,
mountLabel: i.mountLabel,
layerID: i.layerID,
names: i.names,
compression: i.compression,
config: config,
configDigest: digest.Canonical.FromBytes(config),
manifest: imageManifest,
manifestType: manifestType,
exporting: i.exporting,
blobDirectory: i.blobDirectory,
}
return src, nil
}
@@ -551,7 +557,7 @@ func (i *containerImageSource) LayerInfosForCopy(ctx context.Context) ([]types.B
return nil, nil
}
func (i *containerImageSource) GetBlob(ctx context.Context, blob types.BlobInfo) (reader io.ReadCloser, size int64, err error) {
func (i *containerImageSource) GetBlob(ctx context.Context, blob types.BlobInfo, cache types.BlobInfoCache) (reader io.ReadCloser, size int64, err error) {
if blob.Digest == i.configDigest {
logrus.Debugf("start reading config")
reader := bytes.NewReader(i.config)
@@ -561,7 +567,16 @@ func (i *containerImageSource) GetBlob(ctx context.Context, blob types.BlobInfo)
}
return ioutils.NewReadCloserWrapper(reader, closer), reader.Size(), nil
}
layerFile, err := os.OpenFile(filepath.Join(i.path, blob.Digest.String()), os.O_RDONLY, 0600)
var layerFile *os.File
for _, path := range []string{i.blobDirectory, i.path} {
layerFile, err = os.OpenFile(filepath.Join(path, blob.Digest.String()), os.O_RDONLY, 0600)
if err == nil {
break
}
if !os.IsNotExist(err) {
logrus.Debugf("error checking for layer %q in %q: %v", blob.Digest.String(), path, err)
}
}
if err != nil {
logrus.Debugf("error reading layer %q: %v", blob.Digest.String(), err)
return nil, -1, errors.Wrapf(err, "error opening file %q to buffer layer blob", filepath.Join(i.path, blob.Digest.String()))
@@ -584,7 +599,7 @@ func (i *containerImageSource) GetBlob(ctx context.Context, blob types.BlobInfo)
return ioutils.NewReadCloserWrapper(layerFile, closer), size, nil
}
func (b *Builder) makeImageRef(manifestType, parent string, exporting bool, squash bool, compress archive.Compression, historyTimestamp *time.Time) (types.ImageReference, error) {
func (b *Builder) makeImageRef(manifestType, parent string, exporting bool, squash bool, blobDirectory string, compress archive.Compression, historyTimestamp *time.Time) (types.ImageReference, error) {
var name reference.Named
container, err := b.store.Container(b.ContainerID)
if err != nil {
@@ -630,6 +645,7 @@ func (b *Builder) makeImageRef(manifestType, parent string, exporting bool, squa
squash: squash,
tarPath: b.tarPath(),
parent: parent,
blobDirectory: blobDirectory,
}
return ref, nil
}

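The lookup order of the reworked GetBlob can be summed up in a short sketch; openBlob is a hypothetical helper, not part of this change, mirroring the loop above: prefer the shared blob directory, fall back to the image's own temporary directory, with each blob stored in a file named after its digest:

package main

import (
	"os"
	"path/filepath"

	digest "github.com/opencontainers/go-digest"
)

func openBlob(blobDirectory, path string, dgst digest.Digest) (*os.File, error) {
	var lastErr error
	for _, dir := range []string{blobDirectory, path} {
		// Blob files are named after their (possibly compressed) digests.
		f, err := os.Open(filepath.Join(dir, dgst.String()))
		if err == nil {
			return f, nil
		}
		lastErr = err
	}
	return nil, lastErr
}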

@@ -10,6 +10,7 @@ import (
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
@@ -168,6 +169,8 @@ type BuildOptions struct {
// ForceRmIntermediateCtrs tells the builder to remove all intermediate containers even if
// the build was unsuccessful.
ForceRmIntermediateCtrs bool
// BlobDirectory is a directory which we'll use for caching layer blobs.
BlobDirectory string
}
// Executor is a buildah-based implementation of the imagebuilder.Executor
@@ -224,6 +227,7 @@ type Executor struct {
containerIDs []string // Stores the IDs of the successful intermediate containers used during layer build
imageMap map[string]string // Used to map images that we create to handle the AS construct.
copyFrom string // Used to keep track of the --from flag from COPY and ADD
blobDirectory string
}
// builtinAllowedBuildArgs is list of built-in allowed build args
@@ -239,7 +243,7 @@ var builtinAllowedBuildArgs = map[string]bool{
}
// withName creates a new child executor that will be used whenever a COPY statement uses --from=NAME.
func (b *Executor) withName(name string, index int) *Executor {
func (b *Executor) withName(name string, index int, from string) *Executor {
if b.named == nil {
b.named = make(map[string]*Executor)
}
@@ -248,6 +252,7 @@ func (b *Executor) withName(name string, index int) *Executor {
copied.name = name
child := &copied
b.named[name] = child
b.named[from] = child
if idx := strconv.Itoa(index); idx != name {
b.named[idx] = child
}
@@ -609,6 +614,7 @@ func NewExecutor(store storage.Store, options BuildOptions) (*Executor, error) {
noCache: options.NoCache,
removeIntermediateCtrs: options.RemoveIntermediateCtrs,
forceRmIntermediateCtrs: options.ForceRmIntermediateCtrs,
blobDirectory: options.BlobDirectory,
}
if exec.err == nil {
exec.err = os.Stderr
@@ -664,6 +670,7 @@ func (b *Executor) Prepare(ctx context.Context, stage imagebuilder.Stage, from s
PullPolicy: b.pullPolicy,
Registry: b.registry,
Transport: b.transport,
PullBlobDirectory: b.blobDirectory,
SignaturePolicyPath: b.signaturePolicyPath,
ReportWriter: b.reportWriter,
SystemContext: b.systemContext,
@@ -1227,6 +1234,7 @@ func (b *Executor) Commit(ctx context.Context, ib *imagebuilder.Builder, created
SystemContext: b.systemContext,
IIDFile: b.iidfile,
Squash: b.squash,
BlobDirectory: b.blobDirectory,
Parent: b.builder.FromImageID,
}
imgID, ref, _, err := b.builder.Commit(ctx, imageRef, options)
@@ -1252,8 +1260,16 @@ func (b *Executor) Build(ctx context.Context, stages imagebuilder.Stages) (strin
b.imageMap = make(map[string]string)
stageCount := 0
for _, stage := range stages {
stageExecutor = b.withName(stage.Name, stage.Position)
if err := stageExecutor.Prepare(ctx, stage, ""); err != nil {
ib := stage.Builder
node := stage.Node
base, err := ib.From(node)
if err != nil {
logrus.Debugf("Build(node.Children=%#v)", node.Children)
return "", nil, err
}
stageExecutor = b.withName(stage.Name, stage.Position, base)
if err := stageExecutor.Prepare(ctx, stage, base); err != nil {
return "", nil, err
}
// Always remove the intermediate/build containers, even if the build was unsuccessful.
@@ -1392,6 +1408,9 @@ func BuildDockerfiles(ctx context.Context, store storage.Store, options BuildOpt
dockerfiles = append(dockerfiles, data)
}
dockerfiles = processCopyFrom(dockerfiles)
mainNode, err := imagebuilder.ParseDockerfile(dockerfiles[0])
if err != nil {
return "", nil, errors.Wrapf(err, "error parsing main Dockerfile")
@@ -1415,6 +1434,80 @@ func BuildDockerfiles(ctx context.Context, store storage.Store, options BuildOpt
return exec.Build(ctx, stages)
}
// processCopyFrom goes through the Dockerfiles and handles any 'COPY --from' instances,
// prepending a new FROM statement for any Dockerfile that does not already have a
// corresponding FROM command within it.
func processCopyFrom(dockerfiles []io.ReadCloser) []io.ReadCloser {
var newDockerfiles []io.ReadCloser
// fromMap contains the names of the images seen in a FROM
// line in the Dockerfiles. The boolean value just completes the map object.
fromMap := make(map[string]bool)
// asMap contains the names of the images seen after a "FROM image AS"
// line in the Dockerfiles. The boolean value just completes the map object.
asMap := make(map[string]bool)
copyRE := regexp.MustCompile(`\s*COPY\s+--from=`)
fromRE := regexp.MustCompile(`\s*FROM\s+`)
asRE := regexp.MustCompile(`(?i)\s+as\s+`)
for _, dfile := range dockerfiles {
if dfileBinary, err := ioutil.ReadAll(dfile); err == nil {
dfileString := string(dfileBinary)
copyFromContent := copyRE.Split(dfileString, -1)
// no "COPY --from=", just continue
if len(copyFromContent) < 2 {
newDockerfiles = append(newDockerfiles, ioutil.NopCloser(strings.NewReader(dfileString)))
continue
}
// Load all image names in our Dockerfiles into a map
// for easy reference later.
fromContent := fromRE.Split(dfileString, -1)
for i := 0; i < len(fromContent); i++ {
imageName := strings.Split(fromContent[i], " ")
if len(imageName) > 0 {
finalImage := strings.Split(imageName[0], "\n")
if finalImage[0] != "" {
fromMap[strings.TrimSpace(finalImage[0])] = true
}
}
}
logrus.Debug("fromMap: ", fromMap)
// Load all image names associated with an 'as' or 'AS' in
// our Dockerfiles into a map for easy reference later.
asContent := asRE.Split(dfileString, -1)
// Skip the first entry in the array as it's stuff before
// the " as " and we don't care.
for i := 1; i < len(asContent); i++ {
asName := strings.Split(asContent[i], " ")
if len(asName) > 0 {
finalAsImage := strings.Split(asName[0], "\n")
if finalAsImage[0] != "" {
asMap[strings.TrimSpace(finalAsImage[0])] = true
}
}
}
logrus.Debug("asMap: ", asMap)
for i := 1; i < len(copyFromContent); i++ {
fromArray := strings.Split(copyFromContent[i], " ")
// If the image isn't a stage number or already declared,
// add a FROM statement for it to the top of our Dockerfile.
trimmedFrom := strings.TrimSpace(fromArray[0])
_, okFrom := fromMap[trimmedFrom]
_, okAs := asMap[trimmedFrom]
_, err := strconv.Atoi(trimmedFrom)
if !okFrom && !okAs && err != nil {
from := "FROM " + trimmedFrom
newDockerfiles = append(newDockerfiles, ioutil.NopCloser(strings.NewReader(from)))
}
}
newDockerfiles = append(newDockerfiles, ioutil.NopCloser(strings.NewReader(dfileString)))
} // End if dfileBinary, err := ioutil.ReadAll(dfile); err == nil
} // End for _, dfile := range dockerfiles {
return newDockerfiles
}
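To make the rewriting concrete, a test-style sketch that would sit in this same package (the input Dockerfile and names are only illustrative): a COPY --from that names an image which is neither a stage number, a FROM image, nor an AS alias causes a one-line Dockerfile to be prepended for it.

func Example_processCopyFrom() {
	input := ioutil.NopCloser(strings.NewReader(
		"FROM alpine\nCOPY --from=registry.example.com/tools /bin/tool /usr/bin/tool\n"))
	outputs := processCopyFrom([]io.ReadCloser{input})
	for _, rc := range outputs {
		data, _ := ioutil.ReadAll(rc)
		fmt.Printf("%s\n---\n", data)
	}
	// The first reader yields the prepended "FROM registry.example.com/tools"
	// stage; the second yields the original Dockerfile unchanged.
}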
// deleteSuccessfulIntermediateCtrs goes through the container IDs in b.containerIDs
// and deletes the containers associated with those IDs.
func (b *Executor) deleteSuccessfulIntermediateCtrs() error {


@@ -34,6 +34,7 @@ func pullAndFindImage(ctx context.Context, store storage.Store, imageName string
Store: store,
SystemContext: options.SystemContext,
Transport: options.Transport,
BlobDirectory: options.PullBlobDirectory,
}
ref, err := pullImage(ctx, store, imageName, pullOptions, sc)
if err != nil {


@@ -0,0 +1,517 @@
package blobcache
import (
"bytes"
"context"
"io"
"io/ioutil"
"os"
"path/filepath"
"sync"
"github.com/containers/buildah/docker"
"github.com/containers/image/docker/reference"
"github.com/containers/image/image"
"github.com/containers/image/manifest"
"github.com/containers/image/transports"
"github.com/containers/image/types"
"github.com/containers/storage/pkg/archive"
"github.com/containers/storage/pkg/ioutils"
digest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
var (
_ types.ImageReference = &blobCacheReference{}
_ types.ImageSource = &blobCacheSource{}
_ types.ImageDestination = &blobCacheDestination{}
)
const (
compressedNote = ".compressed"
decompressedNote = ".decompressed"
)
// BlobCache is an object which saves copies of blobs that are written to it while passing them
// through to some real destination, and which can be queried directly in order to read them
// back.
type BlobCache interface {
types.ImageReference
// HasBlob checks if a blob matching the passed-in digest (and
// size, if not -1) is present in the cache.
HasBlob(types.BlobInfo) (bool, int64, error)
// Directory returns the path of the cache directory.
Directory() string
// ClearCache() clears the contents of the cache directory. Note
// that this also clears content which was not placed there by this
// cache implementation.
ClearCache() error
}
type blobCacheReference struct {
reference types.ImageReference
directory string
compress types.LayerCompression
}
type blobCacheSource struct {
reference *blobCacheReference
source types.ImageSource
sys types.SystemContext
cacheHits int64
cacheMisses int64
cacheErrors int64
}
type blobCacheDestination struct {
reference *blobCacheReference
destination types.ImageDestination
}
func makeFilename(blobSum digest.Digest, isConfig bool) string {
if isConfig {
return blobSum.String() + ".config"
}
return blobSum.String()
}
// NewBlobCache creates a new blob cache that wraps an image reference. Any blobs which are
// written to the destination image created from the resulting reference will also be stored
// as-is to the specified directory. The cache directory's contents
// can be cleared by calling the returned BlobCache's ClearCache() method.
// The compress argument controls whether or not the cache will try to substitute a compressed
// or different version of a blob when preparing the list of layers when reading an image.
func NewBlobCache(ref types.ImageReference, directory string, compress types.LayerCompression) (BlobCache, error) {
if directory == "" {
return nil, errors.Errorf("error creating cache around reference %q: no directory specified", transports.ImageName(ref))
}
switch compress {
case types.Compress, types.Decompress, types.PreserveOriginal:
// valid value, accept it
default:
return nil, errors.Errorf("unhandled LayerCompression value %v", compress)
}
return &blobCacheReference{
reference: ref,
directory: directory,
compress: compress,
}, nil
}
func (r *blobCacheReference) Transport() types.ImageTransport {
return r.reference.Transport()
}
func (r *blobCacheReference) StringWithinTransport() string {
return r.reference.StringWithinTransport()
}
func (r *blobCacheReference) DockerReference() reference.Named {
return r.reference.DockerReference()
}
func (r *blobCacheReference) PolicyConfigurationIdentity() string {
return r.reference.PolicyConfigurationIdentity()
}
func (r *blobCacheReference) PolicyConfigurationNamespaces() []string {
return r.reference.PolicyConfigurationNamespaces()
}
func (r *blobCacheReference) DeleteImage(ctx context.Context, sys *types.SystemContext) error {
return r.reference.DeleteImage(ctx, sys)
}
func (r *blobCacheReference) HasBlob(blobinfo types.BlobInfo) (bool, int64, error) {
if blobinfo.Digest == "" {
return false, -1, nil
}
for _, isConfig := range []bool{false, true} {
filename := filepath.Join(r.directory, makeFilename(blobinfo.Digest, isConfig))
fileInfo, err := os.Stat(filename)
if err == nil && (blobinfo.Size == -1 || blobinfo.Size == fileInfo.Size()) {
return true, fileInfo.Size(), nil
}
if !os.IsNotExist(err) {
return false, -1, errors.Wrapf(err, "error checking size of %q", filename)
}
}
return false, -1, nil
}
func (r *blobCacheReference) Directory() string {
return r.directory
}
func (r *blobCacheReference) ClearCache() error {
f, err := os.Open(r.directory)
if err != nil {
return errors.Wrapf(err, "error opening directory %q", r.directory)
}
defer f.Close()
names, err := f.Readdirnames(-1)
if err != nil {
return errors.Wrapf(err, "error reading directory %q", r.directory)
}
for _, name := range names {
pathname := filepath.Join(r.directory, name)
if err = os.RemoveAll(pathname); err != nil {
return errors.Wrapf(err, "error removing %q while clearing cache for %q", pathname, transports.ImageName(r))
}
}
return nil
}
func (r *blobCacheReference) NewImage(ctx context.Context, sys *types.SystemContext) (types.ImageCloser, error) {
src, err := r.NewImageSource(ctx, sys)
if err != nil {
return nil, errors.Wrapf(err, "error creating new image %q", transports.ImageName(r.reference))
}
return image.FromSource(ctx, sys, src)
}
func (r *blobCacheReference) NewImageSource(ctx context.Context, sys *types.SystemContext) (types.ImageSource, error) {
src, err := r.reference.NewImageSource(ctx, sys)
if err != nil {
return nil, errors.Wrapf(err, "error creating new image source %q", transports.ImageName(r.reference))
}
logrus.Debugf("starting to read from image %q using blob cache in %q (compression=%v)", transports.ImageName(r.reference), r.directory, r.compress)
return &blobCacheSource{reference: r, source: src, sys: *sys}, nil
}
func (r *blobCacheReference) NewImageDestination(ctx context.Context, sys *types.SystemContext) (types.ImageDestination, error) {
dest, err := r.reference.NewImageDestination(ctx, sys)
if err != nil {
return nil, errors.Wrapf(err, "error creating new image destination %q", transports.ImageName(r.reference))
}
logrus.Debugf("starting to write to image %q using blob cache in %q", transports.ImageName(r.reference), r.directory)
return &blobCacheDestination{reference: r, destination: dest}, nil
}
func (s *blobCacheSource) Reference() types.ImageReference {
return s.reference
}
func (s *blobCacheSource) Close() error {
logrus.Debugf("finished reading from image %q using blob cache: cache had %d hits, %d misses, %d errors", transports.ImageName(s.reference), s.cacheHits, s.cacheMisses, s.cacheErrors)
return s.source.Close()
}
func (s *blobCacheSource) GetManifest(ctx context.Context, instanceDigest *digest.Digest) ([]byte, string, error) {
if instanceDigest != nil {
filename := filepath.Join(s.reference.directory, makeFilename(*instanceDigest, false))
manifestBytes, err := ioutil.ReadFile(filename)
if err == nil {
s.cacheHits++
return manifestBytes, manifest.GuessMIMEType(manifestBytes), nil
}
if !os.IsNotExist(err) {
s.cacheErrors++
return nil, "", errors.Wrapf(err, "error checking for manifest file %q", filename)
}
}
s.cacheMisses++
return s.source.GetManifest(ctx, instanceDigest)
}
func (s *blobCacheSource) GetBlob(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache) (io.ReadCloser, int64, error) {
present, size, err := s.reference.HasBlob(blobinfo)
if err != nil {
return nil, -1, err
}
if present {
for _, isConfig := range []bool{false, true} {
filename := filepath.Join(s.reference.directory, makeFilename(blobinfo.Digest, isConfig))
f, err := os.Open(filename)
if err == nil {
s.cacheHits++
return f, size, nil
}
if !os.IsNotExist(err) {
s.cacheErrors++
return nil, -1, errors.Wrapf(err, "error checking for cache file %q", filepath.Join(s.reference.directory, filename))
}
}
}
s.cacheMisses++
rc, size, err := s.source.GetBlob(ctx, blobinfo, cache)
if err != nil {
return rc, size, errors.Wrapf(err, "error reading blob from source image %q", transports.ImageName(s.reference))
}
return rc, size, nil
}
func (s *blobCacheSource) GetSignatures(ctx context.Context, instanceDigest *digest.Digest) ([][]byte, error) {
return s.source.GetSignatures(ctx, instanceDigest)
}
func (s *blobCacheSource) LayerInfosForCopy(ctx context.Context) ([]types.BlobInfo, error) {
signatures, err := s.source.GetSignatures(ctx, nil)
if err != nil {
return nil, errors.Wrapf(err, "error checking if image %q has signatures", transports.ImageName(s.reference))
}
canReplaceBlobs := !(len(signatures) > 0 && len(signatures[0]) > 0)
infos, err := s.source.LayerInfosForCopy(ctx)
if err != nil {
return nil, errors.Wrapf(err, "error getting layer infos for copying image %q through cache", transports.ImageName(s.reference))
}
if infos == nil {
image, err := s.reference.NewImage(ctx, &s.sys)
if err != nil {
return nil, errors.Wrapf(err, "error opening image to get layer infos for copying image %q through cache", transports.ImageName(s.reference))
}
defer image.Close()
infos = image.LayerInfos()
}
if canReplaceBlobs && s.reference.compress != types.PreserveOriginal {
replacedInfos := make([]types.BlobInfo, 0, len(infos))
for _, info := range infos {
var replaceDigest []byte
var err error
blobFile := filepath.Join(s.reference.directory, makeFilename(info.Digest, false))
alternate := ""
switch s.reference.compress {
case types.Compress:
alternate = blobFile + compressedNote
replaceDigest, err = ioutil.ReadFile(alternate)
case types.Decompress:
alternate = blobFile + decompressedNote
replaceDigest, err = ioutil.ReadFile(alternate)
}
if err == nil && digest.Digest(replaceDigest).Validate() == nil {
alternate = filepath.Join(filepath.Dir(alternate), makeFilename(digest.Digest(replaceDigest), false))
fileInfo, err := os.Stat(alternate)
if err == nil {
logrus.Debugf("suggesting cached blob with digest %q and compression %v in place of blob with digest %q", string(replaceDigest), s.reference.compress, info.Digest.String())
info.Digest = digest.Digest(replaceDigest)
info.Size = fileInfo.Size()
switch info.MediaType {
case v1.MediaTypeImageLayer, v1.MediaTypeImageLayerGzip:
switch s.reference.compress {
case types.Compress:
info.MediaType = v1.MediaTypeImageLayerGzip
case types.Decompress:
info.MediaType = v1.MediaTypeImageLayer
}
case docker.V2S2MediaTypeUncompressedLayer, manifest.DockerV2Schema2LayerMediaType:
switch s.reference.compress {
case types.Compress:
info.MediaType = manifest.DockerV2Schema2LayerMediaType
case types.Decompress:
info.MediaType = docker.V2S2MediaTypeUncompressedLayer
}
}
}
}
replacedInfos = append(replacedInfos, info)
}
infos = replacedInfos
}
return infos, nil
}
func (d *blobCacheDestination) Reference() types.ImageReference {
return d.reference
}
func (d *blobCacheDestination) Close() error {
logrus.Debugf("finished writing to image %q using blob cache", transports.ImageName(d.reference))
return d.destination.Close()
}
func (d *blobCacheDestination) SupportedManifestMIMETypes() []string {
return d.destination.SupportedManifestMIMETypes()
}
func (d *blobCacheDestination) SupportsSignatures(ctx context.Context) error {
return d.destination.SupportsSignatures(ctx)
}
func (d *blobCacheDestination) DesiredLayerCompression() types.LayerCompression {
return d.destination.DesiredLayerCompression()
}
func (d *blobCacheDestination) AcceptsForeignLayerURLs() bool {
return d.destination.AcceptsForeignLayerURLs()
}
func (d *blobCacheDestination) MustMatchRuntimeOS() bool {
return d.destination.MustMatchRuntimeOS()
}
func (d *blobCacheDestination) IgnoresEmbeddedDockerReference() bool {
return d.destination.IgnoresEmbeddedDockerReference()
}
// Decompress and save the contents of the decompressReader stream into the passed-in temporary
// file. If we successfully save all of the data, rename the file to match the digest of the data,
// and make notes about the relationship between the file that holds a copy of the compressed data
// and this new file.
func saveStream(wg *sync.WaitGroup, decompressReader io.ReadCloser, tempFile *os.File, compressedFilename string, compressedDigest digest.Digest, isConfig bool, alternateDigest *digest.Digest) {
defer wg.Done()
// Decompress from and digest the reading end of that pipe.
decompressed, err3 := archive.DecompressStream(decompressReader)
digester := digest.Canonical.Digester()
if err3 == nil {
// Read the decompressed data through the filter over the pipe, blocking until the
// writing end is closed.
_, err3 = io.Copy(io.MultiWriter(tempFile, digester.Hash()), decompressed)
} else {
// Drain the pipe to keep from stalling the PutBlob() thread.
io.Copy(ioutil.Discard, decompressReader)
}
decompressReader.Close()
// decompressed is nil if DecompressStream failed above; closing a nil
// io.ReadCloser would panic.
if decompressed != nil {
decompressed.Close()
}
tempFile.Close()
// Determine the name that we should give to the uncompressed copy of the blob.
decompressedFilename := filepath.Join(filepath.Dir(tempFile.Name()), makeFilename(digester.Digest(), isConfig))
if err3 == nil {
// Rename the temporary file.
if err3 = os.Rename(tempFile.Name(), decompressedFilename); err3 != nil {
logrus.Debugf("error renaming new decompressed copy of blob %q into place at %q: %v", digester.Digest().String(), decompressedFilename, err3)
// Remove the temporary file.
if err3 = os.Remove(tempFile.Name()); err3 != nil {
logrus.Debugf("error cleaning up temporary file %q for decompressed copy of blob %q: %v", tempFile.Name(), compressedDigest.String(), err3)
}
} else {
*alternateDigest = digester.Digest()
// Note the relationship between the two files.
if err3 = ioutils.AtomicWriteFile(decompressedFilename+compressedNote, []byte(compressedDigest.String()), 0600); err3 != nil {
logrus.Debugf("error noting that the compressed version of %q is %q: %v", digester.Digest().String(), compressedDigest.String(), err3)
}
if err3 = ioutils.AtomicWriteFile(compressedFilename+decompressedNote, []byte(digester.Digest().String()), 0600); err3 != nil {
logrus.Debugf("error noting that the decompressed version of %q is %q: %v", compressedDigest.String(), digester.Digest().String(), err3)
}
}
} else {
// Remove the temporary file.
if err3 = os.Remove(tempFile.Name()); err3 != nil {
logrus.Debugf("error cleaning up temporary file %q for decompressed copy of blob %q: %v", tempFile.Name(), compressedDigest.String(), err3)
}
}
}
func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
var tempfile *os.File
var err error
var n int
var alternateDigest digest.Digest
wg := new(sync.WaitGroup)
defer wg.Wait()
compression := archive.Uncompressed
if inputInfo.Digest != "" {
filename := filepath.Join(d.reference.directory, makeFilename(inputInfo.Digest, isConfig))
tempfile, err = ioutil.TempFile(d.reference.directory, makeFilename(inputInfo.Digest, isConfig))
if err == nil {
stream = io.TeeReader(stream, tempfile)
defer func() {
if err == nil {
if err = os.Rename(tempfile.Name(), filename); err != nil {
if err2 := os.Remove(tempfile.Name()); err2 != nil {
logrus.Debugf("error cleaning up temporary file %q for blob %q: %v", tempfile.Name(), inputInfo.Digest.String(), err2)
}
err = errors.Wrapf(err, "error renaming new layer for blob %q into place at %q", inputInfo.Digest.String(), filename)
}
} else {
if err2 := os.Remove(tempfile.Name()); err2 != nil {
logrus.Debugf("error cleaning up temporary file %q for blob %q: %v", tempfile.Name(), inputInfo.Digest.String(), err2)
}
}
tempfile.Close()
}()
} else {
logrus.Debugf("error while creating a temporary file under %q to hold blob %q: %v", d.reference.directory, inputInfo.Digest.String(), err)
}
if !isConfig {
initial := make([]byte, 8)
n, err = stream.Read(initial)
if n > 0 {
// Build a Reader that will still return the bytes that we just
// read, for PutBlob()'s sake.
stream = io.MultiReader(bytes.NewReader(initial[:n]), stream)
if n >= len(initial) {
compression = archive.DetectCompression(initial[:n])
}
if compression != archive.Uncompressed {
// The stream is compressed, so create a file which we'll
// use to store a decompressed copy.
decompressedTemp, err2 := ioutil.TempFile(d.reference.directory, makeFilename(inputInfo.Digest, isConfig))
if err2 != nil {
logrus.Debugf("error while creating a temporary file under %q to hold decompressed blob %q: %v", d.reference.directory, inputInfo.Digest.String(), err2)
decompressedTemp.Close()
} else {
// Write a copy of the compressed data to a pipe,
// closing the writing end of the pipe after
// PutBlob() returns.
decompressReader, decompressWriter := io.Pipe()
defer decompressWriter.Close()
stream = io.TeeReader(stream, decompressWriter)
// Let saveStream() close the reading end and handle the temporary file.
wg.Add(1)
go saveStream(wg, decompressReader, decompressedTemp, filename, inputInfo.Digest, isConfig, &alternateDigest)
}
}
}
}
}
newBlobInfo, err := d.destination.PutBlob(ctx, stream, inputInfo, cache, isConfig)
if err != nil {
return newBlobInfo, errors.Wrapf(err, "error storing blob to image destination for cache %q", transports.ImageName(d.reference))
}
if alternateDigest.Validate() == nil {
logrus.Debugf("added blob %q (also %q) to the cache at %q", inputInfo.Digest.String(), alternateDigest.String(), d.reference.directory)
} else {
logrus.Debugf("added blob %q to the cache at %q", inputInfo.Digest.String(), d.reference.directory)
}
return newBlobInfo, nil
}
func (d *blobCacheDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
present, reusedInfo, err := d.destination.TryReusingBlob(ctx, info, cache, canSubstitute)
if err != nil || present {
return present, reusedInfo, err
}
for _, isConfig := range []bool{false, true} {
filename := filepath.Join(d.reference.directory, makeFilename(info.Digest, isConfig))
f, err := os.Open(filename)
if err == nil {
defer f.Close()
uploadedInfo, err := d.destination.PutBlob(ctx, f, info, cache, isConfig)
if err != nil {
return false, types.BlobInfo{}, err
}
return true, uploadedInfo, nil
}
}
return false, types.BlobInfo{}, nil
}
func (d *blobCacheDestination) PutManifest(ctx context.Context, manifestBytes []byte) error {
manifestDigest, err := manifest.Digest(manifestBytes)
if err != nil {
logrus.Warnf("error digesting manifest %q: %v", string(manifestBytes), err)
} else {
filename := filepath.Join(d.reference.directory, makeFilename(manifestDigest, false))
if err = ioutils.AtomicWriteFile(filename, manifestBytes, 0600); err != nil {
logrus.Warnf("error saving manifest as %q: %v", filename, err)
}
}
return d.destination.PutManifest(ctx, manifestBytes)
}
func (d *blobCacheDestination) PutSignatures(ctx context.Context, signatures [][]byte) error {
return d.destination.PutSignatures(ctx, signatures)
}
func (d *blobCacheDestination) Commit(ctx context.Context) error {
return d.destination.Commit(ctx)
}

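A hedged end-to-end sketch of the package's intended use, matching how pull.go below wraps its destination; srcRef, destRef, and policyContext are assumed to be prepared by the caller, and dir must already exist:

package main

import (
	"context"

	"github.com/containers/buildah/pkg/blobcache"
	cp "github.com/containers/image/copy"
	"github.com/containers/image/signature"
	"github.com/containers/image/types"
	digest "github.com/opencontainers/go-digest"
)

func copyThroughCache(ctx context.Context, policyContext *signature.PolicyContext, srcRef, destRef types.ImageReference, dir string, dgst digest.Digest) (bool, error) {
	// PreserveOriginal: cache blobs exactly as they arrive, without
	// suggesting compressed/decompressed substitutes on later reads.
	cache, err := blobcache.NewBlobCache(destRef, dir, types.PreserveOriginal)
	if err != nil {
		return false, err
	}
	// Blobs written to the wrapped destination are also saved under dir.
	if _, err := cp.Image(ctx, policyContext, cache, srcRef, &cp.Options{}); err != nil {
		return false, err
	}
	// The cache can then be queried directly.
	present, _, err := cache.HasBlob(types.BlobInfo{Digest: dgst, Size: -1})
	return present, err
}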

@@ -111,6 +111,10 @@ var (
Value: "",
Usage: "use `[username[:password]]` for accessing the registry",
},
cli.BoolFlag{
Name: "disable-compression, D",
Usage: "don't compress layers by default",
},
cli.BoolFlag{
Name: "disable-content-trust",
Usage: "This is a Docker specific option and is a NOOP",
@@ -192,6 +196,12 @@ var (
Name: "add-host",
Usage: "add a custom host-to-IP mapping (`host:ip`) (default [])",
},
cli.StringFlag{
Name: "blob-cache",
Value: "",
Usage: "assume image blobs in the specified directory will be available for pushing",
Hidden: true, // this is here mainly so that we can test the API during integration tests
},
cli.StringSliceFlag{
Name: "cap-add",
Usage: "add the specified capability when running (default [])",

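A short sketch of how the hidden flag would feed the new API surface; the helper name is hypothetical, and c is a *cli.Context as used throughout this file:

package main

import (
	"github.com/containers/buildah/imagebuildah"
	"github.com/urfave/cli"
)

// buildOptionsFromFlags shows where --blob-cache lands: BuildOptions.BlobDirectory,
// which the executor passes on to pulls (PullBlobDirectory) and commits (BlobDirectory).
func buildOptionsFromFlags(c *cli.Context) imagebuildah.BuildOptions {
	return imagebuildah.BuildOptions{
		BlobDirectory: c.String("blob-cache"),
	}
}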

@@ -5,6 +5,7 @@ import (
"io"
"strings"
"github.com/containers/buildah/pkg/blobcache"
"github.com/containers/buildah/util"
cp "github.com/containers/image/copy"
"github.com/containers/image/docker/reference"
@@ -40,6 +41,10 @@ type PullOptions struct {
// image name alone can not be resolved to a reference to a source
// image. No separator is implicitly added.
Transport string
// BlobDirectory is the name of a directory in which we'll attempt to
// store copies of layer blobs that we pull down, if any. It should
// already exist.
BlobDirectory string
}
func localImageNameForReference(ctx context.Context, store storage.Store, srcRef types.ImageReference, spec string) (string, error) {
@@ -182,6 +187,14 @@ func pullImage(ctx context.Context, store storage.Store, imageName string, optio
if err != nil {
return nil, errors.Wrapf(err, "error parsing image name %q", destName)
}
var maybeCachedDestRef types.ImageReference = destRef
if options.BlobDirectory != "" {
cachedRef, err := blobcache.NewBlobCache(destRef, options.BlobDirectory, types.PreserveOriginal)
if err != nil {
return nil, errors.Wrapf(err, "error wrapping image reference %q in blob cache at %q", transports.ImageName(destRef), options.BlobDirectory)
}
maybeCachedDestRef = cachedRef
}
policy, err := signature.DefaultPolicy(sc)
if err != nil {
@@ -200,7 +213,7 @@ func pullImage(ctx context.Context, store storage.Store, imageName string, optio
}()
logrus.Debugf("copying %q to %q", spec, destName)
if _, err := cp.Image(ctx, policyContext, destRef, srcRef, getCopyOptions(options.ReportWriter, srcRef, sc, destRef, nil, "")); err != nil {
if _, err := cp.Image(ctx, policyContext, maybeCachedDestRef, srcRef, getCopyOptions(options.ReportWriter, srcRef, sc, maybeCachedDestRef, nil, "")); err != nil {
logrus.Debugf("error copying src image [%q] to dest image [%q] err: %v", spec, destName, err)
return nil, err
}


@@ -3,9 +3,10 @@ github.com/blang/semver master
github.com/BurntSushi/toml master
github.com/containerd/continuity master
github.com/containernetworking/cni v0.7.0-alpha1
github.com/containers/image 63a1cbdc5e6537056695cf0d627c0a33b334df53
github.com/containers/image d53afe179b381fafb427e6b9cf9b1996a98c1067
github.com/boltdb/bolt master
github.com/containers/libpod fe4f09493f41f675d24c969d1b60d1a6a45ddb9e
github.com/containers/storage 3161726d1db0d0d4e86a9667dd476f09b997f497
github.com/containers/storage db40f96d853dfced60c563e61fb66ba231ce7c8d
github.com/docker/distribution 5f6282db7d65e6d72ad7c2cc66310724a57be716
github.com/docker/docker 86f080cff0914e9694068ed78d503701667c4c00
github.com/docker/docker-credential-helpers d68f9aeca33f5fd3f08eeae5e9d175edf4e731d1