diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index 68e27c31a..6971ffa9b 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -325,6 +325,7 @@ func getTransitionedObjectReader(ctx context.Context, bucket, object string, rs if err != nil { return nil, fmt.Errorf("transition storage class not configured") } + fn, off, length, err := NewGetObjectReader(rs, oi, opts) if err != nil { return nil, ErrorRespToObjectError(err, bucket, object) diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go index 9923f3198..c3dfd1d48 100644 --- a/cmd/disk-cache-backend.go +++ b/cmd/disk-cache-backend.go @@ -936,16 +936,16 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang objInfo.Size = rngInfo.Size rs = nil } - var nsUnlocker = func() {} + // For a directory, we need to send an reader that returns no bytes. if HasSuffix(object, SlashSeparator) { // The lock taken above is released when // objReader.Close() is called by the caller. - gr, gerr := NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts, nsUnlocker) + gr, gerr := NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts) return gr, numHits, gerr } - fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts, nsUnlocker) + fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts) if nErr != nil { return nil, numHits, nErr } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index a480bbcf1..e449369a4 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -186,25 +186,30 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri if objInfo.TransitionStatus == lifecycle.TransitionComplete { // If transitioned, stream from transition tier unless object is restored locally or restore date is past. if onDisk := isRestoredObjectOnDisk(objInfo.UserDefined); !onDisk { - return getTransitionedObjectReader(ctx, bucket, object, rs, h, objInfo, opts) + gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, h, objInfo, opts) + if err != nil { + return nil, err + } + unlockOnDefer = false + return gr.WithCleanupFuncs(nsUnlocker), nil } } - unlockOnDefer = false - fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts, nsUnlocker) - if nErr != nil { - return nil, nErr + + fn, off, length, err := NewGetObjectReader(rs, objInfo, opts) + if err != nil { + return nil, err } + unlockOnDefer = false pr, pw := io.Pipe() go func() { - err := er.getObjectWithFileInfo(ctx, bucket, object, off, length, pw, fi, metaArr, onlineDisks) - pw.CloseWithError(err) + pw.CloseWithError(er.getObjectWithFileInfo(ctx, bucket, object, off, length, pw, fi, metaArr, onlineDisks)) }() // Cleanup function to cause the go routine above to exit, in // case of incomplete read. pipeCloser := func() { pr.Close() } - return fn(pr, h, opts.CheckPrecondFn, pipeCloser) + return fn(pr, h, opts.CheckPrecondFn, pipeCloser, nsUnlocker) } // GetObject - reads an object erasured coded across multiple diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 60e632ecf..282ce92cf 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -748,8 +748,10 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rwPoolUnlocker = func() { fs.rwPool.Close(fsMetaPath) } } - objReaderFn, off, length, err := NewGetObjectReader(rs, objInfo, opts, nsUnlocker, rwPoolUnlocker) + objReaderFn, off, length, err := NewGetObjectReader(rs, objInfo, opts) if err != nil { + rwPoolUnlocker() + nsUnlocker() return nil, err } @@ -777,7 +779,7 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string, return nil, err } - return objReaderFn(reader, h, opts.CheckPrecondFn, closeFn) + return objReaderFn(reader, h, opts.CheckPrecondFn, closeFn, rwPoolUnlocker, nsUnlocker) } // getObject - wrapper for GetObject diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index 9ab2b4e15..2545863f5 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -554,14 +554,20 @@ func getCompressedOffsets(objectInfo ObjectInfo, offset int64) (compressedOffset // GetObjectReader is a type that wraps a reader with a lock to // provide a ReadCloser interface that unlocks on Close() type GetObjectReader struct { - ObjInfo ObjectInfo - pReader io.Reader - + io.Reader + ObjInfo ObjectInfo cleanUpFns []func() opts ObjectOptions once sync.Once } +// WithCleanupFuncs sets additional cleanup functions to be called when closing +// the GetObjectReader. +func (g *GetObjectReader) WithCleanupFuncs(fns ...func()) *GetObjectReader { + g.cleanUpFns = append(g.cleanUpFns, fns...) + return g +} + // NewGetObjectReaderFromReader sets up a GetObjectReader with a given // reader. This ignores any object properties. func NewGetObjectReaderFromReader(r io.Reader, oi ObjectInfo, opts ObjectOptions, cleanupFns ...func()) (*GetObjectReader, error) { @@ -574,7 +580,7 @@ func NewGetObjectReaderFromReader(r io.Reader, oi ObjectInfo, opts ObjectOptions } return &GetObjectReader{ ObjInfo: oi, - pReader: r, + Reader: r, cleanUpFns: cleanupFns, opts: opts, }, nil @@ -587,26 +593,16 @@ func NewGetObjectReaderFromReader(r io.Reader, oi ObjectInfo, opts ObjectOptions type ObjReaderFn func(inputReader io.Reader, h http.Header, pcfn CheckPreconditionFn, cleanupFns ...func()) (r *GetObjectReader, err error) // NewGetObjectReader creates a new GetObjectReader. The cleanUpFns -// are called on Close() in reverse order as passed here. NOTE: It is +// are called on Close() in FIFO order as passed in ObjReadFn(). NOTE: It is // assumed that clean up functions do not panic (otherwise, they may // not all run!). -func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cleanUpFns ...func()) ( +func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) ( fn ObjReaderFn, off, length int64, err error) { if rs == nil && opts.PartNumber > 0 { rs = partNumberToRangeSpec(oi, opts.PartNumber) } - // Call the clean-up functions immediately in case of exit - // with error - defer func() { - if err != nil { - for i := len(cleanUpFns) - 1; i >= 0; i-- { - cleanUpFns[i]() - } - } - }() - _, isEncrypted := crypto.IsEncrypted(oi.UserDefined) isCompressed, err := oi.IsCompressedOK() if err != nil { @@ -658,11 +654,9 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl } } fn = func(inputReader io.Reader, h http.Header, pcfn CheckPreconditionFn, cFns ...func()) (r *GetObjectReader, err error) { - cFns = append(cleanUpFns, cFns...) if opts.CheckPrecondFn != nil && opts.CheckPrecondFn(oi) { - // Call the cleanup funcs - for i := len(cFns) - 1; i >= 0; i-- { - cFns[i]() + for _, cFn := range cFns { + cFn() } return nil, PreConditionFailed{} } @@ -672,8 +666,8 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl inputReader, err = DecryptBlocksRequestR(inputReader, h, 0, firstPart, oi, copySource) if err != nil { // Call the cleanup funcs - for i := len(cFns) - 1; i >= 0; i-- { - cFns[i]() + for _, cFn := range cFns { + cFn() } return nil, err } @@ -685,8 +679,8 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl if decOff > 0 { if err = s2Reader.Skip(decOff); err != nil { // Call the cleanup funcs - for i := len(cFns) - 1; i >= 0; i-- { - cFns[i]() + for _, cFn := range cFns { + cFn() } return nil, err } @@ -697,9 +691,9 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl rah, err := readahead.NewReaderSize(decReader, compReadAheadBuffers, compReadAheadBufSize) if err == nil { decReader = rah - cFns = append(cFns, func() { + cFns = append([]func(){func() { rah.Close() - }) + }}, cFns...) } } oi.Size = decLength @@ -707,7 +701,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl // Assemble the GetObjectReader r = &GetObjectReader{ ObjInfo: oi, - pReader: decReader, + Reader: decReader, cleanUpFns: cFns, opts: opts, } @@ -741,7 +735,6 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl fn = func(inputReader io.Reader, h http.Header, pcfn CheckPreconditionFn, cFns ...func()) (r *GetObjectReader, err error) { copySource := h.Get(xhttp.AmzServerSideEncryptionCopyCustomerAlgorithm) != "" - cFns = append(cleanUpFns, cFns...) // Attach decrypter on inputReader var decReader io.Reader decReader, err = DecryptBlocksRequestR(inputReader, h, seqNumber, partStart, oi, copySource) @@ -770,7 +763,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl // Assemble the GetObjectReader r = &GetObjectReader{ ObjInfo: oi, - pReader: decReader, + Reader: decReader, cleanUpFns: cFns, opts: opts, } @@ -783,7 +776,6 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl return nil, 0, 0, err } fn = func(inputReader io.Reader, _ http.Header, pcfn CheckPreconditionFn, cFns ...func()) (r *GetObjectReader, err error) { - cFns = append(cleanUpFns, cFns...) if opts.CheckPrecondFn != nil && opts.CheckPrecondFn(oi) { // Call the cleanup funcs for i := len(cFns) - 1; i >= 0; i-- { @@ -793,7 +785,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl } r = &GetObjectReader{ ObjInfo: oi, - pReader: inputReader, + Reader: inputReader, cleanUpFns: cFns, opts: opts, } @@ -815,11 +807,6 @@ func (g *GetObjectReader) Close() error { return nil } -// Read - to implement Reader interface. -func (g *GetObjectReader) Read(p []byte) (n int, err error) { - return g.pReader.Read(p) -} - //SealMD5CurrFn seals md5sum with object encryption key and returns sealed // md5sum type SealMD5CurrFn func([]byte) []byte