non-empty dirs should not be listed as objects (#9129)

This commit is contained in:
Krishna Srinivas 2020-03-13 17:43:00 -07:00 committed by GitHub
parent 6b92f3fd99
commit 2e9fed1a14
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 45 additions and 22 deletions

View file

@ -1058,15 +1058,18 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string) er
// is a leaf or non-leaf entry.
func (fs *FSObjects) listDirFactory() ListDirFunc {
// listDir - lists all the entries at a given prefix and given entry in the prefix.
listDir := func(bucket, prefixDir, prefixEntry string) (entries []string) {
listDir := func(bucket, prefixDir, prefixEntry string) (emptyDir bool, entries []string) {
var err error
entries, err = readDir(pathJoin(fs.fsPath, bucket, prefixDir))
if err != nil && err != errFileNotFound {
logger.LogIf(context.Background(), err)
return
return false, nil
}
if len(entries) == 0 {
return true, nil
}
sort.Strings(entries)
return filterMatchingPrefix(entries, prefixEntry)
return false, filterMatchingPrefix(entries, prefixEntry)
}
// Return list factory instance.

View file

@ -326,7 +326,7 @@ func (n *hdfsObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketIn
func (n *hdfsObjects) listDirFactory() minio.ListDirFunc {
// listDir - lists all the entries at a given prefix and given entry in the prefix.
listDir := func(bucket, prefixDir, prefixEntry string) (entries []string) {
listDir := func(bucket, prefixDir, prefixEntry string) (emptyDir bool, entries []string) {
f, err := n.clnt.Open(minio.PathJoin(hdfsSeparator, bucket, prefixDir))
if err != nil {
if os.IsNotExist(err) {
@ -341,6 +341,9 @@ func (n *hdfsObjects) listDirFactory() minio.ListDirFunc {
logger.LogIf(context.Background(), err)
return
}
if len(fis) == 0 {
return true, nil
}
for _, fi := range fis {
if fi.IsDir() {
entries = append(entries, fi.Name()+hdfsSeparator)
@ -348,7 +351,7 @@ func (n *hdfsObjects) listDirFactory() minio.ListDirFunc {
entries = append(entries, fi.Name())
}
}
return minio.FilterMatchingPrefix(entries, prefixEntry)
return false, minio.FilterMatchingPrefix(entries, prefixEntry)
}
// Return list factory instance.

View file

@ -45,6 +45,8 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
// Will not store any objects in this bucket,
// Its to test ListObjects on an empty bucket.
"empty-bucket",
// Listing the case where the marker > last object.
"test-bucket-single-object",
}
for _, bucket := range testBuckets {
err := obj.MakeBucketWithLocation(context.Background(), bucket, "")
@ -72,6 +74,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
{testBuckets[1], "obj1", "obj1", nil},
{testBuckets[1], "obj2", "obj2", nil},
{testBuckets[1], "temporary/0/", "", nil},
{testBuckets[3], "A/B", "contentstring", nil},
}
for _, object := range testObjects {
md5Bytes := md5.Sum([]byte(object.content))
@ -445,6 +448,11 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
{Name: "temporary/0/"},
},
},
// ListObjectsResult-34 Listing with marker > last object should return empty
{
IsTruncated: false,
Objects: []ObjectInfo{},
},
}
testCases := []struct {
@ -559,6 +567,8 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
{"test-bucket-empty-dir", "", "", SlashSeparator, 10, resultCases[32], nil, true},
// Test listing a directory which contains an empty directory (64)
{"test-bucket-empty-dir", "", "temporary/", "", 10, resultCases[33], nil, true},
// Test listing with marker > last object such that response should be empty (65)
{"test-bucket-single-object", "", "A/C", "", 1000, resultCases[34], nil, true},
}
for i, testCase := range testCases {

View file

@ -669,13 +669,16 @@ func (s *posix) Walk(volume, dirPath, marker string, recursive bool, leafFile st
ch = make(chan FileInfo)
go func() {
defer close(ch)
listDir := func(volume, dirPath, dirEntry string) (entries []string) {
listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string) {
entries, err := s.ListDir(volume, dirPath, -1, leafFile)
if err != nil {
return
}
if len(entries) == 0 {
return true, nil
}
sort.Strings(entries)
return filterMatchingPrefix(entries, dirEntry)
return false, filterMatchingPrefix(entries, dirEntry)
}
walkResultCh := startTreeWalk(context.Background(), volume, dirPath, marker, recursive, listDir, endWalkCh)

View file

@ -56,10 +56,10 @@ func filterMatchingPrefix(entries []string, prefixEntry string) []string {
}
// ListDirFunc - "listDir" function of type listDirFunc returned by listDirFactory() - explained below.
type ListDirFunc func(bucket, prefixDir, prefixEntry string) (entries []string)
type ListDirFunc func(bucket, prefixDir, prefixEntry string) (emptyDir bool, entries []string)
// treeWalk walks directory tree recursively pushing TreeWalkResult into the channel as and when it encounters files.
func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir ListDirFunc, resultCh chan TreeWalkResult, endWalkCh <-chan struct{}, isEnd bool) (totalNum int, treeErr error) {
func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir ListDirFunc, resultCh chan TreeWalkResult, endWalkCh <-chan struct{}, isEnd bool) (emptyDir bool, treeErr error) {
// Example:
// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
// called with prefixDir="one/two/three/four/" and marker="five.txt"
@ -75,10 +75,10 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
}
}
entries := listDir(bucket, prefixDir, entryPrefixMatch)
emptyDir, entries := listDir(bucket, prefixDir, entryPrefixMatch)
// For an empty list return right here.
if len(entries) == 0 {
return 0, nil
if emptyDir {
return true, nil
}
// example:
@ -90,7 +90,7 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
entries = entries[idx:]
// For an empty list after search through the entries, return right here.
if len(entries) == 0 {
return 0, nil
return false, nil
}
for i, entry := range entries {
@ -123,16 +123,16 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
// markIsEnd is passed to this entry's treeWalk() so that treeWalker.end can be marked
// true at the end of the treeWalk stream.
markIsEnd := i == len(entries)-1 && isEnd
totalFound, err := doTreeWalk(ctx, bucket, pentry, prefixMatch, markerArg, recursive,
emptyDir, err := doTreeWalk(ctx, bucket, pentry, prefixMatch, markerArg, recursive,
listDir, resultCh, endWalkCh, markIsEnd)
if err != nil {
return 0, err
return false, err
}
// A nil totalFound means this is an empty directory that
// needs to be sent to the result channel, otherwise continue
// to the next entry.
if totalFound > 0 {
if !emptyDir {
continue
}
}
@ -141,13 +141,13 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
isEOF := ((i == len(entries)-1) && isEnd)
select {
case <-endWalkCh:
return 0, errWalkAbort
return false, errWalkAbort
case resultCh <- TreeWalkResult{entry: pentry, end: isEOF}:
}
}
// Everything is listed.
return len(entries), nil
return false, nil
}
// Initiate a new treeWalk in a goroutine.

View file

@ -260,7 +260,7 @@ func TestListDir(t *testing.T) {
}
// Should list "file1" from fsDir1.
entries := listDir(volume, "", "")
_, entries := listDir(volume, "", "")
if len(entries) != 2 {
t.Fatal("Expected the number of entries to be 2")
}
@ -278,7 +278,7 @@ func TestListDir(t *testing.T) {
}
// Should list "file2" from fsDir2.
entries = listDir(volume, "", "")
_, entries = listDir(volume, "", "")
if len(entries) != 1 {
t.Fatal("Expected the number of entries to be 1")
}

View file

@ -25,7 +25,7 @@ import (
// disks - used for doing disk.ListDir()
func listDirFactory(ctx context.Context, disks ...StorageAPI) ListDirFunc {
// Returns sorted merged entries from all the disks.
listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string) {
listDir := func(bucket, prefixDir, prefixEntry string) (emptyDir bool, mergedEntries []string) {
for _, disk := range disks {
if disk == nil {
continue
@ -38,6 +38,10 @@ func listDirFactory(ctx context.Context, disks ...StorageAPI) ListDirFunc {
continue
}
if len(entries) == 0 {
return true, nil
}
// Find elements in entries which are not in mergedEntries
for _, entry := range entries {
idx := sort.SearchStrings(mergedEntries, entry)
@ -54,7 +58,7 @@ func listDirFactory(ctx context.Context, disks ...StorageAPI) ListDirFunc {
sort.Strings(mergedEntries)
}
}
return filterMatchingPrefix(mergedEntries, prefixEntry)
return false, filterMatchingPrefix(mergedEntries, prefixEntry)
}
return listDir
}