From f0ca0b3ca9ad5873000450de6c82e07a89e2568f Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Fri, 9 Apr 2021 02:29:54 +0200 Subject: [PATCH] Add metadata checksum (#12017) - Add 32-bit checksum (32 LSB part of xxhash64) of the serialized metadata. This will ensure that we always reject corrupted metadata. - Add automatic repair of inline data, so the data structure can be used. If data was corrupted, we remove all unreadable entries to ensure that operations can succeed on the object. Since higher layers add bitrot checks this is not a big problem. Cannot downgrade to v1.1 metadata, but since that isn't released, no need for a major bump. --- cmd/xl-storage-format-v2.go | 176 ++++++++++++++++++++++--------- cmd/xl-storage-format-v2_test.go | 8 +- 2 files changed, 131 insertions(+), 53 deletions(-) diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index c29cbd4b6..820cecea1 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -25,11 +25,11 @@ import ( "strings" "time" - "github.com/tinylib/msgp/msgp" - + "github.com/cespare/xxhash/v2" "github.com/google/uuid" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" + "github.com/tinylib/msgp/msgp" ) var ( @@ -50,7 +50,7 @@ const ( // Bumping this is informational, but should be done // if any change is made to the data stored, bumping this // will allow to detect the exact version later. - xlVersionMinor = 1 + xlVersionMinor = 2 ) func init() { @@ -323,6 +323,47 @@ func (x xlMetaInlineData) validate() error { return nil } +// repair will copy all seemingly valid data entries from a corrupted set. +// This does not ensure that data is correct, but will allow all operations to complete. +func (x *xlMetaInlineData) repair() { + data := *x + if len(data) == 0 { + return + } + + if !data.versionOK() { + *x = nil + return + } + + sz, buf, err := msgp.ReadMapHeaderBytes(data.afterVersion()) + if err != nil { + *x = nil + return + } + + // Remove all current data + keys := make([][]byte, 0, sz) + vals := make([][]byte, 0, sz) + for i := uint32(0); i < sz; i++ { + var key, val []byte + key, buf, err = msgp.ReadMapKeyZC(buf) + if err != nil { + break + } + if len(key) == 0 { + break + } + val, buf, err = msgp.ReadBytesZC(buf) + if err != nil { + break + } + keys = append(keys, key) + vals = append(vals, val) + } + x.serialize(-1, keys, vals) +} + // validate checks if the data is valid. // It does not check integrity of the stored data. func (x xlMetaInlineData) list() ([]string, error) { @@ -357,6 +398,35 @@ func (x xlMetaInlineData) list() ([]string, error) { return keys, nil } +// serialize will serialize the provided keys and values. +// The function will panic if keys/value slices aren't of equal length. +// Payload size can give an indication of expected payload size. +// If plSize is <= 0 it will be calculated. +func (x *xlMetaInlineData) serialize(plSize int, keys [][]byte, vals [][]byte) { + if len(keys) != len(vals) { + panic(fmt.Errorf("xlMetaInlineData.serialize: keys/value number mismatch")) + } + if len(keys) == 0 { + *x = nil + return + } + if plSize <= 0 { + plSize = 1 + msgp.MapHeaderSize + for i := range keys { + plSize += len(keys[i]) + len(vals[i]) + msgp.StringPrefixSize + msgp.ArrayHeaderSize + } + } + payload := make([]byte, 1, plSize) + payload[0] = xlMetaInlineDataVer + payload = msgp.AppendMapHeader(payload, uint32(len(keys))) + for i := range keys { + payload = msgp.AppendStringFromBytes(payload, keys[i]) + payload = msgp.AppendBytes(payload, vals[i]) + } + *x = payload +} + +// entries returns the number of entries in the data. func (x xlMetaInlineData) entries() int { if len(x) == 0 || !x.versionOK() { return 0 @@ -405,17 +475,7 @@ func (x *xlMetaInlineData) replace(key string, value []byte) { } // Reserialize... - payload := make([]byte, 1, plSize) - payload[0] = xlMetaInlineDataVer - payload = msgp.AppendMapHeader(payload, uint32(len(keys))) - for i := range keys { - payload = msgp.AppendStringFromBytes(payload, keys[i]) - payload = msgp.AppendBytes(payload, vals[i]) - } - *x = payload - if err := x.validate(); err != nil { - panic(err) - } + x.serialize(plSize, keys, vals) } // rename will rename a key. @@ -457,14 +517,7 @@ func (x *xlMetaInlineData) rename(oldKey, newKey string) bool { } // Reserialize... - payload := make([]byte, 1, plSize) - payload[0] = xlMetaInlineDataVer - payload = msgp.AppendMapHeader(payload, uint32(len(keys))) - for i := range keys { - payload = msgp.AppendStringFromBytes(payload, keys[i]) - payload = msgp.AppendBytes(payload, vals[i]) - } - *x = payload + x.serialize(plSize, keys, vals) return true } @@ -509,14 +562,7 @@ func (x *xlMetaInlineData) remove(key string) bool { } // Reserialize... - payload := make([]byte, 1, plSize) - payload[0] = xlMetaInlineDataVer - payload = msgp.AppendMapHeader(payload, uint32(len(keys))) - for i := range keys { - payload = msgp.AppendStringFromBytes(payload, keys[i]) - payload = msgp.AppendBytes(payload, vals[i]) - } - *x = payload + x.serialize(plSize, keys, vals) return true } @@ -537,11 +583,17 @@ func xlMetaV2TrimData(buf []byte) []byte { logger.LogIf(GlobalContext, err) return buf } + // Skip CRC + if maj > 1 || min >= 2 { + _, metaBuf, err = msgp.ReadUint32Bytes(metaBuf) + logger.LogIf(GlobalContext, err) + } // = input - current pos ends := len(buf) - len(metaBuf) if ends > len(buf) { return buf } + return buf[:ends] } @@ -565,39 +617,57 @@ func (z *xlMetaV2) AddLegacy(m *xlMetaV1Object) error { // Load unmarshal and load the entire message pack. // Note that references to the incoming buffer may be kept as data. func (z *xlMetaV2) Load(buf []byte) error { - buf, _, minor, err := checkXL2V1(buf) + buf, major, minor, err := checkXL2V1(buf) if err != nil { - return fmt.Errorf("z.Load %w", err) + return fmt.Errorf("xlMetaV2.Load %w", err) } - switch minor { - case 0: - _, err = z.UnmarshalMsg(buf) - if err != nil { - return fmt.Errorf("z.Load %w", err) - } - return nil + switch major { case 1: - v, buf, err := msgp.ReadBytesZC(buf) - if err != nil { - return fmt.Errorf("z.Load version(%d), bufLen(%d) %w", minor, len(buf), err) - } - if _, err = z.UnmarshalMsg(v); err != nil { - return fmt.Errorf("z.Load version(%d), vLen(%d), %w", minor, len(v), err) - } - // Add remaining data. - z.data = buf - if err = z.data.validate(); err != nil { - return fmt.Errorf("z.Load version(%d), bufLen(%d) %w", minor, len(buf), err) + switch minor { + case 0: + _, err = z.UnmarshalMsg(buf) + if err != nil { + return fmt.Errorf("xlMetaV2.Load %w", err) + } + return nil + case 1, 2: + v, buf, err := msgp.ReadBytesZC(buf) + if err != nil { + return fmt.Errorf("xlMetaV2.Load version(%d), bufLen(%d) %w", minor, len(buf), err) + } + if minor >= 2 { + if crc, nbuf, err := msgp.ReadUint32Bytes(buf); err == nil { + // Read metadata CRC (added in v2) + buf = nbuf + if got := uint32(xxhash.Sum64(v)); got != crc { + return fmt.Errorf("xlMetaV2.Load version(%d), CRC mismatch, want 0x%x, got 0x%x", minor, crc, got) + } + } else { + return fmt.Errorf("xlMetaV2.Load version(%d), loading CRC: %w", minor, err) + } + } + + if _, err = z.UnmarshalMsg(v); err != nil { + return fmt.Errorf("xlMetaV2.Load version(%d), vLen(%d), %w", minor, len(v), err) + } + // Add remaining data. + z.data = buf + if err = z.data.validate(); err != nil { + z.data.repair() + logger.Info("xlMetaV2.Load: data validation failed: %v. %d entries after repair", err, z.data.entries()) + } + default: + return errors.New("unknown minor metadata version") } default: - return errors.New("unknown metadata version") + return errors.New("unknown major metadata version") } return nil } // AppendTo will marshal the data in z and append it to the provided slice. func (z *xlMetaV2) AppendTo(dst []byte) ([]byte, error) { - sz := len(xlHeader) + len(xlVersionCurrent) + msgp.ArrayHeaderSize + z.Msgsize() + len(z.data) + len(dst) + sz := len(xlHeader) + len(xlVersionCurrent) + msgp.ArrayHeaderSize + z.Msgsize() + len(z.data) + len(dst) + msgp.Uint32Size if cap(dst) < sz { buf := make([]byte, len(dst), sz) copy(buf, dst) @@ -621,6 +691,8 @@ func (z *xlMetaV2) AppendTo(dst []byte) ([]byte, error) { // Update size... binary.BigEndian.PutUint32(dst[dataOffset-4:dataOffset], uint32(len(dst)-dataOffset)) + // Add CRC of metadata. + dst = msgp.AppendUint32(dst, uint32(xxhash.Sum64(dst[dataOffset:]))) return append(dst, z.data...), nil } diff --git a/cmd/xl-storage-format-v2_test.go b/cmd/xl-storage-format-v2_test.go index f88500720..9fbc90cac 100644 --- a/cmd/xl-storage-format-v2_test.go +++ b/cmd/xl-storage-format-v2_test.go @@ -137,8 +137,14 @@ func TestXLV2FormatData(t *testing.T) { // Test trimmed xl2 = xlMetaV2{} - failOnErr(xl2.Load(xlMetaV2TrimData(serialized))) + trimmed := xlMetaV2TrimData(serialized) + failOnErr(xl2.Load(trimmed)) if len(xl2.data) != 0 { t.Fatal("data, was not trimmed, bytes left:", len(xl2.data)) } + // Corrupt metadata, last 5 bytes is the checksum, so go a bit further back. + trimmed[len(trimmed)-10] += 10 + if err := xl2.Load(trimmed); err == nil { + t.Fatal("metadata corruption not detected") + } }