Merge pull request #6916 from vrothberg/logs-fixes

log API: add context to allow for cancelling
This commit is contained in:
OpenShift Merge Robot 2020-07-09 17:05:47 +02:00 committed by GitHub
commit bc3b3b373f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 75 additions and 57 deletions

View file

@ -32,6 +32,8 @@ var (
Long: logsDescription,
Args: func(cmd *cobra.Command, args []string) error {
switch {
case registry.IsRemote() && logsOptions.Latest:
return errors.New(cmd.Name() + " does not support 'latest' when run remotely")
case registry.IsRemote() && len(args) > 1:
return errors.New(cmd.Name() + " does not support multiple containers when run remotely")
case logsOptions.Latest && len(args) > 0:

View file

@ -353,7 +353,7 @@ func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, stre
logOpts.WaitGroup.Wait()
close(logChan)
}()
if err := c.ReadLog(logOpts, logChan); err != nil {
if err := c.ReadLog(context.Background(), logOpts, logChan); err != nil {
return err
}
logrus.Debugf("Done reading logs for container %s, %d bytes", c.ID(), logSize)

View file

@ -1,6 +1,7 @@
package libpod
import (
"context"
"fmt"
"os"
"time"
@ -13,9 +14,9 @@ import (
)
// Log is a runtime function that can read one or more container logs.
func (r *Runtime) Log(containers []*Container, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
func (r *Runtime) Log(ctx context.Context, containers []*Container, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
for _, ctr := range containers {
if err := ctr.ReadLog(options, logChannel); err != nil {
if err := ctr.ReadLog(ctx, options, logChannel); err != nil {
return err
}
}
@ -23,25 +24,25 @@ func (r *Runtime) Log(containers []*Container, options *logs.LogOptions, logChan
}
// ReadLog reads a containers log based on the input options and returns loglines over a channel.
func (c *Container) ReadLog(options *logs.LogOptions, logChannel chan *logs.LogLine) error {
func (c *Container) ReadLog(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
switch c.LogDriver() {
case define.NoLogging:
return errors.Wrapf(define.ErrNoLogs, "this container is using the 'none' log driver, cannot read logs")
case define.JournaldLogging:
// TODO Skip sending logs until journald logs can be read
return c.readFromJournal(options, logChannel)
return c.readFromJournal(ctx, options, logChannel)
case define.JSONLogging:
// TODO provide a separate implementation of this when Conmon
// has support.
fallthrough
case define.KubernetesLogging, "":
return c.readFromLogFile(options, logChannel)
return c.readFromLogFile(ctx, options, logChannel)
default:
return errors.Wrapf(define.ErrInternal, "unrecognized log driver %q, cannot read logs", c.LogDriver())
}
}
func (c *Container) readFromLogFile(options *logs.LogOptions, logChannel chan *logs.LogLine) error {
func (c *Container) readFromLogFile(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
t, tailLog, err := logs.GetLogFile(c.LogPath(), options)
if err != nil {
// If the log file does not exist, this is not fatal.
@ -62,8 +63,17 @@ func (c *Container) readFromLogFile(options *logs.LogOptions, logChannel chan *l
}
go func() {
defer options.WaitGroup.Done()
var partial string
for line := range t.Lines {
select {
case <-ctx.Done():
// the consumer has cancelled
return
default:
// fallthrough
}
nll, err := logs.NewLogLine(line.Text)
if err != nil {
logrus.Error(err)
@ -82,7 +92,6 @@ func (c *Container) readFromLogFile(options *logs.LogOptions, logChannel chan *l
logChannel <- nll
}
}
options.WaitGroup.Done()
}()
// Check if container is still running or paused
if options.Follow {

View file

@ -4,6 +4,7 @@
package libpod
import (
"context"
"fmt"
"io"
"math"
@ -29,7 +30,7 @@ const (
bufLen = 16384
)
func (c *Container) readFromJournal(options *logs.LogOptions, logChannel chan *logs.LogLine) error {
func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
var config journal.JournalReaderConfig
if options.Tail < 0 {
config.NumFromTail = math.MaxUint64
@ -65,13 +66,24 @@ func (c *Container) readFromJournal(options *logs.LogOptions, logChannel chan *l
if options.Follow {
go func() {
done := make(chan bool)
until := make(chan time.Time)
go func() {
select {
case <-ctx.Done():
until <- time.Time{}
case <-done:
// nothing to do anymore
}
}()
follower := FollowBuffer{logChannel}
err := r.Follow(nil, follower)
err := r.Follow(until, follower)
if err != nil {
logrus.Debugf(err.Error())
}
r.Close()
options.WaitGroup.Done()
done <- true
return
}()
return nil

View file

@ -3,11 +3,13 @@
package libpod
import (
"context"
"github.com/containers/libpod/v2/libpod/define"
"github.com/containers/libpod/v2/libpod/logs"
"github.com/pkg/errors"
)
func (c *Container) readFromJournal(options *logs.LogOptions, logChannel chan *logs.LogLine) error {
func (c *Container) readFromJournal(_ context.Context, _ *logs.LogOptions, _ chan *logs.LogLine) error {
return errors.Wrapf(define.ErrOSNotSupported, "Journald logging only enabled with systemd on linux")
}

View file

@ -92,7 +92,7 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) {
options.WaitGroup = &wg
logChannel := make(chan *logs.LogLine, tail+1)
if err := runtime.Log([]*libpod.Container{ctnr}, options, logChannel); err != nil {
if err := runtime.Log(r.Context(), []*libpod.Container{ctnr}, options, logChannel); err != nil {
utils.InternalServerError(w, errors.Wrapf(err, "Failed to obtain logs for Container '%s'", name))
return
}
@ -105,50 +105,48 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) {
var frame strings.Builder
header := make([]byte, 8)
for ok := true; ok; ok = query.Follow {
for line := range logChannel {
if _, found := r.URL.Query()["until"]; found {
if line.Time.After(until) {
break
}
for line := range logChannel {
if _, found := r.URL.Query()["until"]; found {
if line.Time.After(until) {
break
}
}
// Reset buffer we're ready to loop again
frame.Reset()
switch line.Device {
case "stdout":
if !query.Stdout {
continue
}
header[0] = 1
case "stderr":
if !query.Stderr {
continue
}
header[0] = 2
default:
// Logging and moving on is the best we can do here. We may have already sent
// a Status and Content-Type to client therefore we can no longer report an error.
log.Infof("unknown Device type '%s' in log file from Container %s", line.Device, ctnr.ID())
// Reset buffer we're ready to loop again
frame.Reset()
switch line.Device {
case "stdout":
if !query.Stdout {
continue
}
header[0] = 1
case "stderr":
if !query.Stderr {
continue
}
header[0] = 2
default:
// Logging and moving on is the best we can do here. We may have already sent
// a Status and Content-Type to client therefore we can no longer report an error.
log.Infof("unknown Device type '%s' in log file from Container %s", line.Device, ctnr.ID())
continue
}
if query.Timestamps {
frame.WriteString(line.Time.Format(time.RFC3339))
frame.WriteString(" ")
}
frame.WriteString(line.Msg)
if query.Timestamps {
frame.WriteString(line.Time.Format(time.RFC3339))
frame.WriteString(" ")
}
frame.WriteString(line.Msg)
binary.BigEndian.PutUint32(header[4:], uint32(frame.Len()))
if _, err := w.Write(header[0:8]); err != nil {
log.Errorf("unable to write log output header: %q", err)
}
if _, err := io.WriteString(w, frame.String()); err != nil {
log.Errorf("unable to write frame string: %q", err)
}
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
binary.BigEndian.PutUint32(header[4:], uint32(frame.Len()))
if _, err := w.Write(header[0:8]); err != nil {
log.Errorf("unable to write log output header: %q", err)
}
if _, err := io.WriteString(w, frame.String()); err != nil {
log.Errorf("unable to write frame string: %q", err)
}
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
}
}

View file

@ -924,7 +924,7 @@ func (ic *ContainerEngine) ContainerLogs(ctx context.Context, containers []strin
}
logChannel := make(chan *logs.LogLine, chSize)
if err := ic.Libpod.Log(ctrs, logOpts, logChannel); err != nil {
if err := ic.Libpod.Log(ctx, ctrs, logOpts, logChannel); err != nil {
return err
}

View file

@ -754,7 +754,7 @@ func (i *VarlinkAPI) GetContainersLogs(call iopodman.VarlinkCall, names []string
if err != nil {
return call.ReplyErrorOccurred(err.Error())
}
if err := i.Runtime.Log(containers, &options, logChannel); err != nil {
if err := i.Runtime.Log(getContext(), containers, &options, logChannel); err != nil {
return err
}
go func() {

View file

@ -121,7 +121,6 @@ var _ = Describe("Podman logs", func() {
})
It("latest and container name should fail", func() {
SkipIfRemote() // -l not supported
results := podmanTest.Podman([]string{"logs", "-l", "foobar"})
results.WaitWithDefaultTimeout()
Expect(results).To(ExitWithError())
@ -159,7 +158,6 @@ var _ = Describe("Podman logs", func() {
})
It("using journald for container with container tag", func() {
SkipIfRemote()
Skip("need to verify images have correct packages for journald")
logc := podmanTest.Podman([]string{"run", "--log-driver", "journald", "--log-opt=tag={{.ImageName}}", "-d", ALPINE, "sh", "-c", "echo podman; sleep 0.1; echo podman; sleep 0.1; echo podman"})
logc.WaitWithDefaultTimeout()
@ -178,7 +176,6 @@ var _ = Describe("Podman logs", func() {
It("using journald for container name", func() {
Skip("need to verify images have correct packages for journald")
SkipIfRemote()
containerName := "inside-journal"
logc := podmanTest.Podman([]string{"run", "--log-driver", "journald", "-d", "--name", containerName, ALPINE, "sh", "-c", "echo podman; sleep 0.1; echo podman; sleep 0.1; echo podman"})
logc.WaitWithDefaultTimeout()
@ -273,7 +270,6 @@ var _ = Describe("Podman logs", func() {
})
It("streaming output", func() {
Skip(v2remotefail)
containerName := "logs-f-rm"
logc := podmanTest.Podman([]string{"run", "--rm", "--name", containerName, "-dt", ALPINE, "sh", "-c", "echo podman; sleep 1; echo podman"})
@ -314,7 +310,6 @@ var _ = Describe("Podman logs", func() {
})
It("follow output stopped container", func() {
Skip(v2remotefail)
containerName := "logs-f"
logc := podmanTest.Podman([]string{"run", "--name", containerName, "-d", ALPINE, "true"})