diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go
index c0e1afebb..3c115483b 100644
--- a/cmd/bucket-handlers.go
+++ b/cmd/bucket-handlers.go
@@ -133,8 +133,6 @@ func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) {
 	// Add/update buckets that are not registered with the DNS
 	bucketsToBeUpdatedSlice := bucketsToBeUpdated.ToSlice()
 	g := errgroup.WithNErrs(len(bucketsToBeUpdatedSlice)).WithConcurrency(50)
-	ctx, cancel := g.WithCancelOnError(GlobalContext)
-	defer cancel()
 
 	for index := range bucketsToBeUpdatedSlice {
 		index := index
@@ -143,9 +141,12 @@ func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) {
 		}, index)
 	}
 
-	if err := g.WaitErr(); err != nil {
-		logger.LogIf(ctx, err)
-		return
+	ctx := GlobalContext
+	for _, err := range g.Wait() {
+		if err != nil {
+			logger.LogIf(ctx, err)
+			return
+		}
 	}
 
 	for _, bucket := range bucketsInConflict.ToSlice() {
diff --git a/cmd/bucket-listobjects-handlers.go b/cmd/bucket-listobjects-handlers.go
index cddcef755..27b01d83e 100644
--- a/cmd/bucket-listobjects-handlers.go
+++ b/cmd/bucket-listobjects-handlers.go
@@ -32,8 +32,6 @@ import (
 func concurrentDecryptETag(ctx context.Context, objects []ObjectInfo) {
 	g := errgroup.WithNErrs(len(objects)).WithConcurrency(500)
-	_, cancel := g.WithCancelOnError(ctx)
-	defer cancel()
 	for index := range objects {
 		index := index
 		g.Go(func() error {
@@ -45,7 +43,7 @@ func concurrentDecryptETag(ctx context.Context, objects []ObjectInfo) {
 			return nil
 		}, index)
 	}
-	g.WaitErr()
+	g.Wait()
 }
 
 // Validate all the ListObjects query arguments, returns an APIErrorCode
diff --git a/cmd/config.go b/cmd/config.go
index 2929a9685..e55a48cfe 100644
--- a/cmd/config.go
+++ b/cmd/config.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"path"
 	"sort"
 	"strings"
@@ -222,18 +223,18 @@ func initConfig(objAPI ObjectLayer) error {
 	// If etcd is set then migrates /config/config.json
 	// to '/.minio.sys/config/config.json'
 	if err := migrateConfigToMinioSys(objAPI); err != nil {
-		return err
+		return fmt.Errorf("migrateConfigToMinioSys: %w", err)
 	}
 
 	// Migrates backend '/.minio.sys/config/config.json' to latest version.
 	if err := migrateMinioSysConfig(objAPI); err != nil {
-		return err
+		return fmt.Errorf("migrateMinioSysConfig: %w", err)
 	}
 
 	// Migrates backend '/.minio.sys/config/config.json' to
 	// latest config format.
 	if err := migrateMinioSysConfigToKV(objAPI); err != nil {
-		return err
+		return fmt.Errorf("migrateMinioSysConfigToKV: %w", err)
 	}
 
 	return loadConfig(objAPI)
diff --git a/cmd/erasure-healing-common.go b/cmd/erasure-healing-common.go
index 0b7f531d8..cbe48a8c6 100644
--- a/cmd/erasure-healing-common.go
+++ b/cmd/erasure-healing-common.go
@@ -30,7 +30,6 @@ func commonTime(modTimes []time.Time, dataDirs []string) (modTime time.Time, dat
 	var maxima int // Counter for remembering max occurrence of elements.
 	timeOccurenceMap := make(map[int64]int, len(modTimes))
-	dataDirOccurenceMap := make(map[string]int, len(dataDirs))
 	// Ignore the uuid sentinel and count the rest.
 	for _, time := range modTimes {
 		if time.Equal(timeSentinel) {
@@ -39,34 +38,21 @@ func commonTime(modTimes []time.Time, dataDirs []string) (modTime time.Time, dat
 		timeOccurenceMap[time.UnixNano()]++
 	}
 
-	for _, dataDir := range dataDirs {
-		if dataDir == errorDir {
-			continue
-		}
-		if dataDir == delMarkerDir {
-			dataDirOccurenceMap[delMarkerDir]++
-			continue
-		}
-		dataDirOccurenceMap[dataDir]++
-	}
-
 	// Find the common cardinality from previously collected
 	// occurrences of elements.
 	for nano, count := range timeOccurenceMap {
-		t := time.Unix(0, nano)
+		t := time.Unix(0, nano).UTC()
 		if count > maxima || (count == maxima && t.After(modTime)) {
 			maxima = count
 			modTime = t
 		}
 	}
 
-	// Find the common cardinality from the previously collected
-	// occurrences of elements.
-	var dmaxima int
-	for ddataDir, count := range dataDirOccurenceMap {
-		if count > dmaxima {
-			dmaxima = count
+	for i, ddataDir := range dataDirs {
+		if modTimes[i].Equal(modTime) {
+			// Return the data-dir that matches modTime.
 			dataDir = ddataDir
+			break
 		}
 	}
 
@@ -136,14 +122,7 @@ func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error)
 	// List all the file commit ids from parts metadata.
 	modTimes := listObjectModtimes(partsMetadata, errs)
 
-	dataDirs := make([]string, len(partsMetadata))
-	for idx, fi := range partsMetadata {
-		if errs[idx] != nil {
-			dataDirs[idx] = errorDir
-			continue
-		}
-		dataDirs[idx] = fi.DataDir
-	}
+	dataDirs := getDataDirs(partsMetadata, errs)
 
 	// Reduce list of UUIDs to a single common value.
 	modTime, dataDir = commonTime(modTimes, dataDirs)
@@ -160,23 +139,34 @@ func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error)
 	return onlineDisks, modTime, dataDir
 }
 
+func getDataDirs(partsMetadata []FileInfo, errs []error) []string {
+	dataDirs := make([]string, len(partsMetadata))
+	for idx, fi := range partsMetadata {
+		if errs[idx] != nil {
+			dataDirs[idx] = errorDir
+			continue
+		}
+		if fi.Deleted {
+			dataDirs[idx] = delMarkerDir
+		} else {
+			dataDirs[idx] = fi.DataDir
+		}
+	}
+	return dataDirs
+}
+
 // Returns the latest updated FileInfo files and error in case of failure.
 func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []error, quorum int) (FileInfo, error) {
 	// There should be atleast half correct entries, if not return failure
-	if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, quorum); reducedErr != nil {
+	reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, quorum)
+	if reducedErr != nil {
 		return FileInfo{}, reducedErr
 	}
 
 	// List all the file commit ids from parts metadata.
 	modTimes := listObjectModtimes(partsMetadata, errs)
 
-	dataDirs := make([]string, len(partsMetadata))
-	for idx, fi := range partsMetadata {
-		if errs[idx] != nil {
-			continue
-		}
-		dataDirs[idx] = fi.DataDir
-	}
+	dataDirs := getDataDirs(partsMetadata, errs)
 
 	// Count all latest updated FileInfo values
 	var count int
diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go
index 5d9ed8a6a..38ece795d 100644
--- a/cmd/erasure-healing.go
+++ b/cmd/erasure-healing.go
@@ -42,20 +42,18 @@ func (er erasureObjects) HealBucket(ctx context.Context, bucket string, opts mad
 	storageDisks := er.getDisks()
 	storageEndpoints := er.getEndpoints()
 
+	// Heal bucket.
+	return er.healBucket(ctx, storageDisks, storageEndpoints, bucket, opts)
+}
+
+// Heal bucket - create buckets on disks where it does not exist.
+func (er erasureObjects) healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []Endpoint, bucket string, opts madmin.HealOpts) (res madmin.HealResultItem, err error) {
 	// get write quorum for an object
 	writeQuorum := len(storageDisks) - er.defaultParityCount
 	if writeQuorum == er.defaultParityCount {
 		writeQuorum++
 	}
 
-	// Heal bucket.
-	return healBucket(ctx, storageDisks, storageEndpoints, bucket, writeQuorum, opts)
-}
-
-// Heal bucket - create buckets on disks where it does not exist.
-func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []Endpoint, bucket string, writeQuorum int,
-	opts madmin.HealOpts) (res madmin.HealResultItem, err error) {
-
 	// Initialize sync waitgroup.
 	g := errgroup.WithNErrs(len(storageDisks))
 
@@ -72,6 +70,14 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints
 				afterState[index] = madmin.DriveStateOffline
 				return errDiskNotFound
 			}
+
+			beforeState[index] = madmin.DriveStateOk
+			afterState[index] = madmin.DriveStateOk
+
+			if bucket == minioReservedBucket {
+				return nil
+			}
+
 			if _, serr := storageDisks[index].StatVol(ctx, bucket); serr != nil {
 				if serr == errDiskNotFound {
 					beforeState[index] = madmin.DriveStateOffline
@@ -94,8 +100,6 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints
 				return serr
 			}
 
-			beforeState[index] = madmin.DriveStateOk
-			afterState[index] = madmin.DriveStateOk
 			return nil
 		}, index)
 	}
@@ -107,8 +111,8 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints
 		Type:      madmin.HealItemBucket,
 		Bucket:    bucket,
 		DiskCount: len(storageDisks),
-		ParityBlocks: len(storageDisks) / 2,
-		DataBlocks:   len(storageDisks) / 2,
+		ParityBlocks: er.defaultParityCount,
+		DataBlocks:   len(storageDisks) - er.defaultParityCount,
 	}
 
 	for i := range beforeState {
@@ -119,7 +123,7 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints
 		})
 	}
 
-	reducedErr := reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum-1)
+	reducedErr := reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, res.DataBlocks)
 	if errors.Is(reducedErr, errVolumeNotFound) && !opts.Recreate {
 		for i := range beforeState {
 			res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
@@ -153,7 +157,16 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints
 
 	reducedErr = reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum)
 	if reducedErr != nil {
-		return res, reducedErr
+		// If we have exactly half the drives not available,
+		// we should still allow HealBucket to not return an error.
+		// This is necessary for starting the server.
+		readQuorum := res.DataBlocks
+		switch reduceReadQuorumErrs(ctx, errs, nil, readQuorum) {
+		case nil:
+		case errDiskNotFound:
+		default:
+			return res, reducedErr
+		}
 	}
 
 	for i := range afterState {
@@ -168,7 +181,7 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints
 
 // listAllBuckets lists all buckets from all disks. It also
 // returns the occurrence of each buckets in all disks
-func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets map[string]VolInfo) error {
+func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets map[string]VolInfo, readQuorum int) error {
 	g := errgroup.WithNErrs(len(storageDisks))
 	var mu sync.Mutex
 	for index := range storageDisks {
@@ -198,7 +211,7 @@ func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets
 			return nil
 		}, index)
 	}
-	return reduceReadQuorumErrs(ctx, g.Wait(), bucketMetadataOpIgnoredErrs, len(storageDisks)/2)
+	return reduceReadQuorumErrs(ctx, g.Wait(), bucketMetadataOpIgnoredErrs, readQuorum)
 }
 
 // Only heal on disks where we are sure that healing is needed.
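Reviewer note: the hunks above repeatedly replace the fail-fast `WithCancelOnError`/`WaitErr` pattern with "run every task, collect one error per index, then reduce against a quorum". Below is a minimal, self-contained sketch of that pattern. `waitAll` and `reduceQuorumErrs` are hypothetical simplified stand-ins for MinIO's internal `errgroup` package and `reduceReadQuorumErrs`, not the real APIs:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDiskNotFound stands in for MinIO's sentinel error of the same name.
var errDiskNotFound = errors.New("disk not found")

// waitAll runs one task per index with bounded concurrency and returns a
// slice holding each task's error, instead of cancelling the remaining
// tasks on the first failure (the behavior this patch removes).
func waitAll(n, concurrency int, task func(i int) error) []error {
	errs := make([]error, n)
	sem := make(chan struct{}, concurrency)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		sem <- struct{}{}
		go func(i int) {
			defer func() { <-sem; wg.Done() }()
			errs[i] = task(i)
		}(i)
	}
	wg.Wait()
	return errs
}

// reduceQuorumErrs is a simplified stand-in for reduceReadQuorumErrs:
// success wins if at least quorum tasks returned nil; otherwise the most
// common error is reported.
func reduceQuorumErrs(errs []error, quorum int) error {
	counts := map[error]int{}
	for _, err := range errs {
		counts[err]++
	}
	if counts[nil] >= quorum {
		return nil
	}
	var maxErr error
	var maxCount int
	for err, count := range counts {
		if err != nil && count > maxCount {
			maxErr, maxCount = err, count
		}
	}
	return maxErr
}

func main() {
	// 4 disks, read quorum 2: one missing disk must not fail the call.
	errs := waitAll(4, 2, func(i int) error {
		if i == 3 {
			return errDiskNotFound
		}
		return nil
	})
	fmt.Println(reduceQuorumErrs(errs, 2)) // <nil>
}
```

The design point the diff is making: during startup a missing drive is expected, so per-slot errors must survive to the quorum reducer rather than aborting the whole group.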
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index 650739bd7..359961188 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -913,21 +913,15 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o
 	var mu sync.Mutex
 	eg := errgroup.WithNErrs(len(objects)).WithConcurrency(10)
-	cctx, cancel := eg.WithCancelOnError(ctx)
-	defer cancel()
 	for j, obj := range objects {
 		j := j
 		obj := obj
 		eg.Go(func() error {
-			idx, err := z.getPoolIdxExistingNoLock(cctx, bucket, obj.ObjectName)
-			if isErrObjectNotFound(err) {
+			idx, err := z.getPoolIdxExistingNoLock(ctx, bucket, obj.ObjectName)
+			if err != nil {
 				derrs[j] = err
 				return nil
 			}
-			if err != nil {
-				// unhandled errors return right here.
-				return err
-			}
 			mu.Lock()
 			poolObjIdxMap[idx] = append(poolObjIdxMap[idx], obj)
 			origIndexMap[idx] = append(origIndexMap[idx], j)
@@ -936,12 +930,7 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o
 		}, j)
 	}
 
-	if err := eg.WaitErr(); err != nil {
-		for i := range derrs {
-			derrs[i] = err
-		}
-		return dobjects, derrs
-	}
+	eg.Wait() // wait to check all the pools.
 
 	// Delete concurrently in all server pools.
 	var wg sync.WaitGroup
@@ -1571,7 +1560,7 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts
 	}
 
 	// Attempt heal on the bucket metadata, ignore any failures
-	_, _ = z.HealObject(ctx, minioMetaBucket, pathJoin(bucketConfigPrefix, bucket, bucketMetadataFile), "", opts)
+	defer z.HealObject(ctx, minioMetaBucket, pathJoin(bucketConfigPrefix, bucket, bucketMetadataFile), "", opts)
 
 	for _, pool := range z.serverPools {
 		result, err := pool.HealBucket(ctx, bucket, opts)
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index 3aa70fc7f..638079982 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -26,6 +26,7 @@ import (
 	"hash/crc32"
 	"math/rand"
 	"net/http"
+	"reflect"
 	"sort"
 	"sync"
 	"time"
@@ -233,11 +234,20 @@ func (s *erasureSets) connectDisks() {
 			s.erasureDisksMu.RUnlock()
 			if err != nil {
 				printEndpointError(endpoint, err, false)
+				disk.Close()
 				return
 			}
 
 			s.erasureDisksMu.Lock()
-			if s.erasureDisks[setIndex][diskIndex] != nil {
+			if currentDisk := s.erasureDisks[setIndex][diskIndex]; currentDisk != nil {
+				if !reflect.DeepEqual(currentDisk.Endpoint(), disk.Endpoint()) {
+					err = fmt.Errorf("Detected unexpected disk ordering, refusing to use the disk: expecting %s, found %s",
+						currentDisk.Endpoint(), disk.Endpoint())
+					printEndpointError(endpoint, err, false)
+					disk.Close()
+					s.erasureDisksMu.Unlock()
+					return
+				}
 				s.erasureDisks[setIndex][diskIndex].Close()
 			}
 			if disk.IsLocal() {
@@ -248,6 +258,7 @@ func (s *erasureSets) connectDisks() {
 				disk, err = newStorageAPI(endpoint)
 				if err != nil {
 					printEndpointError(endpoint, err, false)
+					s.erasureDisksMu.Unlock()
 					return
 				}
 				disk.SetDiskID(format.Erasure.This)
@@ -398,7 +409,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
 		var lockerEpSet = set.NewStringSet()
 		for j := 0; j < setDriveCount; j++ {
 			endpoint := endpoints[i*setDriveCount+j]
-			// Only add lockers only one per endpoint and per erasure set.
+			// Only add lockers per endpoint.
 			if locker, ok := erasureLockers[endpoint.Host]; ok && !lockerEpSet.Contains(endpoint.Host) {
 				lockerEpSet.Add(endpoint.Host)
 				s.erasureLockers[i] = append(s.erasureLockers[i], locker)
@@ -416,7 +427,9 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
 				continue
 			}
 			if m != i || n != j {
-				return nil, fmt.Errorf("found a disk in an unexpected location, pool: %d, found (set=%d, disk=%d) expected (set=%d, disk=%d): %s(%s)", poolIdx, m, n, i, j, disk, diskID)
+				logger.LogIf(GlobalContext, fmt.Errorf("Detected unexpected disk ordering, refusing to use the disk - poolID: %s, found disk mounted at (set=%s, disk=%s), expected mount at (set=%s, disk=%s): %s(%s)", humanize.Ordinal(poolIdx+1), humanize.Ordinal(m+1), humanize.Ordinal(n+1), humanize.Ordinal(i+1), humanize.Ordinal(j+1), disk, diskID))
+				s.erasureDisks[i][j] = &unrecognizedDisk{storage: disk}
+				continue
 			}
 			disk.SetDiskLoc(s.poolIndex, m, n)
 			s.endpointStrings[m*setDriveCount+n] = disk.String()
@@ -855,7 +868,7 @@ func (s *erasureSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, er
 	var healBuckets = map[string]VolInfo{}
 	for _, set := range s.sets {
 		// lists all unique buckets across drives.
-		if err := listAllBuckets(ctx, set.getDisks(), healBuckets); err != nil {
+		if err := listAllBuckets(ctx, set.getDisks(), healBuckets, s.defaultParityCount); err != nil {
 			return nil, err
 		}
 	}
@@ -1326,8 +1339,7 @@ func (s *erasureSets) HealBucket(ctx context.Context, bucket string, opts madmin
 	}
 
 	for _, set := range s.sets {
-		var healResult madmin.HealResultItem
-		healResult, err = set.HealBucket(ctx, bucket, opts)
+		healResult, err := set.HealBucket(ctx, bucket, opts)
 		if err != nil {
 			return result, toObjectErr(err, bucket)
 		}
@@ -1335,12 +1347,6 @@ func (s *erasureSets) HealBucket(ctx context.Context, bucket string, opts madmin
 		result.After.Drives = append(result.After.Drives, healResult.After.Drives...)
 	}
 
-	// Check if we had quorum to write, if not return an appropriate error.
-	_, afterDriveOnline := result.GetOnlineCounts()
-	if afterDriveOnline < ((s.setCount*s.setDriveCount)/2)+1 {
-		return result, toObjectErr(errErasureWriteQuorum, bucket)
-	}
-
 	return result, nil
 }
diff --git a/cmd/object-api-common.go b/cmd/object-api-common.go
index 0731e8681..baaf2bada 100644
--- a/cmd/object-api-common.go
+++ b/cmd/object-api-common.go
@@ -283,8 +283,6 @@ func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, d
 
 	// List until maxKeys requested.
 	g := errgroup.WithNErrs(maxKeys).WithConcurrency(maxConcurrent)
-	ctx, cancel := g.WithCancelOnError(ctx)
-	defer cancel()
 
 	objInfoFound := make([]*ObjectInfo, maxKeys)
 	var i int
@@ -345,8 +343,10 @@ func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, d
 			break
 		}
 	}
-	if err := g.WaitErr(); err != nil {
-		return loi, err
+	for _, err := range g.Wait() {
+		if err != nil {
+			return loi, err
+		}
 	}
 
 	// Copy found objects
 	objInfos := make([]ObjectInfo, 0, i+1)
diff --git a/cmd/server-main.go b/cmd/server-main.go
index 8997e9d04..53ee9238c 100644
--- a/cmd/server-main.go
+++ b/cmd/server-main.go
@@ -335,7 +335,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
 		if err = handleEncryptedConfigBackend(newObject); err == nil {
 			// Upon success migrating the config, initialize all sub-systems
 			// if all sub-systems initialized successfully return right away
-			if err = initAllSubsystems(ctx, newObject); err == nil {
+			if err = initAllSubsystems(lkctx.Context(), newObject); err == nil {
 				txnLk.Unlock(lkctx.Cancel)
 				// All successful return.
 				if globalIsDistErasure {
@@ -384,17 +384,17 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
 		// Limit to no more than 50 concurrent buckets.
 		g := errgroup.WithNErrs(len(buckets)).WithConcurrency(50)
-		ctx, cancel := g.WithCancelOnError(ctx)
-		defer cancel()
 		for index := range buckets {
 			index := index
 			g.Go(func() error {
-				_, berr := newObject.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{Recreate: true})
-				return berr
+				_, herr := newObject.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{Recreate: true})
+				return herr
			}, index)
 		}
-		if err := g.WaitErr(); err != nil {
-			return fmt.Errorf("Unable to list buckets to heal: %w", err)
+		for _, err := range g.Wait() {
+			if err != nil {
+				return fmt.Errorf("Unable to heal buckets: %w", err)
+			}
+		}
 	}
diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go
index 5f0aa5327..2e6d4ba47 100644
--- a/cmd/storage-interface.go
+++ b/cmd/storage-interface.go
@@ -86,46 +86,163 @@ type StorageAPI interface {
 	SetDiskLoc(poolIdx, setIdx, diskIdx int) // Set location indexes.
 }
 
-// storageReader is an io.Reader view of a disk
-type storageReader struct {
-	storage      StorageAPI
-	volume, path string
-	offset       int64
+type unrecognizedDisk struct {
+	storage StorageAPI
 }
 
-func (r *storageReader) Read(p []byte) (n int, err error) {
-	nn, err := r.storage.ReadFile(context.TODO(), r.volume, r.path, r.offset, p, nil)
-	r.offset += nn
-	n = int(nn)
+func (p *unrecognizedDisk) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) (err error) {
+	return errDiskNotFound
+}
 
-	if err == io.ErrUnexpectedEOF && nn > 0 {
-		err = io.EOF
+func (p *unrecognizedDisk) String() string {
+	return p.storage.String()
+}
+
+func (p *unrecognizedDisk) IsOnline() bool {
+	return false
+}
+
+func (p *unrecognizedDisk) LastConn() time.Time {
+	return p.storage.LastConn()
+}
+
+func (p *unrecognizedDisk) IsLocal() bool {
+	return p.storage.IsLocal()
+}
+
+func (p *unrecognizedDisk) Endpoint() Endpoint {
+	return p.storage.Endpoint()
+}
+
+func (p *unrecognizedDisk) Hostname() string {
+	return p.storage.Hostname()
+}
+
+func (p *unrecognizedDisk) Healing() *healingTracker {
+	return nil
+}
+
+func (p *unrecognizedDisk) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error) {
+	return dataUsageCache{}, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
+	return -1, -1, -1
+}
+
+func (p *unrecognizedDisk) SetDiskLoc(poolIdx, setIdx, diskIdx int) {
+}
+
+func (p *unrecognizedDisk) Close() error {
+	return p.storage.Close()
+}
+
+func (p *unrecognizedDisk) GetDiskID() (string, error) {
+	return "", errDiskNotFound
+}
+
+func (p *unrecognizedDisk) SetDiskID(id string) {
+}
+
+func (p *unrecognizedDisk) DiskInfo(ctx context.Context) (info DiskInfo, err error) {
+	return info, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) MakeVolBulk(ctx context.Context, volumes ...string) (err error) {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) MakeVol(ctx context.Context, volume string) (err error) {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) ListVols(ctx context.Context) ([]VolInfo, error) {
+	return nil, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) StatVol(ctx context.Context, volume string) (vol VolInfo, err error) {
+	return vol, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) DeleteVol(ctx context.Context, volume string, forceDelete bool) (err error) {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) ListDir(ctx context.Context, volume, dirPath string, count int) ([]string, error) {
+	return nil, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) ReadFile(ctx context.Context, volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) {
+	return 0, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) AppendFile(ctx context.Context, volume string, path string, buf []byte) (err error) {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) error {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) {
+	return nil, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) error {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) error {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) CheckParts(ctx context.Context, volume string, path string, fi FileInfo) (err error) {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) Delete(ctx context.Context, volume string, path string, recursive bool) (err error) {
+	return errDiskNotFound
+}
+
+// DeleteVersions deletes slice of versions, it can be same object
+// or multiple objects.
+func (p *unrecognizedDisk) DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions) (errs []error) {
+	errs = make([]error, len(versions))
+
+	for i := range errs {
+		errs[i] = errDiskNotFound
 	}
-	return
+	return errs
 }
 
-// storageWriter is a io.Writer view of a disk.
-type storageWriter struct {
-	storage      StorageAPI
-	volume, path string
+func (p *unrecognizedDisk) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) error {
+	return errDiskNotFound
 }
 
-func (w *storageWriter) Write(p []byte) (n int, err error) {
-	err = w.storage.AppendFile(context.TODO(), w.volume, w.path, p)
-	if err == nil {
-		n = len(p)
-	}
-	return
+func (p *unrecognizedDisk) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) {
+	return errDiskNotFound
 }
 
-// StorageWriter returns a new io.Writer which appends data to the file
-// at the given disk, volume and path.
-func StorageWriter(storage StorageAPI, volume, path string) io.Writer {
-	return &storageWriter{storage, volume, path}
+func (p *unrecognizedDisk) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) {
+	return errDiskNotFound
 }
 
-// StorageReader returns a new io.Reader which reads data to the file
-// at the given disk, volume, path and offset.
-func StorageReader(storage StorageAPI, volume, path string, offset int64) io.Reader {
-	return &storageReader{storage, volume, path, offset}
+func (p *unrecognizedDisk) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
+	return errDiskNotFound
+}
+
+func (p *unrecognizedDisk) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
+	return fi, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) {
+	return nil, errDiskNotFound
+}
+
+func (p *unrecognizedDisk) StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error) {
+	return nil, errDiskNotFound
+}
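Reviewer note: `unrecognizedDisk` is a null-object wrapper around `StorageAPI` - identity methods delegate to the real disk so logs stay informative, while every I/O method fails with `errDiskNotFound`, which the existing quorum reducers already treat as an offline drive, so no new error paths are needed. Below is a condensed, self-contained sketch of the pattern; the `storage` interface and `realDisk` type are hypothetical stand-ins trimmed down from the real `StorageAPI`:

```go
package main

import (
	"errors"
	"fmt"
)

var errDiskNotFound = errors.New("disk not found")

// storage is a trimmed-down stand-in for MinIO's StorageAPI interface.
type storage interface {
	String() string
	IsOnline() bool
	ReadAll(volume, path string) ([]byte, error)
}

// unrecognized wraps a disk that was found mounted in the wrong slot.
// Identity methods delegate to the real disk; every I/O method fails,
// leaving the disk effectively offline until an operator fixes the
// ordering.
type unrecognized struct {
	storage storage
}

func (u *unrecognized) String() string { return u.storage.String() }
func (u *unrecognized) IsOnline() bool { return false }
func (u *unrecognized) ReadAll(volume, path string) ([]byte, error) {
	return nil, errDiskNotFound
}

// realDisk is a hypothetical concrete disk used only for the demo.
type realDisk struct{ endpoint string }

func (d *realDisk) String() string { return d.endpoint }
func (d *realDisk) IsOnline() bool { return true }
func (d *realDisk) ReadAll(volume, path string) ([]byte, error) {
	return []byte("data"), nil
}

func main() {
	var disk storage = &unrecognized{storage: &realDisk{endpoint: "http://node1/disk3"}}
	fmt.Println(disk.String(), disk.IsOnline()) // http://node1/disk3 false
	_, err := disk.ReadAll("bucket", "object")
	fmt.Println(err) // disk not found
}
```

Wrapping rather than dropping the disk keeps the slot occupied in `s.erasureDisks`, so set geometry and endpoint bookkeeping stay intact while the misplaced drive contributes nothing to reads or writes.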