mirror of https://github.com/minio/minio synced 2024-07-03 07:58:50 +00:00

protect workers and simplify use of atomics (#19982)

Without an atomic Load() it is possible that, with a slow receiver, we
would get into a hot loop when logCh is full and there are many
incoming callers.

As a workaround, set BATCH_SIZE to a value greater than 100 so that
the slow receiver gets data in bulk and is less likely to be
throttled.

This PR, however, fixes the unprotected access to the current workers
value.
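
For context, a minimal, self-contained Go sketch of the pattern the fix adopts (illustrative only, not MinIO's actual code; the target type, channel size, and sleep interval are assumptions): the worker count lives in an atomic.Int64 read with Load(), and a full logCh triggers a short back-off before retrying instead of spinning.

// Minimal sketch, assuming a simplified target type: the worker count is an
// atomic.Int64 read via Load(), and a full channel triggers a short sleep
// before retrying instead of spinning in a hot loop.
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

type target struct {
	logCh      chan string
	workers    atomic.Int64 // read with Load(), never as a plain int64
	maxWorkers int64
}

// startWorker launches one consumer that simulates a slow receiver.
func (t *target) startWorker() {
	t.workers.Add(1)
	go func() {
		defer t.workers.Add(-1)
		for range t.logCh {
			time.Sleep(10 * time.Millisecond) // slow receiver
		}
	}()
}

// send mirrors the retry path of the fix: load the worker count atomically
// and back off briefly while the queue drains, instead of retrying hot.
func (t *target) send(msg string) error {
retry:
	select {
	case t.logCh <- msg:
		return nil
	default:
		if t.workers.Load() < t.maxWorkers {
			time.Sleep(50 * time.Millisecond) // avoid a hot loop
			goto retry
		}
		return errors.New("log buffer full")
	}
}

func main() {
	t := &target{logCh: make(chan string, 4), maxWorkers: 2}
	t.startWorker() // only one of two allowed workers, so send() backs off
	for i := 0; i < 20; i++ {
		if err := t.send(fmt.Sprintf("entry-%d", i)); err != nil {
			fmt.Println(err)
		}
	}
	close(t.logCh)
}

The 50ms back-off matches the value used in the diff below; in MinIO itself, additional workers are started by the queue processor rather than by the send path.
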
This commit is contained in:
Harshavardhana 2024-06-24 10:59:48 -07:00
parent 168ae81b1f
commit a22ce4550c
3 changed files with 49 additions and 40 deletions

View File

@@ -43,8 +43,8 @@ unset MINIO_KMS_KES_KEY_FILE
 unset MINIO_KMS_KES_ENDPOINT
 unset MINIO_KMS_KES_KEY_NAME
-wget -q -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc &&
-chmod +x mc
+go install -v github.com/minio/mc@master
+cp -a $(go env GOPATH)/bin/mc ./mc
 if [ ! -f mc.RELEASE.2021-03-12T03-36-59Z ]; then
 wget -q -O mc.RELEASE.2021-03-12T03-36-59Z https://dl.minio.io/client/mc/release/linux-amd64/archive/mc.RELEASE.2021-03-12T03-36-59Z &&

View File

@@ -8,10 +8,8 @@ pkill minio
 pkill kes
 rm -rf /tmp/xl
-if [ ! -f ./mc ]; then
-wget --quiet -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc &&
-chmod +x mc
-fi
+go install -v github.com/minio/mc@master
+cp -a $(go env GOPATH)/bin/mc ./mc
 if [ ! -f ./kes ]; then
 wget --quiet -O kes https://github.com/minio/kes/releases/latest/download/kes-linux-amd64 &&
@@ -39,37 +37,37 @@ export MC_HOST_myminio="http://minioadmin:minioadmin@localhost:9000/"
 (minio server http://localhost:9000/tmp/xl/{1...10}/disk{0...1} 2>&1 >/dev/null) &
 pid=$!
-./mc ready myminio
-./mc admin user add myminio/ minio123 minio123
-./mc admin policy create myminio/ deny-non-sse-kms-pol ./docs/iam/policies/deny-non-sse-kms-objects.json
-./mc admin policy create myminio/ deny-invalid-sse-kms-pol ./docs/iam/policies/deny-objects-with-invalid-sse-kms-key-id.json
-./mc admin policy attach myminio deny-non-sse-kms-pol --user minio123
-./mc admin policy attach myminio deny-invalid-sse-kms-pol --user minio123
-./mc admin policy attach myminio consoleAdmin --user minio123
-./mc mb -l myminio/test-bucket
-./mc mb -l myminio/multi-key-poc
+mc ready myminio
+mc admin user add myminio/ minio123 minio123
+mc admin policy create myminio/ deny-non-sse-kms-pol ./docs/iam/policies/deny-non-sse-kms-objects.json
+mc admin policy create myminio/ deny-invalid-sse-kms-pol ./docs/iam/policies/deny-objects-with-invalid-sse-kms-key-id.json
+mc admin policy attach myminio deny-non-sse-kms-pol --user minio123
+mc admin policy attach myminio deny-invalid-sse-kms-pol --user minio123
+mc admin policy attach myminio consoleAdmin --user minio123
+mc mb -l myminio/test-bucket
+mc mb -l myminio/multi-key-poc
 export MC_HOST_myminio1="http://minio123:minio123@localhost:9000/"
-./mc cp /etc/issue myminio1/test-bucket
+mc cp /etc/issue myminio1/test-bucket
 ret=$?
 if [ $ret -ne 0 ]; then
 echo "BUG: PutObject to bucket: test-bucket should succeed. Failed"
 exit 1
 fi
-./mc cp /etc/issue myminio1/multi-key-poc | grep -q "Insufficient permissions to access this path"
+mc cp /etc/issue myminio1/multi-key-poc | grep -q "Insufficient permissions to access this path"
 ret=$?
 if [ $ret -eq 0 ]; then
 echo "BUG: PutObject to bucket: multi-key-poc without sse-kms should fail. Succedded"
 exit 1
 fi
-./mc cp /etc/hosts myminio1/multi-key-poc/hosts --enc-kms "myminio1/multi-key-poc/hosts=minio-default-key"
+mc cp /etc/hosts myminio1/multi-key-poc/hosts --enc-kms "myminio1/multi-key-poc/hosts=minio-default-key"
 ret=$?
 if [ $ret -ne 0 ]; then
 echo "BUG: PutObject to bucket: multi-key-poc with valid sse-kms should succeed. Failed"

View File

@@ -90,13 +90,14 @@ type Config struct {
 // buffer is full, new logs are just ignored and an error
 // is returned to the caller.
 type Target struct {
-totalMessages int64
-failedMessages int64
-status int32
+totalMessages atomic.Int64
+failedMessages atomic.Int64
+status atomic.Int32
 // Worker control
-workers int64
+workers atomic.Int64
 maxWorkers int64
 // workerStartMu sync.Mutex
 lastStarted time.Time
@@ -157,7 +158,7 @@ func (h *Target) String() string {
 // IsOnline returns true if the target is reachable using a cached value
 func (h *Target) IsOnline(ctx context.Context) bool {
-return atomic.LoadInt32(&h.status) == statusOnline
+return h.status.Load() == statusOnline
 }
 // Stats returns the target statistics.
@@ -166,8 +167,8 @@ func (h *Target) Stats() types.TargetStats {
 queueLength := len(h.logCh)
 h.logChMu.RUnlock()
 stats := types.TargetStats{
-TotalMessages: atomic.LoadInt64(&h.totalMessages),
-FailedMessages: atomic.LoadInt64(&h.failedMessages),
+TotalMessages: h.totalMessages.Load(),
+FailedMessages: h.failedMessages.Load(),
 QueueLength: queueLength,
 }
@@ -221,9 +222,9 @@ func (h *Target) initMemoryStore(ctx context.Context) (err error) {
 func (h *Target) send(ctx context.Context, payload []byte, payloadType string, timeout time.Duration) (err error) {
 defer func() {
 if err != nil {
-atomic.StoreInt32(&h.status, statusOffline)
+h.status.Store(statusOffline)
 } else {
-atomic.StoreInt32(&h.status, statusOnline)
+h.status.Store(statusOnline)
 }
 }()
@@ -275,8 +276,8 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
 }
 h.logChMu.RUnlock()
-atomic.AddInt64(&h.workers, 1)
-defer atomic.AddInt64(&h.workers, -1)
+h.workers.Add(1)
+defer h.workers.Add(-1)
 h.wg.Add(1)
 defer h.wg.Done()
@@ -353,7 +354,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
 }
 if !isTick {
-atomic.AddInt64(&h.totalMessages, 1)
+h.totalMessages.Add(1)
 if !isDirQueue {
 if err := enc.Encode(&entry); err != nil {
@@ -362,7 +363,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
 fmt.Errorf("unable to encode webhook log entry, err '%w' entry: %v\n", err, entry),
 h.Name(),
 )
-atomic.AddInt64(&h.failedMessages, 1)
+h.failedMessages.Add(1)
 continue
 }
 }
@@ -395,7 +396,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
 // and when it's been at least 30 seconds since
 // we launched a new worker.
 if mainWorker && len(h.logCh) > cap(h.logCh)/2 {
-nWorkers := atomic.LoadInt64(&h.workers)
+nWorkers := h.workers.Load()
 if nWorkers < h.maxWorkers {
 if time.Since(h.lastStarted).Milliseconds() > 10 {
 h.lastStarted = time.Now()
@@ -493,10 +494,10 @@ func New(config Config) (*Target, error) {
 h := &Target{
 logCh: make(chan interface{}, config.QueueSize),
 config: config,
-status: statusOffline,
 batchSize: config.BatchSize,
 maxWorkers: int64(maxWorkers),
 }
+h.status.Store(statusOffline)
 if config.BatchSize > 1 {
 h.payloadType = ""
@@ -528,10 +529,17 @@ func (h *Target) SendFromStore(key store.Key) (err error) {
 return err
 }
+h.failedMessages.Add(1)
+defer func() {
+if err == nil {
+h.failedMessages.Add(-1)
+}
+}()
 if err := h.send(context.Background(), eventData, h.payloadType, webhookCallTimeout); err != nil {
-atomic.AddInt64(&h.failedMessages, 1)
 return err
 }
 // Delete the event from store.
 return h.store.Del(key.Name)
 }
@@ -540,7 +548,7 @@ func (h *Target) SendFromStore(key store.Key) (err error) {
 // Messages are queued in the disk if the store is enabled
 // If Cancel has been called the message is ignored.
 func (h *Target) Send(ctx context.Context, entry interface{}) error {
-if atomic.LoadInt32(&h.status) == statusClosed {
+if h.status.Load() == statusClosed {
 if h.migrateTarget != nil {
 return h.migrateTarget.Send(ctx, entry)
 }
@@ -557,7 +565,7 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error {
 retry:
 select {
 case h.logCh <- entry:
-atomic.AddInt64(&h.totalMessages, 1)
+h.totalMessages.Add(1)
 case <-ctx.Done():
 // return error only for context timedout.
 if errors.Is(ctx.Err(), context.DeadlineExceeded) {
@@ -565,11 +573,14 @@ retry:
 }
 return nil
 default:
-if h.workers < h.maxWorkers {
+nWorkers := h.workers.Load()
+if nWorkers < h.maxWorkers {
+// Just sleep to avoid any possible hot-loops.
+time.Sleep(50 * time.Millisecond)
 goto retry
 }
-atomic.AddInt64(&h.totalMessages, 1)
-atomic.AddInt64(&h.failedMessages, 1)
+h.totalMessages.Add(1)
+h.failedMessages.Add(1)
 return errors.New("log buffer full")
 }
@@ -580,7 +591,7 @@ retry:
 // All queued messages are flushed and the function returns afterwards.
 // All messages sent to the target after this function has been called will be dropped.
 func (h *Target) Cancel() {
-atomic.StoreInt32(&h.status, statusClosed)
+h.status.Store(statusClosed)
 h.storeCtxCancel()
 // Wait for messages to be sent...