add deadlines for readMetadata() in listing (#17776)

Bonus: also skip spending time looking for xl.json

- Listing()
- Delete()
This commit is contained in:
Harshavardhana 2023-08-01 21:52:31 -07:00 committed by GitHub
parent a7a7533190
commit 0153f96a20
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 22 deletions

View file

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@ -79,6 +79,10 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
return err
}
s.RLock()
legacy := s.formatLegacy
s.RUnlock()
// Use a small block size to start sending quickly
w := newMetacacheWriter(wr, 16<<10)
w.reuseBlocks = true // We are not sharing results, so reuse buffers.
@ -114,6 +118,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
metadata, err := s.readMetadata(ctx, pathJoin(volumeDir,
opts.BaseDir[:len(opts.BaseDir)-1]+globalDirSuffix,
xlStorageFormatFile))
diskHealthCheckOK(ctx, err)
if err == nil {
// if baseDir is already a directory object, consider it
// as part of the list call, this is AWS S3 specific
@ -245,7 +250,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
return nil
}
// Check legacy.
if HasSuffix(entry, xlStorageFormatFileV1) {
if HasSuffix(entry, xlStorageFormatFileV1) && legacy {
var meta metaCacheEntry
meta.metadata, err = xioutil.ReadFile(pathJoinBuf(&sb, volumeDir, current, entry))
diskHealthCheckOK(ctx, err)
@ -331,14 +336,16 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
return err
}
case osIsNotExist(err), isSysErrIsDir(err):
meta.metadata, err = xioutil.ReadFile(pathJoinBuf(&sb, volumeDir, meta.name, xlStorageFormatFileV1))
diskHealthCheckOK(ctx, err)
if err == nil {
// It was an object
if err := send(meta); err != nil {
return err
if legacy {
meta.metadata, err = xioutil.ReadFile(pathJoinBuf(&sb, volumeDir, meta.name, xlStorageFormatFileV1))
diskHealthCheckOK(ctx, err)
if err == nil {
// It was an object
if err := send(meta); err != nil {
return err
}
continue
}
continue
}
// NOT an object, append to stack (with slash)

View file

@ -441,7 +441,12 @@ func (s *xlStorage) readMetadataWithDMTime(ctx context.Context, itemPath string)
}
func (s *xlStorage) readMetadata(ctx context.Context, itemPath string) ([]byte, error) {
buf, _, err := s.readMetadataWithDMTime(ctx, itemPath)
var buf []byte
err := xioutil.NewDeadlineWorker(diskMaxTimeout).Run(func() error {
var rerr error
buf, _, rerr = s.readMetadataWithDMTime(ctx, itemPath)
return rerr
})
return buf, err
}
@ -957,16 +962,21 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
var legacyJSON bool
buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
if err != nil {
if err != errFileNotFound {
if !errors.Is(err, errFileNotFound) {
return err
}
metaDataPoolPut(buf) // Never used, return it
buf, err = s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFileV1))
if err != nil {
return err
s.RLock()
legacy := s.formatLegacy
s.RUnlock()
if legacy {
buf, err = s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFileV1))
if err != nil {
return err
}
legacyJSON = true
}
legacyJSON = true
}
if len(buf) == 0 {
@ -1099,7 +1109,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
var legacyJSON bool
buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
if err != nil {
if err != errFileNotFound {
if !errors.Is(err, errFileNotFound) {
return err
}
metaDataPoolPut(buf) // Never used, return it
@ -1108,14 +1118,19 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
return s.WriteMetadata(ctx, volume, path, fi)
}
buf, err = s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFileV1))
if err != nil {
if err == errFileNotFound && fi.VersionID != "" {
return errFileVersionNotFound
s.RLock()
legacy := s.formatLegacy
s.RUnlock()
if legacy {
buf, err = s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFileV1))
if err != nil {
if errors.Is(err, errFileNotFound) && fi.VersionID != "" {
return errFileVersionNotFound
}
return err
}
return err
legacyJSON = true
}
legacyJSON = true
}
if len(buf) == 0 {