fix: speedtest should exit cleanly upon errors (#13851)

- deleteBucket() should be called for cleanup
  if the client abruptly disconnects

- out-of-disk errors should be sent to the client
  properly, and the in-flight calls should be canceled

- limit concurrency to the available GOMAXPROCS, not
  32, for an auto-tuned setup; if procs exceed
  32 then continue normally. this is to handle
  smaller setups.

fixes #13834
This commit is contained in:
Harshavardhana 2021-12-06 16:36:14 -08:00 committed by GitHub
parent 7d70afc937
commit b9aae1aaae
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 60 additions and 36 deletions

View file

@ -969,6 +969,10 @@ func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Reques
concurrent = 32 concurrent = 32
} }
if runtime.GOMAXPROCS(0) < concurrent {
concurrent = runtime.GOMAXPROCS(0)
}
duration, err := time.ParseDuration(durationStr) duration, err := time.ParseDuration(durationStr)
if err != nil { if err != nil {
duration = time.Second * 10 duration = time.Second * 10
@ -981,6 +985,7 @@ func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Reques
NoRecreate: true, NoRecreate: true,
}) })
} }
defer deleteBucket()
// Freeze all incoming S3 API calls before running speedtest. // Freeze all incoming S3 API calls before running speedtest.
globalNotificationSys.ServiceFreeze(ctx, true) globalNotificationSys.ServiceFreeze(ctx, true)
@ -1005,7 +1010,6 @@ func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Reques
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
case result, ok := <-ch: case result, ok := <-ch:
if !ok { if !ok {
deleteBucket()
return return
} }
if err := enc.Encode(result); err != nil { if err := enc.Encode(result); err != nil {

View file

@ -1141,20 +1141,19 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
return SpeedtestResult{}, errServerNotInitialized return SpeedtestResult{}, errServerNotInitialized
} }
var errOnce sync.Once
var retError string var retError string
var wg sync.WaitGroup
var totalBytesWritten uint64
var totalBytesRead uint64
bucket := minioMetaSpeedTestBucket bucket := minioMetaSpeedTestBucket
objCountPerThread := make([]uint64, concurrent) objCountPerThread := make([]uint64, concurrent)
uploadsStopped := false
var totalBytesWritten uint64
uploadsCtx, uploadsCancel := context.WithCancel(context.Background()) uploadsCtx, uploadsCancel := context.WithCancel(context.Background())
defer uploadsCancel()
var wg sync.WaitGroup
go func() { go func() {
time.Sleep(duration) time.Sleep(duration)
uploadsStopped = true
uploadsCancel() uploadsCancel()
}() }()
@ -1168,9 +1167,14 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
hashReader, err := hash.NewReader(newRandomReader(size), hashReader, err := hash.NewReader(newRandomReader(size),
int64(size), "", "", int64(size)) int64(size), "", "", int64(size))
if err != nil { if err != nil {
retError = err.Error() if !contextCanceled(uploadsCtx) {
logger.LogIf(ctx, err) errOnce.Do(func() {
break retError = err.Error()
logger.LogIf(ctx, err)
})
}
uploadsCancel()
return
} }
reader := NewPutObjReader(hashReader) reader := NewPutObjReader(hashReader)
objInfo, err := objAPI.PutObject(uploadsCtx, bucket, fmt.Sprintf("%s.%d.%d", objInfo, err := objAPI.PutObject(uploadsCtx, bucket, fmt.Sprintf("%s.%d.%d",
@ -1179,12 +1183,15 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
xhttp.AmzStorageClass: storageClass, xhttp.AmzStorageClass: storageClass,
}, },
}) })
if err != nil && !uploadsStopped {
retError = err.Error()
logger.LogIf(ctx, err)
}
if err != nil { if err != nil {
break if !contextCanceled(uploadsCtx) {
errOnce.Do(func() {
retError = err.Error()
logger.LogIf(ctx, err)
})
}
uploadsCancel()
return
} }
atomic.AddUint64(&totalBytesWritten, uint64(objInfo.Size)) atomic.AddUint64(&totalBytesWritten, uint64(objInfo.Size))
objCountPerThread[i]++ objCountPerThread[i]++
@ -1193,13 +1200,15 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
} }
wg.Wait() wg.Wait()
downloadsStopped := false // We already saw write failures, no need to proceed into read's
var totalBytesRead uint64 if retError != "" {
downloadsCtx, downloadsCancel := context.WithCancel(context.Background()) return SpeedtestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil
}
downloadsCtx, downloadsCancel := context.WithCancel(context.Background())
defer downloadsCancel()
go func() { go func() {
time.Sleep(duration) time.Sleep(duration)
downloadsStopped = true
downloadsCancel() downloadsCancel()
}() }()
@ -1217,12 +1226,15 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
} }
r, err := objAPI.GetObjectNInfo(downloadsCtx, bucket, fmt.Sprintf("%s.%d.%d", r, err := objAPI.GetObjectNInfo(downloadsCtx, bucket, fmt.Sprintf("%s.%d.%d",
objNamePrefix, i, j), nil, nil, noLock, ObjectOptions{}) objNamePrefix, i, j), nil, nil, noLock, ObjectOptions{})
if err != nil && !downloadsStopped {
retError = err.Error()
logger.LogIf(ctx, err)
}
if err != nil { if err != nil {
break if !contextCanceled(downloadsCtx) {
errOnce.Do(func() {
retError = err.Error()
logger.LogIf(ctx, err)
})
}
downloadsCancel()
return
} }
n, err := io.Copy(ioutil.Discard, r) n, err := io.Copy(ioutil.Discard, r)
r.Close() r.Close()
@ -1232,18 +1244,22 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
// reads etc. // reads etc.
atomic.AddUint64(&totalBytesRead, uint64(n)) atomic.AddUint64(&totalBytesRead, uint64(n))
} }
if err != nil && !downloadsStopped {
retError = err.Error()
logger.LogIf(ctx, err)
}
if err != nil { if err != nil {
break if !contextCanceled(downloadsCtx) {
errOnce.Do(func() {
retError = err.Error()
logger.LogIf(ctx, err)
})
}
downloadsCancel()
return
} }
j++ j++
} }
}(i) }(i)
} }
wg.Wait() wg.Wait()
return SpeedtestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil return SpeedtestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil
} }

View file

@ -1078,10 +1078,7 @@ func speedTest(ctx context.Context, opts speedTestOpts) chan madmin.SpeedTestRes
break break
} }
doBreak := false doBreak := float64(totalGet-throughputHighestGet)/float64(totalGet) < 0.025
if float64(totalGet-throughputHighestGet)/float64(totalGet) < 0.025 {
doBreak = true
}
throughputHighestGet = totalGet throughputHighestGet = totalGet
throughputHighestResults = results throughputHighestResults = results
@ -1092,12 +1089,19 @@ func speedTest(ctx context.Context, opts speedTestOpts) chan madmin.SpeedTestRes
break break
} }
if !opts.autotune { for _, result := range results {
sendResult() if result.Error != "" {
break // Break out on errors.
sendResult()
return
}
} }
sendResult() sendResult()
if !opts.autotune {
break
}
// Try with a higher concurrency to see if we get better throughput // Try with a higher concurrency to see if we get better throughput
concurrency += (concurrency + 1) / 2 concurrency += (concurrency + 1) / 2
} }