Move pod jobs to parallel execution

Make Podman pod operations that do not involve starting
containers (which needs to be done in a specific order) use the
same parallel operation code we use to make `podman stop` on
large numbers of containers fast. We were previously stopping
containers in a pod serially, which could take up to the timeout
(default 15 seconds) for each container - stopping 100 containers
that do not respond to SIGTERM would take 25 minutes.

To do this, refactor the parallel operation code a bit to remove
its dependency on libpod (damn circular import restrictions...)
and use parallel functions that just re-use the standard
container API operations - maximizes code reuse (previously each
pod handler had a separate implementation of the container
function it performed).

This is a bit of a palate cleanser after fighting CI for two
days - nice to be able to return to a land of sanity.

Signed-off-by: Matthew Heon <matthew.heon@pm.me>
This commit is contained in:
Matthew Heon 2020-08-19 16:15:35 -04:00 committed by Matthew Heon
parent a7500e54a4
commit 2bb2425704
7 changed files with 179 additions and 190 deletions

View file

@ -6,6 +6,7 @@ import (
"github.com/containers/podman/v2/libpod/define"
"github.com/containers/podman/v2/libpod/events"
"github.com/containers/podman/v2/pkg/cgroups"
"github.com/containers/podman/v2/pkg/parallel"
"github.com/containers/podman/v2/pkg/rootless"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -99,47 +100,52 @@ func (p *Pod) StopWithTimeout(ctx context.Context, cleanup bool, timeout int) (m
return nil, err
}
ctrErrors := make(map[string]error)
// TODO: There may be cases where it makes sense to order stops based on
// dependencies. Should we bother with this?
// Stop to all containers
ctrErrChan := make(map[string]<-chan error)
// Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
ctr.lock.Lock()
if err := ctr.syncContainer(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
// Ignore containers that are not running
if ctr.state.State != define.ContainerStateRunning {
ctr.lock.Unlock()
continue
}
stopTimeout := ctr.config.StopTimeout
if timeout > -1 {
stopTimeout = uint(timeout)
}
if err := ctr.stop(stopTimeout); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
if cleanup {
if err := ctr.cleanup(ctx); err != nil {
ctrErrors[ctr.ID()] = err
c := ctr
logrus.Debugf("Adding parallel job to stop container %s", c.ID())
retChan := parallel.Enqueue(ctx, func() error {
// TODO: Might be better to batch stop and cleanup
// together?
if timeout > -1 {
if err := c.StopWithTimeout(uint(timeout)); err != nil {
return err
}
} else {
if err := c.Stop(); err != nil {
return err
}
}
}
ctr.lock.Unlock()
if cleanup {
return c.Cleanup(ctx)
}
return nil
})
ctrErrChan[c.ID()] = retChan
}
p.newPodEvent(events.Stop)
ctrErrors := make(map[string]error)
// Get returned error for every container we worked on
for id, channel := range ctrErrChan {
if err := <-channel; err != nil {
if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
continue
}
ctrErrors[id] = err
}
}
if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error stopping some containers")
}
@ -169,45 +175,29 @@ func (p *Pod) Cleanup(ctx context.Context) (map[string]error, error) {
return nil, err
}
ctrErrChan := make(map[string]<-chan error)
// Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
c := ctr
logrus.Debugf("Adding parallel job to clean up container %s", c.ID())
retChan := parallel.Enqueue(ctx, func() error {
return c.Cleanup(ctx)
})
ctrErrChan[c.ID()] = retChan
}
ctrErrors := make(map[string]error)
// Clean up all containers
for _, ctr := range allCtrs {
ctr.lock.Lock()
if err := ctr.syncContainer(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
// Get returned error for every container we worked on
for id, channel := range ctrErrChan {
if err := <-channel; err != nil {
if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
continue
}
ctrErrors[id] = err
}
// Ignore containers that are running/paused
if !ctr.ensureState(define.ContainerStateConfigured, define.ContainerStateCreated, define.ContainerStateStopped, define.ContainerStateExited) {
ctr.lock.Unlock()
continue
}
// Check for running exec sessions, ignore containers with them.
sessions, err := ctr.getActiveExecSessions()
if err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
if len(sessions) > 0 {
ctr.lock.Unlock()
continue
}
// TODO: Should we handle restart policy here?
ctr.newContainerEvent(events.Cleanup)
if err := ctr.cleanup(ctx); err != nil {
ctrErrors[ctr.ID()] = err
}
ctr.lock.Unlock()
}
if len(ctrErrors) > 0 {
@ -229,7 +219,7 @@ func (p *Pod) Cleanup(ctx context.Context) (map[string]error, error) {
// containers. The container ID is mapped to the error encountered. The error is
// set to ErrPodPartialFail.
// If both error and the map are nil, all containers were paused without error
func (p *Pod) Pause() (map[string]error, error) {
func (p *Pod) Pause(ctx context.Context) (map[string]error, error) {
p.lock.Lock()
defer p.lock.Unlock()
@ -252,37 +242,34 @@ func (p *Pod) Pause() (map[string]error, error) {
return nil, err
}
ctrErrChan := make(map[string]<-chan error)
// Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
c := ctr
logrus.Debugf("Adding parallel job to pause container %s", c.ID())
retChan := parallel.Enqueue(ctx, c.Pause)
ctrErrChan[c.ID()] = retChan
}
p.newPodEvent(events.Pause)
ctrErrors := make(map[string]error)
// Pause to all containers
for _, ctr := range allCtrs {
ctr.lock.Lock()
if err := ctr.syncContainer(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
// Get returned error for every container we worked on
for id, channel := range ctrErrChan {
if err := <-channel; err != nil {
if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
continue
}
ctrErrors[id] = err
}
// Ignore containers that are not running
if ctr.state.State != define.ContainerStateRunning {
ctr.lock.Unlock()
continue
}
if err := ctr.pause(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
ctr.lock.Unlock()
}
if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error pausing some containers")
}
defer p.newPodEvent(events.Pause)
return nil, nil
}
@ -298,7 +285,7 @@ func (p *Pod) Pause() (map[string]error, error) {
// containers. The container ID is mapped to the error encountered. The error is
// set to ErrPodPartialFail.
// If both error and the map are nil, all containers were unpaused without error.
func (p *Pod) Unpause() (map[string]error, error) {
func (p *Pod) Unpause(ctx context.Context) (map[string]error, error) {
p.lock.Lock()
defer p.lock.Unlock()
@ -311,38 +298,34 @@ func (p *Pod) Unpause() (map[string]error, error) {
return nil, err
}
ctrErrChan := make(map[string]<-chan error)
// Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
c := ctr
logrus.Debugf("Adding parallel job to unpause container %s", c.ID())
retChan := parallel.Enqueue(ctx, c.Unpause)
ctrErrChan[c.ID()] = retChan
}
p.newPodEvent(events.Unpause)
ctrErrors := make(map[string]error)
// Pause to all containers
for _, ctr := range allCtrs {
ctr.lock.Lock()
if err := ctr.syncContainer(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
// Get returned error for every container we worked on
for id, channel := range ctrErrChan {
if err := <-channel; err != nil {
if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
continue
}
ctrErrors[id] = err
}
// Ignore containers that are not paused
if ctr.state.State != define.ContainerStatePaused {
ctr.lock.Unlock()
continue
}
if err := ctr.unpause(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
ctr.lock.Unlock()
}
if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error unpausing some containers")
}
defer p.newPodEvent(events.Unpause)
return nil, nil
}
@ -411,7 +394,7 @@ func (p *Pod) Restart(ctx context.Context) (map[string]error, error) {
// containers. The container ID is mapped to the error encountered. The error is
// set to ErrPodPartialFail.
// If both error and the map are nil, all containers were signalled successfully.
func (p *Pod) Kill(signal uint) (map[string]error, error) {
func (p *Pod) Kill(ctx context.Context, signal uint) (map[string]error, error) {
p.lock.Lock()
defer p.lock.Unlock()
@ -424,44 +407,36 @@ func (p *Pod) Kill(signal uint) (map[string]error, error) {
return nil, err
}
ctrErrChan := make(map[string]<-chan error)
// Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
c := ctr
logrus.Debugf("Adding parallel job to kill container %s", c.ID())
retChan := parallel.Enqueue(ctx, func() error {
return c.Kill(signal)
})
ctrErrChan[c.ID()] = retChan
}
p.newPodEvent(events.Kill)
ctrErrors := make(map[string]error)
// Send a signal to all containers
for _, ctr := range allCtrs {
ctr.lock.Lock()
if err := ctr.syncContainer(); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
// Get returned error for every container we worked on
for id, channel := range ctrErrChan {
if err := <-channel; err != nil {
if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
continue
}
ctrErrors[id] = err
}
// Ignore containers that are not running
if ctr.state.State != define.ContainerStateRunning {
ctr.lock.Unlock()
continue
}
if err := ctr.ociRuntime.KillContainer(ctr, signal, false); err != nil {
ctr.lock.Unlock()
ctrErrors[ctr.ID()] = err
continue
}
logrus.Debugf("Killed container %s with signal %d", ctr.ID(), signal)
ctr.state.StoppedByUser = true
if err := ctr.save(); err != nil {
ctrErrors[ctr.ID()] = err
}
ctr.lock.Unlock()
}
if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error killing some containers")
}
defer p.newPodEvent(events.Kill)
return nil, nil
}

View file

@ -270,7 +270,7 @@ func PodPause(w http.ResponseWriter, r *http.Request) {
utils.PodNotFound(w, name, err)
return
}
responses, err := pod.Pause()
responses, err := pod.Pause(r.Context())
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
utils.Error(w, "Something went wrong", http.StatusInternalServerError, err)
return
@ -294,7 +294,7 @@ func PodUnpause(w http.ResponseWriter, r *http.Request) {
utils.PodNotFound(w, name, err)
return
}
responses, err := pod.Unpause()
responses, err := pod.Unpause(r.Context())
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
utils.Error(w, "failed to pause pod", http.StatusInternalServerError, err)
return
@ -402,7 +402,7 @@ func PodKill(w http.ResponseWriter, r *http.Request) {
return
}
responses, err := pod.Kill(uint(sig))
responses, err := pod.Kill(r.Context(), uint(sig))
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
utils.Error(w, "failed to kill pod", http.StatusInternalServerError, err)
return

View file

@ -23,7 +23,7 @@ import (
"github.com/containers/podman/v2/pkg/checkpoint"
"github.com/containers/podman/v2/pkg/domain/entities"
"github.com/containers/podman/v2/pkg/domain/infra/abi/terminal"
"github.com/containers/podman/v2/pkg/parallel"
parallelctr "github.com/containers/podman/v2/pkg/parallel/ctr"
"github.com/containers/podman/v2/pkg/ps"
"github.com/containers/podman/v2/pkg/rootless"
"github.com/containers/podman/v2/pkg/signal"
@ -157,7 +157,7 @@ func (ic *ContainerEngine) ContainerStop(ctx context.Context, namesOrIds []strin
if err != nil && !(options.Ignore && errors.Cause(err) == define.ErrNoSuchCtr) {
return nil, err
}
errMap, err := parallel.ContainerOp(ctx, ctrs, func(c *libpod.Container) error {
errMap, err := parallelctr.ContainerOp(ctx, ctrs, func(c *libpod.Container) error {
var err error
if options.Timeout != nil {
err = c.StopWithTimeout(*options.Timeout)
@ -321,7 +321,7 @@ func (ic *ContainerEngine) ContainerRm(ctx context.Context, namesOrIds []string,
return reports, nil
}
errMap, err := parallel.ContainerOp(ctx, ctrs, func(c *libpod.Container) error {
errMap, err := parallelctr.ContainerOp(ctx, ctrs, func(c *libpod.Container) error {
err := ic.Libpod.RemoveContainer(ctx, c, options.Force, options.Volumes)
if err != nil {
if options.Ignore && errors.Cause(err) == define.ErrNoSuchCtr {

View file

@ -66,7 +66,7 @@ func (ic *ContainerEngine) PodKill(ctx context.Context, namesOrIds []string, opt
for _, p := range pods {
report := entities.PodKillReport{Id: p.ID()}
conErrs, err := p.Kill(uint(sig))
conErrs, err := p.Kill(ctx, uint(sig))
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
report.Errs = []error{err}
reports = append(reports, &report)
@ -92,7 +92,7 @@ func (ic *ContainerEngine) PodPause(ctx context.Context, namesOrIds []string, op
}
for _, p := range pods {
report := entities.PodPauseReport{Id: p.ID()}
errs, err := p.Pause()
errs, err := p.Pause(ctx)
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
report.Errs = []error{err}
continue
@ -117,7 +117,7 @@ func (ic *ContainerEngine) PodUnpause(ctx context.Context, namesOrIds []string,
}
for _, p := range pods {
report := entities.PodUnpauseReport{Id: p.ID()}
errs, err := p.Unpause()
errs, err := p.Unpause(ctx)
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
report.Errs = []error{err}
continue

View file

@ -1,11 +1,10 @@
package parallel
package ctr
import (
"context"
"sync"
"github.com/containers/podman/v2/libpod"
"github.com/pkg/errors"
"github.com/containers/podman/v2/pkg/parallel"
"github.com/sirupsen/logrus"
)
@ -14,44 +13,28 @@ import (
// If no error is returned, each container specified in ctrs will have an entry
// in the resulting map; containers with no error will be set to nil.
func ContainerOp(ctx context.Context, ctrs []*libpod.Container, applyFunc func(*libpod.Container) error) (map[*libpod.Container]error, error) {
jobControlLock.RLock()
defer jobControlLock.RUnlock()
// We could use a sync.Map but given Go's lack of generic I'd rather
// just use a lock on a normal map...
// The expectation is that most of the time is spent in applyFunc
// anyways.
var (
errMap = make(map[*libpod.Container]error)
errLock sync.Mutex
allDone sync.WaitGroup
errMap = make(map[*libpod.Container]<-chan error)
)
for _, ctr := range ctrs {
// Block until a thread is available
if err := jobControl.Acquire(ctx, 1); err != nil {
return nil, errors.Wrapf(err, "error acquiring job control semaphore")
}
allDone.Add(1)
c := ctr
go func() {
logrus.Debugf("Launching job on container %s", c.ID())
err := applyFunc(c)
errLock.Lock()
errMap[c] = err
errLock.Unlock()
allDone.Done()
jobControl.Release(1)
}()
logrus.Debugf("Starting parallel job on container %s", c.ID())
errChan := parallel.Enqueue(ctx, func() error {
return applyFunc(c)
})
errMap[c] = errChan
}
allDone.Wait()
finalErr := make(map[*libpod.Container]error)
for ctr, errChan := range errMap {
err := <-errChan
finalErr[ctr] = err
}
return errMap, nil
return finalErr, nil
}
// TODO: Add an Enqueue() function that returns a promise

View file

@ -1,6 +1,7 @@
package parallel
import (
"context"
"sync"
"github.com/pkg/errors"
@ -42,3 +43,32 @@ func SetMaxThreads(threads uint) error {
func GetMaxThreads() uint {
return numThreads
}
// Enqueue adds a single function to the parallel jobs queue. This function will
// be run when an unused thread is available.
// Returns a receive-only error channel that will return the error (if any) from
// the provided function fn when fn has finished executing. The channel will be
// closed after this.
func Enqueue(ctx context.Context, fn func() error) <-chan error {
retChan := make(chan error)
go func() {
jobControlLock.RLock()
defer jobControlLock.RUnlock()
defer close(retChan)
if err := jobControl.Acquire(ctx, 1); err != nil {
retChan <- errors.Wrapf(err, "error acquiring job control semaphore")
return
}
err := fn()
jobControl.Release(1)
retChan <- err
}()
return retChan
}

View file

@ -3,6 +3,7 @@
package varlinkapi
import (
"context"
"encoding/json"
"fmt"
"strconv"
@ -207,7 +208,7 @@ func (i *VarlinkAPI) KillPod(call iopodman.VarlinkCall, name string, signal int6
if err != nil {
return call.ReplyPodNotFound(name, err.Error())
}
ctrErrs, err := pod.Kill(killSignal)
ctrErrs, err := pod.Kill(context.TODO(), killSignal)
callErr := handlePodCall(call, pod, ctrErrs, err)
if callErr != nil {
return err
@ -221,7 +222,7 @@ func (i *VarlinkAPI) PausePod(call iopodman.VarlinkCall, name string) error {
if err != nil {
return call.ReplyPodNotFound(name, err.Error())
}
ctrErrs, err := pod.Pause()
ctrErrs, err := pod.Pause(context.TODO())
callErr := handlePodCall(call, pod, ctrErrs, err)
if callErr != nil {
return err
@ -235,7 +236,7 @@ func (i *VarlinkAPI) UnpausePod(call iopodman.VarlinkCall, name string) error {
if err != nil {
return call.ReplyPodNotFound(name, err.Error())
}
ctrErrs, err := pod.Unpause()
ctrErrs, err := pod.Unpause(context.TODO())
callErr := handlePodCall(call, pod, ctrErrs, err)
if callErr != nil {
return err