Check for absence of checksum field and attributes. (#6298)

Fixes #6295
This commit is contained in:
kannappanr 2018-08-20 16:58:47 -07:00 committed by Dee Koder
parent 8b2801bd46
commit 2d84b02bc4
3 changed files with 33 additions and 6 deletions

View file

@ -490,7 +490,9 @@ func loadConfig(objAPI ObjectLayer) error {
// * Handle the configuration in this function to create/add into TargetList.
func getNotificationTargets(config *serverConfig) *event.TargetList {
targetList := event.NewTargetList()
if config == nil {
return targetList
}
for id, args := range config.Notify.AMQP {
if args.Enable {
newTarget, err := target.NewAMQPTarget(id, args)

View file

@ -608,16 +608,28 @@ func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dr
// and later the disk comes back up again, heal on the object
// should delete it.
func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRun bool) (hr madmin.HealResultItem, err error) {
// Create context that also contains information about the object and bucket.
// The top level handler might not have this information.
reqInfo := logger.GetReqInfo(ctx)
var newReqInfo *logger.ReqInfo
if reqInfo != nil {
newReqInfo = logger.NewReqInfo(reqInfo.RemoteHost, reqInfo.UserAgent, reqInfo.RequestID, reqInfo.API, bucket, object)
} else {
newReqInfo = logger.NewReqInfo("", "", "", "Heal", bucket, object)
}
healCtx := logger.SetReqInfo(context.Background(), newReqInfo)
// Healing directories handle it separately.
if hasSuffix(object, slashSeparator) {
return xl.healObjectDir(ctx, bucket, object, dryRun)
return xl.healObjectDir(healCtx, bucket, object, dryRun)
}
// FIXME: Metadata is read again in the healObject() call below.
// Read metadata files from all the disks
partsMetadata, errs := readAllXLMetadata(ctx, xl.getDisks(), bucket, object)
partsMetadata, errs := readAllXLMetadata(healCtx, xl.getDisks(), bucket, object)
latestXLMeta, err := getLatestXLMeta(ctx, partsMetadata, errs)
latestXLMeta, err := getLatestXLMeta(healCtx, partsMetadata, errs)
if err != nil {
return hr, toObjectErr(err, bucket, object)
}
@ -630,5 +642,5 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRu
defer objectLock.RUnlock()
// Heal the object.
return healObject(ctx, xl.getDisks(), bucket, object, latestXLMeta.Erasure.DataBlocks, dryRun)
return healObject(healCtx, xl.getDisks(), bucket, object, latestXLMeta.Erasure.DataBlocks, dryRun)
}

View file

@ -162,6 +162,13 @@ func parseXLErasureInfo(ctx context.Context, xlMetaBuf []byte) (ErasureInfo, err
erasure.Index = int(erasureResult.Get("index").Int())
checkSumsResult := erasureResult.Get("checksum").Array()
// Check for scenario where checksum information missing for some parts.
partsResult := gjson.GetBytes(xlMetaBuf, "parts").Array()
if len(checkSumsResult) != len(partsResult) {
return erasure, errCorruptedFormat
}
// Parse xlMetaV1.Erasure.Checksum array.
checkSums := make([]ChecksumInfo, len(checkSumsResult))
for i, v := range checkSumsResult {
@ -175,7 +182,11 @@ func parseXLErasureInfo(ctx context.Context, xlMetaBuf []byte) (ErasureInfo, err
logger.LogIf(ctx, err)
return erasure, err
}
checkSums[i] = ChecksumInfo{Name: v.Get("name").String(), Algorithm: algorithm, Hash: hash}
name := v.Get("name").String()
if name == "" {
return erasure, errCorruptedFormat
}
checkSums[i] = ChecksumInfo{Name: name, Algorithm: algorithm, Hash: hash}
}
erasure.Checksums = checkSums
return erasure, nil
@ -301,6 +312,8 @@ func readXLMeta(ctx context.Context, disk StorageAPI, bucket string, object stri
// obtain xlMetaV1{} using `github.com/tidwall/gjson`.
xlMeta, err = xlMetaV1UnmarshalJSON(ctx, xlMetaBuf)
if err != nil {
logger.GetReqInfo(ctx).AppendTags("disk", disk.String())
logger.LogIf(ctx, err)
return xlMetaV1{}, err
}
// Return structured `xl.json`.