minio/cmd/erasure-object_test.go
Harshavardhana 069432566f update license change for MinIO
Signed-off-by: Harshavardhana <harsha@minio.io>
2021-04-23 11:58:53 -07:00

762 lines
24 KiB
Go

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"bytes"
"context"
crand "crypto/rand"
"errors"
"io"
"io/ioutil"
"os"
"strconv"
"testing"
humanize "github.com/dustin/go-humanize"
"github.com/minio/minio/cmd/config/storageclass"
)
func TestRepeatPutObjectPart(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var objLayer ObjectLayer
var disks []string
var err error
var opts ObjectOptions
objLayer, disks, err = prepareErasure16(ctx)
if err != nil {
t.Fatal(err)
}
// cleaning up of temporary test directories
defer objLayer.Shutdown(context.Background())
defer removeRoots(disks)
err = objLayer.MakeBucketWithLocation(ctx, "bucket1", BucketOptions{})
if err != nil {
t.Fatal(err)
}
uploadID, err := objLayer.NewMultipartUpload(ctx, "bucket1", "mpartObj1", opts)
if err != nil {
t.Fatal(err)
}
fiveMBBytes := bytes.Repeat([]byte("a"), 5*humanize.MiByte)
md5Hex := getMD5Hash(fiveMBBytes)
_, err = objLayer.PutObjectPart(ctx, "bucket1", "mpartObj1", uploadID, 1, mustGetPutObjReader(t, bytes.NewReader(fiveMBBytes), 5*humanize.MiByte, md5Hex, ""), opts)
if err != nil {
t.Fatal(err)
}
// PutObjectPart should succeed even if part already exists. ref: https://github.com/minio/minio/issues/1930
_, err = objLayer.PutObjectPart(ctx, "bucket1", "mpartObj1", uploadID, 1, mustGetPutObjReader(t, bytes.NewReader(fiveMBBytes), 5*humanize.MiByte, md5Hex, ""), opts)
if err != nil {
t.Fatal(err)
}
}
func TestErasureDeleteObjectBasic(t *testing.T) {
testCases := []struct {
bucket string
object string
expectedErr error
}{
{".test", "dir/obj", BucketNameInvalid{Bucket: ".test"}},
{"----", "dir/obj", BucketNameInvalid{Bucket: "----"}},
{"bucket", "", ObjectNameInvalid{Bucket: "bucket", Object: ""}},
{"bucket", "doesnotexist", ObjectNotFound{Bucket: "bucket", Object: "doesnotexist"}},
{"bucket", "dir/doesnotexist", ObjectNotFound{Bucket: "bucket", Object: "dir/doesnotexist"}},
{"bucket", "dir", ObjectNotFound{Bucket: "bucket", Object: "dir"}},
{"bucket", "dir/", ObjectNotFound{Bucket: "bucket", Object: "dir/"}},
{"bucket", "dir/obj", nil},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create an instance of xl backend
xl, fsDirs, err := prepareErasure16(ctx)
if err != nil {
t.Fatal(err)
}
defer xl.Shutdown(context.Background())
err = xl.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
if err != nil {
t.Fatal(err)
}
// Create object "dir/obj" under bucket "bucket" for Test 7 to pass
_, err = xl.PutObject(ctx, "bucket", "dir/obj", mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), ObjectOptions{})
if err != nil {
t.Fatalf("Erasure Object upload failed: <ERROR> %s", err)
}
for _, test := range testCases {
test := test
t.Run("", func(t *testing.T) {
_, err := xl.GetObjectInfo(ctx, "bucket", "dir/obj", ObjectOptions{})
if err != nil {
t.Fatal("dir/obj not found before last test")
}
_, actualErr := xl.DeleteObject(ctx, test.bucket, test.object, ObjectOptions{})
if test.expectedErr != nil && actualErr != test.expectedErr {
t.Errorf("Expected to fail with %s, but failed with %s", test.expectedErr, actualErr)
}
if test.expectedErr == nil && actualErr != nil {
t.Errorf("Expected to pass, but failed with %s", actualErr)
}
})
}
// Cleanup backend directories
removeRoots(fsDirs)
}
func TestErasureDeleteObjectsErasureSet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var objs []*erasureObjects
for i := 0; i < 32; i++ {
obj, fsDirs, err := prepareErasure(ctx, 16)
if err != nil {
t.Fatal("Unable to initialize 'Erasure' object layer.", err)
}
// Remove all dirs.
for _, dir := range fsDirs {
defer os.RemoveAll(dir)
}
z := obj.(*erasureServerPools)
xl := z.serverPools[0].sets[0]
objs = append(objs, xl)
}
erasureSets := &erasureSets{sets: objs, distributionAlgo: "CRCMOD"}
type testCaseType struct {
bucket string
object string
}
bucketName := "bucket"
testCases := []testCaseType{
{bucketName, "dir/obj1"},
{bucketName, "dir/obj2"},
{bucketName, "obj3"},
{bucketName, "obj_4"},
}
err := erasureSets.MakeBucketWithLocation(ctx, bucketName, BucketOptions{})
if err != nil {
t.Fatal(err)
}
for _, testCase := range testCases {
_, err = erasureSets.PutObject(ctx, testCase.bucket, testCase.object,
mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), ObjectOptions{})
if err != nil {
t.Fatalf("Erasure Object upload failed: <ERROR> %s", err)
}
}
toObjectNames := func(testCases []testCaseType) []ObjectToDelete {
names := make([]ObjectToDelete, len(testCases))
for i := range testCases {
names[i] = ObjectToDelete{ObjectName: testCases[i].object}
}
return names
}
objectNames := toObjectNames(testCases)
_, delErrs := erasureSets.DeleteObjects(ctx, bucketName, objectNames, ObjectOptions{})
for i := range delErrs {
if delErrs[i] != nil {
t.Errorf("Failed to remove object `%v` with the error: `%v`", objectNames[i], delErrs[i])
}
}
for _, test := range testCases {
_, statErr := erasureSets.GetObjectInfo(ctx, test.bucket, test.object, ObjectOptions{})
switch statErr.(type) {
case ObjectNotFound:
default:
t.Fatalf("Object %s is not removed", test.bucket+SlashSeparator+test.object)
}
}
}
func TestErasureDeleteObjectDiskNotFound(t *testing.T) {
restoreGlobalStorageClass := globalStorageClass
defer func() {
globalStorageClass = restoreGlobalStorageClass
}()
globalStorageClass = storageclass.Config{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create an instance of xl backend.
obj, fsDirs, err := prepareErasure16(ctx)
if err != nil {
t.Fatal(err)
}
// Cleanup backend directories
defer obj.Shutdown(context.Background())
defer removeRoots(fsDirs)
z := obj.(*erasureServerPools)
xl := z.serverPools[0].sets[0]
// Create "bucket"
err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
if err != nil {
t.Fatal(err)
}
bucket := "bucket"
object := "object"
opts := ObjectOptions{}
// Create object "obj" under bucket "bucket".
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts)
if err != nil {
t.Fatal(err)
}
// for a 16 disk setup, quorum is 9. To simulate disks not found yet
// quorum is available, we remove disks leaving quorum disks behind.
erasureDisks := xl.getDisks()
z.serverPools[0].erasureDisksMu.Lock()
xl.getDisks = func() []StorageAPI {
for i := range erasureDisks[:4] {
erasureDisks[i] = newNaughtyDisk(erasureDisks[i], nil, errFaultyDisk)
}
return erasureDisks
}
z.serverPools[0].erasureDisksMu.Unlock()
_, err = obj.DeleteObject(ctx, bucket, object, ObjectOptions{})
if err != nil {
t.Fatal(err)
}
// Create "obj" under "bucket".
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts)
if err != nil {
t.Fatal(err)
}
// Remove one more disk to 'lose' quorum, by setting it to nil.
erasureDisks = xl.getDisks()
z.serverPools[0].erasureDisksMu.Lock()
xl.getDisks = func() []StorageAPI {
erasureDisks[7] = nil
erasureDisks[8] = nil
return erasureDisks
}
z.serverPools[0].erasureDisksMu.Unlock()
_, err = obj.DeleteObject(ctx, bucket, object, ObjectOptions{})
// since majority of disks are not available, metaquorum is not achieved and hence errErasureWriteQuorum error
if !errors.Is(err, errErasureWriteQuorum) {
t.Errorf("Expected deleteObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
}
}
func TestGetObjectNoQuorum(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create an instance of xl backend.
obj, fsDirs, err := prepareErasure16(ctx)
if err != nil {
t.Fatal(err)
}
// Cleanup backend directories.
defer obj.Shutdown(context.Background())
defer removeRoots(fsDirs)
z := obj.(*erasureServerPools)
xl := z.serverPools[0].sets[0]
// Create "bucket"
err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
if err != nil {
t.Fatal(err)
}
bucket := "bucket"
object := "object"
opts := ObjectOptions{}
buf := make([]byte, smallFileThreshold*16)
if _, err = io.ReadFull(crand.Reader, buf); err != nil {
t.Fatal(err)
}
// Test use case 1: All disks are online, xl.meta are present, but data are missing
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(buf), int64(len(buf)), "", ""), opts)
if err != nil {
t.Fatal(err)
}
for _, disk := range xl.getDisks() {
files, _ := disk.ListDir(ctx, bucket, object, -1)
for _, file := range files {
if file != "xl.meta" {
disk.Delete(ctx, bucket, pathJoin(object, file), true)
}
}
}
err = xl.GetObject(ctx, bucket, object, 0, int64(len(buf)), ioutil.Discard, "", opts)
if err != toObjectErr(errFileNotFound, bucket, object) {
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
}
// Test use case 2: Make 9 disks offline, which leaves less than quorum number of disks
// in a 16 disk Erasure setup. The original disks are 'replaced' with
// naughtyDisks that fail after 'f' successful StorageAPI method
// invocations, where f - [0,2)
// Create "object" under "bucket".
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(buf), int64(len(buf)), "", ""), opts)
if err != nil {
t.Fatal(err)
}
for f := 0; f < 2; f++ {
diskErrors := make(map[int]error)
for i := 0; i <= f; i++ {
diskErrors[i] = nil
}
erasureDisks := xl.getDisks()
for i := range erasureDisks[:9] {
switch diskType := erasureDisks[i].(type) {
case *naughtyDisk:
erasureDisks[i] = newNaughtyDisk(diskType.disk, diskErrors, errFaultyDisk)
default:
erasureDisks[i] = newNaughtyDisk(erasureDisks[i], diskErrors, errFaultyDisk)
}
}
z.serverPools[0].erasureDisksMu.Lock()
xl.getDisks = func() []StorageAPI {
return erasureDisks
}
z.serverPools[0].erasureDisksMu.Unlock()
// Fetch object from store.
err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts)
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
}
}
}
func TestHeadObjectNoQuorum(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create an instance of xl backend.
obj, fsDirs, err := prepareErasure16(ctx)
if err != nil {
t.Fatal(err)
}
// Cleanup backend directories.
defer obj.Shutdown(context.Background())
defer removeRoots(fsDirs)
z := obj.(*erasureServerPools)
xl := z.serverPools[0].sets[0]
// Create "bucket"
err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
if err != nil {
t.Fatal(err)
}
bucket := "bucket"
object := "object"
opts := ObjectOptions{}
// Test use case 1: All disks are online, xl.meta are present, but data are missing
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts)
if err != nil {
t.Fatal(err)
}
for _, disk := range xl.getDisks() {
files, _ := disk.ListDir(ctx, bucket, object, -1)
for _, file := range files {
if file != "xl.meta" {
disk.Delete(ctx, bucket, pathJoin(object, file), true)
}
}
}
_, err = xl.GetObjectInfo(ctx, bucket, object, opts)
if err != nil {
t.Errorf("Expected StatObject to succeed if data dir are not found, but failed with %v", err)
}
// Test use case 2: Make 9 disks offline, which leaves less than quorum number of disks
// in a 16 disk Erasure setup. The original disks are 'replaced' with
// naughtyDisks that fail after 'f' successful StorageAPI method
// invocations, where f - [0,2)
// Create "object" under "bucket".
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts)
if err != nil {
t.Fatal(err)
}
erasureDisks := xl.getDisks()
for i := range erasureDisks[:10] {
erasureDisks[i] = nil
}
z.serverPools[0].erasureDisksMu.Lock()
xl.getDisks = func() []StorageAPI {
return erasureDisks
}
z.serverPools[0].erasureDisksMu.Unlock()
// Fetch object from store.
_, err = xl.GetObjectInfo(ctx, bucket, object, opts)
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
t.Errorf("Expected getObjectInfo to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
}
}
func TestPutObjectNoQuorum(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create an instance of xl backend.
obj, fsDirs, err := prepareErasure16(ctx)
if err != nil {
t.Fatal(err)
}
// Cleanup backend directories.
defer obj.Shutdown(context.Background())
defer removeRoots(fsDirs)
z := obj.(*erasureServerPools)
xl := z.serverPools[0].sets[0]
// Create "bucket"
err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
if err != nil {
t.Fatal(err)
}
bucket := "bucket"
object := "object"
opts := ObjectOptions{}
// Create "object" under "bucket".
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(bytes.Repeat([]byte{'a'}, smallFileThreshold*16)), smallFileThreshold*16, "", ""), opts)
if err != nil {
t.Fatal(err)
}
// Make 9 disks offline, which leaves less than quorum number of disks
// in a 16 disk Erasure setup. The original disks are 'replaced' with
// naughtyDisks that fail after 'f' successful StorageAPI method
// invocations, where f - [0,4)
for f := 0; f < 2; f++ {
diskErrors := make(map[int]error)
for i := 0; i <= f; i++ {
diskErrors[i] = nil
}
erasureDisks := xl.getDisks()
for i := range erasureDisks[:9] {
switch diskType := erasureDisks[i].(type) {
case *naughtyDisk:
erasureDisks[i] = newNaughtyDisk(diskType.disk, diskErrors, errFaultyDisk)
default:
erasureDisks[i] = newNaughtyDisk(erasureDisks[i], diskErrors, errFaultyDisk)
}
}
z.serverPools[0].erasureDisksMu.Lock()
xl.getDisks = func() []StorageAPI {
return erasureDisks
}
z.serverPools[0].erasureDisksMu.Unlock()
// Upload new content to same object "object"
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(bytes.Repeat([]byte{byte(f)}, smallFileThreshold*16)), smallFileThreshold*16, "", ""), opts)
if !errors.Is(err, errErasureWriteQuorum) {
t.Errorf("Expected putObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
}
}
}
func TestPutObjectNoQuorumSmall(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create an instance of xl backend.
obj, fsDirs, err := prepareErasure16(ctx)
if err != nil {
t.Fatal(err)
}
// Cleanup backend directories.
defer obj.Shutdown(context.Background())
defer removeRoots(fsDirs)
z := obj.(*erasureServerPools)
xl := z.serverPools[0].sets[0]
// Create "bucket"
err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
if err != nil {
t.Fatal(err)
}
bucket := "bucket"
object := "object"
opts := ObjectOptions{}
// Create "object" under "bucket".
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(bytes.Repeat([]byte{'a'}, smallFileThreshold/2)), smallFileThreshold/2, "", ""), opts)
if err != nil {
t.Fatal(err)
}
// Make 9 disks offline, which leaves less than quorum number of disks
// in a 16 disk Erasure setup. The original disks are 'replaced' with
// naughtyDisks that fail after 'f' successful StorageAPI method
// invocations, where f - [0,2)
for f := 0; f < 2; f++ {
t.Run("exec-"+strconv.Itoa(f), func(t *testing.T) {
diskErrors := make(map[int]error)
for i := 0; i <= f; i++ {
diskErrors[i] = nil
}
erasureDisks := xl.getDisks()
for i := range erasureDisks[:9] {
switch diskType := erasureDisks[i].(type) {
case *naughtyDisk:
erasureDisks[i] = newNaughtyDisk(diskType.disk, diskErrors, errFaultyDisk)
default:
erasureDisks[i] = newNaughtyDisk(erasureDisks[i], diskErrors, errFaultyDisk)
}
}
z.serverPools[0].erasureDisksMu.Lock()
xl.getDisks = func() []StorageAPI {
return erasureDisks
}
z.serverPools[0].erasureDisksMu.Unlock()
// Upload new content to same object "object"
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(bytes.Repeat([]byte{byte(f)}, smallFileThreshold/2)), smallFileThreshold/2, "", ""), opts)
if !errors.Is(err, errErasureWriteQuorum) {
t.Errorf("Expected putObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
}
})
}
}
func TestObjectQuorumFromMeta(t *testing.T) {
ExecObjectLayerTestWithDirs(t, testObjectQuorumFromMeta)
}
func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []string, t TestErrHandler) {
restoreGlobalStorageClass := globalStorageClass
defer func() {
globalStorageClass = restoreGlobalStorageClass
}()
globalStorageClass = storageclass.Config{}
bucket := getRandomBucketName()
var opts ObjectOptions
// make data with more than one part
partCount := 3
data := bytes.Repeat([]byte("a"), 6*1024*1024*partCount)
z := obj.(*erasureServerPools)
xl := z.serverPools[0].sets[0]
erasureDisks := xl.getDisks()
ctx, cancel := context.WithCancel(GlobalContext)
defer cancel()
err := obj.MakeBucketWithLocation(ctx, bucket, BucketOptions{})
if err != nil {
t.Fatalf("Failed to make a bucket %v", err)
}
// Object for test case 1 - No StorageClass defined, no MetaData in PutObject
object1 := "object1"
_, err = obj.PutObject(ctx, bucket, object1, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), opts)
if err != nil {
t.Fatalf("Failed to putObject %v", err)
}
parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "", false)
parts1SC := globalStorageClass
// Object for test case 2 - No StorageClass defined, MetaData in PutObject requesting RRS Class
object2 := "object2"
metadata2 := make(map[string]string)
metadata2["x-amz-storage-class"] = storageclass.RRS
_, err = obj.PutObject(ctx, bucket, object2, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata2})
if err != nil {
t.Fatalf("Failed to putObject %v", err)
}
parts2, errs2 := readAllFileInfo(ctx, erasureDisks, bucket, object2, "", false)
parts2SC := globalStorageClass
// Object for test case 3 - No StorageClass defined, MetaData in PutObject requesting Standard Storage Class
object3 := "object3"
metadata3 := make(map[string]string)
metadata3["x-amz-storage-class"] = storageclass.STANDARD
_, err = obj.PutObject(ctx, bucket, object3, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata3})
if err != nil {
t.Fatalf("Failed to putObject %v", err)
}
parts3, errs3 := readAllFileInfo(ctx, erasureDisks, bucket, object3, "", false)
parts3SC := globalStorageClass
// Object for test case 4 - Standard StorageClass defined as Parity 6, MetaData in PutObject requesting Standard Storage Class
object4 := "object4"
metadata4 := make(map[string]string)
metadata4["x-amz-storage-class"] = storageclass.STANDARD
globalStorageClass = storageclass.Config{
Standard: storageclass.StorageClass{
Parity: 6,
},
}
_, err = obj.PutObject(ctx, bucket, object4, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata4})
if err != nil {
t.Fatalf("Failed to putObject %v", err)
}
parts4, errs4 := readAllFileInfo(ctx, erasureDisks, bucket, object4, "", false)
parts4SC := storageclass.Config{
Standard: storageclass.StorageClass{
Parity: 6,
},
}
// Object for test case 5 - RRS StorageClass defined as Parity 2, MetaData in PutObject requesting RRS Class
// Reset global storage class flags
object5 := "object5"
metadata5 := make(map[string]string)
metadata5["x-amz-storage-class"] = storageclass.RRS
globalStorageClass = storageclass.Config{
RRS: storageclass.StorageClass{
Parity: 2,
},
}
_, err = obj.PutObject(ctx, bucket, object5, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata5})
if err != nil {
t.Fatalf("Failed to putObject %v", err)
}
parts5, errs5 := readAllFileInfo(ctx, erasureDisks, bucket, object5, "", false)
parts5SC := storageclass.Config{
RRS: storageclass.StorageClass{
Parity: 2,
},
}
// Object for test case 6 - RRS StorageClass defined as Parity 2, MetaData in PutObject requesting Standard Storage Class
object6 := "object6"
metadata6 := make(map[string]string)
metadata6["x-amz-storage-class"] = storageclass.STANDARD
globalStorageClass = storageclass.Config{
RRS: storageclass.StorageClass{
Parity: 2,
},
}
_, err = obj.PutObject(ctx, bucket, object6, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata6})
if err != nil {
t.Fatalf("Failed to putObject %v", err)
}
parts6, errs6 := readAllFileInfo(ctx, erasureDisks, bucket, object6, "", false)
parts6SC := storageclass.Config{
RRS: storageclass.StorageClass{
Parity: 2,
},
}
// Object for test case 7 - Standard StorageClass defined as Parity 5, MetaData in PutObject requesting RRS Class
// Reset global storage class flags
object7 := "object7"
metadata7 := make(map[string]string)
metadata7["x-amz-storage-class"] = storageclass.STANDARD
globalStorageClass = storageclass.Config{
Standard: storageclass.StorageClass{
Parity: 5,
},
}
_, err = obj.PutObject(ctx, bucket, object7, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata7})
if err != nil {
t.Fatalf("Failed to putObject %v", err)
}
parts7, errs7 := readAllFileInfo(ctx, erasureDisks, bucket, object7, "", false)
parts7SC := storageclass.Config{
Standard: storageclass.StorageClass{
Parity: 5,
},
}
tests := []struct {
parts []FileInfo
errs []error
expectedReadQuorum int
expectedWriteQuorum int
storageClassCfg storageclass.Config
expectedError error
}{
{parts1, errs1, 12, 12, parts1SC, nil},
{parts2, errs2, 14, 14, parts2SC, nil},
{parts3, errs3, 12, 12, parts3SC, nil},
{parts4, errs4, 10, 10, parts4SC, nil},
{parts5, errs5, 14, 14, parts5SC, nil},
{parts6, errs6, 12, 12, parts6SC, nil},
{parts7, errs7, 11, 11, parts7SC, nil},
}
for _, tt := range tests {
tt := tt
t.(*testing.T).Run("", func(t *testing.T) {
globalStorageClass = tt.storageClassCfg
actualReadQuorum, actualWriteQuorum, err := objectQuorumFromMeta(ctx, tt.parts, tt.errs, getDefaultParityBlocks(len(erasureDisks)))
if tt.expectedError != nil && err == nil {
t.Errorf("Expected %s, got %s", tt.expectedError, err)
}
if tt.expectedError == nil && err != nil {
t.Errorf("Expected %s, got %s", tt.expectedError, err)
}
if tt.expectedReadQuorum != actualReadQuorum {
t.Errorf("Expected Read Quorum %d, got %d", tt.expectedReadQuorum, actualReadQuorum)
}
if tt.expectedWriteQuorum != actualWriteQuorum {
t.Errorf("Expected Write Quorum %d, got %d", tt.expectedWriteQuorum, actualWriteQuorum)
}
})
}
}