diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 90f6c4805..fe2cce519 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -969,6 +969,10 @@ func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Reques concurrent = 32 } + if runtime.GOMAXPROCS(0) < concurrent { + concurrent = runtime.GOMAXPROCS(0) + } + duration, err := time.ParseDuration(durationStr) if err != nil { duration = time.Second * 10 @@ -981,6 +985,7 @@ func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Reques NoRecreate: true, }) } + defer deleteBucket() // Freeze all incoming S3 API calls before running speedtest. globalNotificationSys.ServiceFreeze(ctx, true) @@ -1005,7 +1010,6 @@ func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Reques w.(http.Flusher).Flush() case result, ok := <-ch: if !ok { - deleteBucket() return } if err := enc.Encode(result); err != nil { diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index daf4cb67b..a27b697e6 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -1141,20 +1141,19 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura return SpeedtestResult{}, errServerNotInitialized } + var errOnce sync.Once var retError string + var wg sync.WaitGroup + var totalBytesWritten uint64 + var totalBytesRead uint64 bucket := minioMetaSpeedTestBucket objCountPerThread := make([]uint64, concurrent) - - uploadsStopped := false - var totalBytesWritten uint64 uploadsCtx, uploadsCancel := context.WithCancel(context.Background()) - - var wg sync.WaitGroup + defer uploadsCancel() go func() { time.Sleep(duration) - uploadsStopped = true uploadsCancel() }() @@ -1168,9 +1167,14 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura hashReader, err := hash.NewReader(newRandomReader(size), int64(size), "", "", int64(size)) if err != nil { - retError = err.Error() - logger.LogIf(ctx, err) - break + if !contextCanceled(uploadsCtx) { + errOnce.Do(func() { + retError = err.Error() + logger.LogIf(ctx, err) + }) + } + uploadsCancel() + return } reader := NewPutObjReader(hashReader) objInfo, err := objAPI.PutObject(uploadsCtx, bucket, fmt.Sprintf("%s.%d.%d", @@ -1179,12 +1183,15 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura xhttp.AmzStorageClass: storageClass, }, }) - if err != nil && !uploadsStopped { - retError = err.Error() - logger.LogIf(ctx, err) - } if err != nil { - break + if !contextCanceled(uploadsCtx) { + errOnce.Do(func() { + retError = err.Error() + logger.LogIf(ctx, err) + }) + } + uploadsCancel() + return } atomic.AddUint64(&totalBytesWritten, uint64(objInfo.Size)) objCountPerThread[i]++ @@ -1193,13 +1200,15 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura } wg.Wait() - downloadsStopped := false - var totalBytesRead uint64 - downloadsCtx, downloadsCancel := context.WithCancel(context.Background()) + // We already saw write failures, no need to proceed into read's + if retError != "" { + return SpeedtestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil + } + downloadsCtx, downloadsCancel := context.WithCancel(context.Background()) + defer downloadsCancel() go func() { time.Sleep(duration) - downloadsStopped = true downloadsCancel() }() @@ -1217,12 +1226,15 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura } r, err := objAPI.GetObjectNInfo(downloadsCtx, bucket, fmt.Sprintf("%s.%d.%d", objNamePrefix, i, j), nil, nil, noLock, ObjectOptions{}) - if err != nil && !downloadsStopped { - retError = err.Error() - logger.LogIf(ctx, err) - } if err != nil { - break + if !contextCanceled(downloadsCtx) { + errOnce.Do(func() { + retError = err.Error() + logger.LogIf(ctx, err) + }) + } + downloadsCancel() + return } n, err := io.Copy(ioutil.Discard, r) r.Close() @@ -1232,18 +1244,22 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura // reads etc. atomic.AddUint64(&totalBytesRead, uint64(n)) } - if err != nil && !downloadsStopped { - retError = err.Error() - logger.LogIf(ctx, err) - } if err != nil { - break + if !contextCanceled(downloadsCtx) { + errOnce.Do(func() { + retError = err.Error() + logger.LogIf(ctx, err) + }) + } + downloadsCancel() + return } j++ } }(i) } wg.Wait() + return SpeedtestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil } diff --git a/cmd/utils.go b/cmd/utils.go index 3ca474b41..07fa6cc18 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -1078,10 +1078,7 @@ func speedTest(ctx context.Context, opts speedTestOpts) chan madmin.SpeedTestRes break } - doBreak := false - if float64(totalGet-throughputHighestGet)/float64(totalGet) < 0.025 { - doBreak = true - } + doBreak := float64(totalGet-throughputHighestGet)/float64(totalGet) < 0.025 throughputHighestGet = totalGet throughputHighestResults = results @@ -1092,12 +1089,19 @@ func speedTest(ctx context.Context, opts speedTestOpts) chan madmin.SpeedTestRes break } - if !opts.autotune { - sendResult() - break + for _, result := range results { + if result.Error != "" { + // Break out on errors. + sendResult() + return + } } sendResult() + if !opts.autotune { + break + } + // Try with a higher concurrency to see if we get better throughput concurrency += (concurrency + 1) / 2 }