diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index c29cbd4b6..820cecea1 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -25,11 +25,11 @@ import ( "strings" "time" - "github.com/tinylib/msgp/msgp" - + "github.com/cespare/xxhash/v2" "github.com/google/uuid" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" + "github.com/tinylib/msgp/msgp" ) var ( @@ -50,7 +50,7 @@ const ( // Bumping this is informational, but should be done // if any change is made to the data stored, bumping this // will allow to detect the exact version later. - xlVersionMinor = 1 + xlVersionMinor = 2 ) func init() { @@ -323,6 +323,47 @@ func (x xlMetaInlineData) validate() error { return nil } +// repair will copy all seemingly valid data entries from a corrupted set. +// This does not ensure that data is correct, but will allow all operations to complete. +func (x *xlMetaInlineData) repair() { + data := *x + if len(data) == 0 { + return + } + + if !data.versionOK() { + *x = nil + return + } + + sz, buf, err := msgp.ReadMapHeaderBytes(data.afterVersion()) + if err != nil { + *x = nil + return + } + + // Remove all current data + keys := make([][]byte, 0, sz) + vals := make([][]byte, 0, sz) + for i := uint32(0); i < sz; i++ { + var key, val []byte + key, buf, err = msgp.ReadMapKeyZC(buf) + if err != nil { + break + } + if len(key) == 0 { + break + } + val, buf, err = msgp.ReadBytesZC(buf) + if err != nil { + break + } + keys = append(keys, key) + vals = append(vals, val) + } + x.serialize(-1, keys, vals) +} + // validate checks if the data is valid. // It does not check integrity of the stored data. func (x xlMetaInlineData) list() ([]string, error) { @@ -357,6 +398,35 @@ func (x xlMetaInlineData) list() ([]string, error) { return keys, nil } +// serialize will serialize the provided keys and values. +// The function will panic if keys/value slices aren't of equal length. +// Payload size can give an indication of expected payload size. +// If plSize is <= 0 it will be calculated. +func (x *xlMetaInlineData) serialize(plSize int, keys [][]byte, vals [][]byte) { + if len(keys) != len(vals) { + panic(fmt.Errorf("xlMetaInlineData.serialize: keys/value number mismatch")) + } + if len(keys) == 0 { + *x = nil + return + } + if plSize <= 0 { + plSize = 1 + msgp.MapHeaderSize + for i := range keys { + plSize += len(keys[i]) + len(vals[i]) + msgp.StringPrefixSize + msgp.ArrayHeaderSize + } + } + payload := make([]byte, 1, plSize) + payload[0] = xlMetaInlineDataVer + payload = msgp.AppendMapHeader(payload, uint32(len(keys))) + for i := range keys { + payload = msgp.AppendStringFromBytes(payload, keys[i]) + payload = msgp.AppendBytes(payload, vals[i]) + } + *x = payload +} + +// entries returns the number of entries in the data. func (x xlMetaInlineData) entries() int { if len(x) == 0 || !x.versionOK() { return 0 @@ -405,17 +475,7 @@ func (x *xlMetaInlineData) replace(key string, value []byte) { } // Reserialize... - payload := make([]byte, 1, plSize) - payload[0] = xlMetaInlineDataVer - payload = msgp.AppendMapHeader(payload, uint32(len(keys))) - for i := range keys { - payload = msgp.AppendStringFromBytes(payload, keys[i]) - payload = msgp.AppendBytes(payload, vals[i]) - } - *x = payload - if err := x.validate(); err != nil { - panic(err) - } + x.serialize(plSize, keys, vals) } // rename will rename a key. @@ -457,14 +517,7 @@ func (x *xlMetaInlineData) rename(oldKey, newKey string) bool { } // Reserialize... - payload := make([]byte, 1, plSize) - payload[0] = xlMetaInlineDataVer - payload = msgp.AppendMapHeader(payload, uint32(len(keys))) - for i := range keys { - payload = msgp.AppendStringFromBytes(payload, keys[i]) - payload = msgp.AppendBytes(payload, vals[i]) - } - *x = payload + x.serialize(plSize, keys, vals) return true } @@ -509,14 +562,7 @@ func (x *xlMetaInlineData) remove(key string) bool { } // Reserialize... - payload := make([]byte, 1, plSize) - payload[0] = xlMetaInlineDataVer - payload = msgp.AppendMapHeader(payload, uint32(len(keys))) - for i := range keys { - payload = msgp.AppendStringFromBytes(payload, keys[i]) - payload = msgp.AppendBytes(payload, vals[i]) - } - *x = payload + x.serialize(plSize, keys, vals) return true } @@ -537,11 +583,17 @@ func xlMetaV2TrimData(buf []byte) []byte { logger.LogIf(GlobalContext, err) return buf } + // Skip CRC + if maj > 1 || min >= 2 { + _, metaBuf, err = msgp.ReadUint32Bytes(metaBuf) + logger.LogIf(GlobalContext, err) + } // = input - current pos ends := len(buf) - len(metaBuf) if ends > len(buf) { return buf } + return buf[:ends] } @@ -565,39 +617,57 @@ func (z *xlMetaV2) AddLegacy(m *xlMetaV1Object) error { // Load unmarshal and load the entire message pack. // Note that references to the incoming buffer may be kept as data. func (z *xlMetaV2) Load(buf []byte) error { - buf, _, minor, err := checkXL2V1(buf) + buf, major, minor, err := checkXL2V1(buf) if err != nil { - return fmt.Errorf("z.Load %w", err) + return fmt.Errorf("xlMetaV2.Load %w", err) } - switch minor { - case 0: - _, err = z.UnmarshalMsg(buf) - if err != nil { - return fmt.Errorf("z.Load %w", err) - } - return nil + switch major { case 1: - v, buf, err := msgp.ReadBytesZC(buf) - if err != nil { - return fmt.Errorf("z.Load version(%d), bufLen(%d) %w", minor, len(buf), err) - } - if _, err = z.UnmarshalMsg(v); err != nil { - return fmt.Errorf("z.Load version(%d), vLen(%d), %w", minor, len(v), err) - } - // Add remaining data. - z.data = buf - if err = z.data.validate(); err != nil { - return fmt.Errorf("z.Load version(%d), bufLen(%d) %w", minor, len(buf), err) + switch minor { + case 0: + _, err = z.UnmarshalMsg(buf) + if err != nil { + return fmt.Errorf("xlMetaV2.Load %w", err) + } + return nil + case 1, 2: + v, buf, err := msgp.ReadBytesZC(buf) + if err != nil { + return fmt.Errorf("xlMetaV2.Load version(%d), bufLen(%d) %w", minor, len(buf), err) + } + if minor >= 2 { + if crc, nbuf, err := msgp.ReadUint32Bytes(buf); err == nil { + // Read metadata CRC (added in v2) + buf = nbuf + if got := uint32(xxhash.Sum64(v)); got != crc { + return fmt.Errorf("xlMetaV2.Load version(%d), CRC mismatch, want 0x%x, got 0x%x", minor, crc, got) + } + } else { + return fmt.Errorf("xlMetaV2.Load version(%d), loading CRC: %w", minor, err) + } + } + + if _, err = z.UnmarshalMsg(v); err != nil { + return fmt.Errorf("xlMetaV2.Load version(%d), vLen(%d), %w", minor, len(v), err) + } + // Add remaining data. + z.data = buf + if err = z.data.validate(); err != nil { + z.data.repair() + logger.Info("xlMetaV2.Load: data validation failed: %v. %d entries after repair", err, z.data.entries()) + } + default: + return errors.New("unknown minor metadata version") } default: - return errors.New("unknown metadata version") + return errors.New("unknown major metadata version") } return nil } // AppendTo will marshal the data in z and append it to the provided slice. func (z *xlMetaV2) AppendTo(dst []byte) ([]byte, error) { - sz := len(xlHeader) + len(xlVersionCurrent) + msgp.ArrayHeaderSize + z.Msgsize() + len(z.data) + len(dst) + sz := len(xlHeader) + len(xlVersionCurrent) + msgp.ArrayHeaderSize + z.Msgsize() + len(z.data) + len(dst) + msgp.Uint32Size if cap(dst) < sz { buf := make([]byte, len(dst), sz) copy(buf, dst) @@ -621,6 +691,8 @@ func (z *xlMetaV2) AppendTo(dst []byte) ([]byte, error) { // Update size... binary.BigEndian.PutUint32(dst[dataOffset-4:dataOffset], uint32(len(dst)-dataOffset)) + // Add CRC of metadata. + dst = msgp.AppendUint32(dst, uint32(xxhash.Sum64(dst[dataOffset:]))) return append(dst, z.data...), nil } diff --git a/cmd/xl-storage-format-v2_test.go b/cmd/xl-storage-format-v2_test.go index f88500720..9fbc90cac 100644 --- a/cmd/xl-storage-format-v2_test.go +++ b/cmd/xl-storage-format-v2_test.go @@ -137,8 +137,14 @@ func TestXLV2FormatData(t *testing.T) { // Test trimmed xl2 = xlMetaV2{} - failOnErr(xl2.Load(xlMetaV2TrimData(serialized))) + trimmed := xlMetaV2TrimData(serialized) + failOnErr(xl2.Load(trimmed)) if len(xl2.data) != 0 { t.Fatal("data, was not trimmed, bytes left:", len(xl2.data)) } + // Corrupt metadata, last 5 bytes is the checksum, so go a bit further back. + trimmed[len(trimmed)-10] += 10 + if err := xl2.Load(trimmed); err == nil { + t.Fatal("metadata corruption not detected") + } }