diff --git a/docs/bucket/replication/setup_3site_replication.sh b/docs/bucket/replication/setup_3site_replication.sh index 869d9f4b8..8cbb104dc 100755 --- a/docs/bucket/replication/setup_3site_replication.sh +++ b/docs/bucket/replication/setup_3site_replication.sh @@ -43,8 +43,8 @@ unset MINIO_KMS_KES_KEY_FILE unset MINIO_KMS_KES_ENDPOINT unset MINIO_KMS_KES_KEY_NAME -wget -q -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc && - chmod +x mc +go install -v github.com/minio/mc@master +cp -a $(go env GOPATH)/bin/mc ./mc if [ ! -f mc.RELEASE.2021-03-12T03-36-59Z ]; then wget -q -O mc.RELEASE.2021-03-12T03-36-59Z https://dl.minio.io/client/mc/release/linux-amd64/archive/mc.RELEASE.2021-03-12T03-36-59Z && diff --git a/docs/iam/policies/pbac-tests.sh b/docs/iam/policies/pbac-tests.sh index c645db281..607abc3eb 100755 --- a/docs/iam/policies/pbac-tests.sh +++ b/docs/iam/policies/pbac-tests.sh @@ -8,10 +8,8 @@ pkill minio pkill kes rm -rf /tmp/xl -if [ ! -f ./mc ]; then - wget --quiet -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc && - chmod +x mc -fi +go install -v github.com/minio/mc@master +cp -a $(go env GOPATH)/bin/mc ./mc if [ ! -f ./kes ]; then wget --quiet -O kes https://github.com/minio/kes/releases/latest/download/kes-linux-amd64 && @@ -39,37 +37,37 @@ export MC_HOST_myminio="http://minioadmin:minioadmin@localhost:9000/" (minio server http://localhost:9000/tmp/xl/{1...10}/disk{0...1} 2>&1 >/dev/null) & pid=$! -./mc ready myminio +mc ready myminio -./mc admin user add myminio/ minio123 minio123 +mc admin user add myminio/ minio123 minio123 -./mc admin policy create myminio/ deny-non-sse-kms-pol ./docs/iam/policies/deny-non-sse-kms-objects.json -./mc admin policy create myminio/ deny-invalid-sse-kms-pol ./docs/iam/policies/deny-objects-with-invalid-sse-kms-key-id.json +mc admin policy create myminio/ deny-non-sse-kms-pol ./docs/iam/policies/deny-non-sse-kms-objects.json +mc admin policy create myminio/ deny-invalid-sse-kms-pol ./docs/iam/policies/deny-objects-with-invalid-sse-kms-key-id.json -./mc admin policy attach myminio deny-non-sse-kms-pol --user minio123 -./mc admin policy attach myminio deny-invalid-sse-kms-pol --user minio123 -./mc admin policy attach myminio consoleAdmin --user minio123 +mc admin policy attach myminio deny-non-sse-kms-pol --user minio123 +mc admin policy attach myminio deny-invalid-sse-kms-pol --user minio123 +mc admin policy attach myminio consoleAdmin --user minio123 -./mc mb -l myminio/test-bucket -./mc mb -l myminio/multi-key-poc +mc mb -l myminio/test-bucket +mc mb -l myminio/multi-key-poc export MC_HOST_myminio1="http://minio123:minio123@localhost:9000/" -./mc cp /etc/issue myminio1/test-bucket +mc cp /etc/issue myminio1/test-bucket ret=$? if [ $ret -ne 0 ]; then echo "BUG: PutObject to bucket: test-bucket should succeed. Failed" exit 1 fi -./mc cp /etc/issue myminio1/multi-key-poc | grep -q "Insufficient permissions to access this path" +mc cp /etc/issue myminio1/multi-key-poc | grep -q "Insufficient permissions to access this path" ret=$? if [ $ret -eq 0 ]; then echo "BUG: PutObject to bucket: multi-key-poc without sse-kms should fail. Succedded" exit 1 fi -./mc cp /etc/hosts myminio1/multi-key-poc/hosts --enc-kms "myminio1/multi-key-poc/hosts=minio-default-key" +mc cp /etc/hosts myminio1/multi-key-poc/hosts --enc-kms "myminio1/multi-key-poc/hosts=minio-default-key" ret=$? if [ $ret -ne 0 ]; then echo "BUG: PutObject to bucket: multi-key-poc with valid sse-kms should succeed. Failed" diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index c29a97786..f1fd35fbb 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -90,13 +90,14 @@ type Config struct { // buffer is full, new logs are just ignored and an error // is returned to the caller. type Target struct { - totalMessages int64 - failedMessages int64 - status int32 + totalMessages atomic.Int64 + failedMessages atomic.Int64 + status atomic.Int32 // Worker control - workers int64 + workers atomic.Int64 maxWorkers int64 + // workerStartMu sync.Mutex lastStarted time.Time @@ -157,7 +158,7 @@ func (h *Target) String() string { // IsOnline returns true if the target is reachable using a cached value func (h *Target) IsOnline(ctx context.Context) bool { - return atomic.LoadInt32(&h.status) == statusOnline + return h.status.Load() == statusOnline } // Stats returns the target statistics. @@ -166,8 +167,8 @@ func (h *Target) Stats() types.TargetStats { queueLength := len(h.logCh) h.logChMu.RUnlock() stats := types.TargetStats{ - TotalMessages: atomic.LoadInt64(&h.totalMessages), - FailedMessages: atomic.LoadInt64(&h.failedMessages), + TotalMessages: h.totalMessages.Load(), + FailedMessages: h.failedMessages.Load(), QueueLength: queueLength, } @@ -221,9 +222,9 @@ func (h *Target) initMemoryStore(ctx context.Context) (err error) { func (h *Target) send(ctx context.Context, payload []byte, payloadType string, timeout time.Duration) (err error) { defer func() { if err != nil { - atomic.StoreInt32(&h.status, statusOffline) + h.status.Store(statusOffline) } else { - atomic.StoreInt32(&h.status, statusOnline) + h.status.Store(statusOnline) } }() @@ -275,8 +276,8 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) { } h.logChMu.RUnlock() - atomic.AddInt64(&h.workers, 1) - defer atomic.AddInt64(&h.workers, -1) + h.workers.Add(1) + defer h.workers.Add(-1) h.wg.Add(1) defer h.wg.Done() @@ -353,7 +354,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) { } if !isTick { - atomic.AddInt64(&h.totalMessages, 1) + h.totalMessages.Add(1) if !isDirQueue { if err := enc.Encode(&entry); err != nil { @@ -362,7 +363,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) { fmt.Errorf("unable to encode webhook log entry, err '%w' entry: %v\n", err, entry), h.Name(), ) - atomic.AddInt64(&h.failedMessages, 1) + h.failedMessages.Add(1) continue } } @@ -395,7 +396,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) { // and when it's been at least 30 seconds since // we launched a new worker. if mainWorker && len(h.logCh) > cap(h.logCh)/2 { - nWorkers := atomic.LoadInt64(&h.workers) + nWorkers := h.workers.Load() if nWorkers < h.maxWorkers { if time.Since(h.lastStarted).Milliseconds() > 10 { h.lastStarted = time.Now() @@ -493,10 +494,10 @@ func New(config Config) (*Target, error) { h := &Target{ logCh: make(chan interface{}, config.QueueSize), config: config, - status: statusOffline, batchSize: config.BatchSize, maxWorkers: int64(maxWorkers), } + h.status.Store(statusOffline) if config.BatchSize > 1 { h.payloadType = "" @@ -528,10 +529,17 @@ func (h *Target) SendFromStore(key store.Key) (err error) { return err } + h.failedMessages.Add(1) + defer func() { + if err == nil { + h.failedMessages.Add(-1) + } + }() + if err := h.send(context.Background(), eventData, h.payloadType, webhookCallTimeout); err != nil { - atomic.AddInt64(&h.failedMessages, 1) return err } + // Delete the event from store. return h.store.Del(key.Name) } @@ -540,7 +548,7 @@ func (h *Target) SendFromStore(key store.Key) (err error) { // Messages are queued in the disk if the store is enabled // If Cancel has been called the message is ignored. func (h *Target) Send(ctx context.Context, entry interface{}) error { - if atomic.LoadInt32(&h.status) == statusClosed { + if h.status.Load() == statusClosed { if h.migrateTarget != nil { return h.migrateTarget.Send(ctx, entry) } @@ -557,7 +565,7 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error { retry: select { case h.logCh <- entry: - atomic.AddInt64(&h.totalMessages, 1) + h.totalMessages.Add(1) case <-ctx.Done(): // return error only for context timedout. if errors.Is(ctx.Err(), context.DeadlineExceeded) { @@ -565,11 +573,14 @@ retry: } return nil default: - if h.workers < h.maxWorkers { + nWorkers := h.workers.Load() + if nWorkers < h.maxWorkers { + // Just sleep to avoid any possible hot-loops. + time.Sleep(50 * time.Millisecond) goto retry } - atomic.AddInt64(&h.totalMessages, 1) - atomic.AddInt64(&h.failedMessages, 1) + h.totalMessages.Add(1) + h.failedMessages.Add(1) return errors.New("log buffer full") } @@ -580,7 +591,7 @@ retry: // All queued messages are flushed and the function returns afterwards. // All messages sent to the target after this function has been called will be dropped. func (h *Target) Cancel() { - atomic.StoreInt32(&h.status, statusClosed) + h.status.Store(statusClosed) h.storeCtxCancel() // Wait for messages to be sent...