journald logger: fix race condition

Fix a race in journald driver.  Following the logs implies streaming
until the container is dead.  Streaming happened in one goroutine,
waiting for the container to exit/die and signaling that event happened
in another goroutine.

The nature of having two goroutines running simultaneously is pretty
much the core of the race condition.  When the streaming goroutines
received the signal that the container has exitted, the routine may not
have read and written all of the container's logs.

Fix this race by reading both, the logs and the events, of the container
and stop streaming when the died/exited event has been read.  The died
event is guaranteed to be after all logs in the journal which guarantees
not only consistencty but also a deterministic behavior.

Note that the journald log driver now requires the journald event
backend to be set.

Fixes: #10323
Signed-off-by: Valentin Rothberg <rothberg@redhat.com>
This commit is contained in:
Valentin Rothberg 2021-05-20 11:07:28 +02:00
parent e81457dc8e
commit 10569c988f
4 changed files with 223 additions and 124 deletions

View file

@ -6,14 +6,12 @@ package libpod
import (
"context"
"fmt"
"io"
"math"
"strings"
"time"
"github.com/containers/podman/v3/libpod/define"
"github.com/containers/podman/v3/libpod/events"
"github.com/containers/podman/v3/libpod/logs"
journal "github.com/coreos/go-systemd/v22/sdjournal"
"github.com/coreos/go-systemd/v22/sdjournal"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@ -24,122 +22,187 @@ const (
// journaldLogErr is the journald priority signifying stderr
journaldLogErr = "3"
// bufLen is the length of the buffer to read from a k8s-file
// formatted log line
// let's set it as 2k just to be safe if k8s-file format ever changes
bufLen = 16384
)
func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
var config journal.JournalReaderConfig
if options.Tail < 0 {
config.NumFromTail = 0
} else if options.Tail == 0 {
config.NumFromTail = math.MaxUint64
} else {
config.NumFromTail = uint64(options.Tail)
journal, err := sdjournal.NewJournal()
if err != nil {
return err
}
if options.Multi {
config.Formatter = journalFormatterWithID
} else {
config.Formatter = journalFormatter
// While logs are written to the `logChannel`, we inspect each event
// and stop once the container has died. Having logs and events in one
// stream prevents a race condition that we faced in #10323.
// Add the filters for events.
match := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"}
if err := journal.AddMatch(match.String()); err != nil {
return errors.Wrapf(err, "adding filter to journald logger: %v", match)
}
defaultTime := time.Time{}
if options.Since != defaultTime {
// coreos/go-systemd/sdjournal doesn't correctly handle requests for data in the future
// return nothing instead of falsely printing
if time.Now().Before(options.Since) {
return nil
match = sdjournal.Match{Field: "PODMAN_ID", Value: c.ID()}
if err := journal.AddMatch(match.String()); err != nil {
return errors.Wrapf(err, "adding filter to journald logger: %v", match)
}
// Add the filter for logs. Note the disjunction so that we match
// either the events or the logs.
if err := journal.AddDisjunction(); err != nil {
return errors.Wrap(err, "adding filter disjunction to journald logger")
}
match = sdjournal.Match{Field: "CONTAINER_ID_FULL", Value: c.ID()}
if err := journal.AddMatch(match.String()); err != nil {
return errors.Wrapf(err, "adding filter to journald logger: %v", match)
}
if err := journal.SeekHead(); err != nil {
return err
}
// API requires Next() immediately after SeekHead().
if _, err := journal.Next(); err != nil {
return errors.Wrap(err, "initial journal cursor")
}
// API requires a next|prev before getting a cursor.
if _, err := journal.Previous(); err != nil {
return errors.Wrap(err, "initial journal cursor")
}
// Note that the initial cursor may not yet be ready, so we'll do an
// exponential backoff.
var cursor string
var cursorError error
for i := 1; i <= 3; i++ {
cursor, cursorError = journal.GetCursor()
if err != nil {
continue
}
// coreos/go-systemd/sdjournal expects a negative time.Duration for times in the past
config.Since = -time.Since(options.Since)
time.Sleep(time.Duration(i*100) * time.Millisecond)
break
}
config.Matches = append(config.Matches, journal.Match{
Field: "CONTAINER_ID_FULL",
Value: c.ID(),
})
if cursorError != nil {
return errors.Wrap(cursorError, "inital journal cursor")
}
// We need the container's events in the same journal to guarantee
// consistency, see #10323.
if options.Follow && c.runtime.config.Engine.EventsLogger != "journald" {
return errors.Errorf("using --follow with the journald --log-driver but without the journald --events-backend (%s) is not supported", c.runtime.config.Engine.EventsLogger)
}
options.WaitGroup.Add(1)
r, err := journal.NewJournalReader(config)
if err != nil {
return err
}
if r == nil {
return errors.Errorf("journal reader creation failed")
}
if options.Tail == math.MaxInt64 {
r.Rewind()
}
state, err := c.State()
if err != nil {
return err
}
if options.Follow && state == define.ContainerStateRunning {
go func() {
done := make(chan bool)
until := make(chan time.Time)
go func() {
select {
case <-ctx.Done():
until <- time.Time{}
case <-done:
// nothing to do anymore
}
}()
go func() {
// FIXME (#10323): we are facing a terrible
// race condition here. At the time the
// container dies and `c.Wait()` has returned,
// we may not have received all journald logs.
// So far there is no other way than waiting
// for a second. Ultimately, `r.Follow` is
// racy and we may have to implement our custom
// logic here.
c.Wait(ctx)
time.Sleep(time.Second)
until <- time.Time{}
}()
follower := journaldFollowBuffer{logChannel, options.Multi}
err := r.Follow(until, follower)
if err != nil {
logrus.Debugf(err.Error())
}
r.Close()
options.WaitGroup.Done()
done <- true
return
}()
return nil
}
go func() {
bytes := make([]byte, bufLen)
// /me complains about no do-while in go
ec, err := r.Read(bytes)
for ec != 0 && err == nil {
// because we are reusing bytes, we need to make
// sure the old data doesn't get into the new line
bytestr := string(bytes[:ec])
logLine, err2 := logs.NewJournaldLogLine(bytestr, options.Multi)
if err2 != nil {
logrus.Error(err2)
defer func() {
options.WaitGroup.Done()
if err := journal.Close(); err != nil {
logrus.Errorf("Unable to close journal: %v", err)
}
}()
afterTimeStamp := false // needed for options.Since
tailQueue := []*logs.LogLine{} // needed for options.Tail
doTail := options.Tail > 0
for {
select {
case <-ctx.Done():
// Remote client may have closed/lost the connection.
return
default:
// Fallthrough
}
if _, err := journal.Next(); err != nil {
logrus.Errorf("Failed to move journal cursor to next entry: %v", err)
return
}
latestCursor, err := journal.GetCursor()
if err != nil {
logrus.Errorf("Failed to get journal cursor: %v", err)
return
}
// Hit the end of the journal.
if cursor == latestCursor {
if doTail {
// Flush *once* we hit the end of the journal.
startIndex := int64(len(tailQueue)-1) - options.Tail
if startIndex < 0 {
startIndex = 0
}
for i := startIndex; i < int64(len(tailQueue)); i++ {
logChannel <- tailQueue[i]
}
tailQueue = nil
doTail = false
}
// Unless we follow, quit.
if !options.Follow {
return
}
// Sleep until something's happening on the journal.
journal.Wait(sdjournal.IndefiniteWait)
continue
}
cursor = latestCursor
entry, err := journal.GetEntry()
if err != nil {
logrus.Errorf("Failed to get journal entry: %v", err)
return
}
if !afterTimeStamp {
entryTime := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond))
if entryTime.Before(options.Since) {
continue
}
afterTimeStamp = true
}
// If we're reading an event and the container exited/died,
// then we're done and can return.
event, ok := entry.Fields["PODMAN_EVENT"]
if ok {
status, err := events.StringToStatus(event)
if err != nil {
logrus.Errorf("Failed to translate event: %v", err)
return
}
if status == events.Exited {
return
}
continue
}
var message string
var formatError error
if options.Multi {
message, formatError = journalFormatterWithID(entry)
} else {
message, formatError = journalFormatter(entry)
}
if formatError != nil {
logrus.Errorf("Failed to parse journald log entry: %v", err)
return
}
logLine, err := logs.NewJournaldLogLine(message, options.Multi)
if err != nil {
logrus.Errorf("Failed parse log line: %v", err)
return
}
if doTail {
tailQueue = append(tailQueue, logLine)
continue
}
logChannel <- logLine
ec, err = r.Read(bytes)
}
if err != nil && err != io.EOF {
logrus.Error(err)
}
r.Close()
options.WaitGroup.Done()
}()
return nil
}
func journalFormatterWithID(entry *journal.JournalEntry) (string, error) {
func journalFormatterWithID(entry *sdjournal.JournalEntry) (string, error) {
output, err := formatterPrefix(entry)
if err != nil {
return "", err
@ -162,7 +225,7 @@ func journalFormatterWithID(entry *journal.JournalEntry) (string, error) {
return output, nil
}
func journalFormatter(entry *journal.JournalEntry) (string, error) {
func journalFormatter(entry *sdjournal.JournalEntry) (string, error) {
output, err := formatterPrefix(entry)
if err != nil {
return "", err
@ -176,7 +239,7 @@ func journalFormatter(entry *journal.JournalEntry) (string, error) {
return output, nil
}
func formatterPrefix(entry *journal.JournalEntry) (string, error) {
func formatterPrefix(entry *sdjournal.JournalEntry) (string, error) {
usec := entry.RealtimeTimestamp
tsString := time.Unix(0, int64(usec)*int64(time.Microsecond)).Format(logs.LogTimeFormat)
output := fmt.Sprintf("%s ", tsString)
@ -202,7 +265,7 @@ func formatterPrefix(entry *journal.JournalEntry) (string, error) {
return output, nil
}
func formatterMessage(entry *journal.JournalEntry) (string, error) {
func formatterMessage(entry *sdjournal.JournalEntry) (string, error) {
// Finally, append the message
msg, ok := entry.Fields["MESSAGE"]
if !ok {
@ -211,18 +274,3 @@ func formatterMessage(entry *journal.JournalEntry) (string, error) {
msg = strings.TrimSuffix(msg, "\n")
return msg, nil
}
type journaldFollowBuffer struct {
logChannel chan *logs.LogLine
withID bool
}
func (f journaldFollowBuffer) Write(p []byte) (int, error) {
bytestr := string(p)
logLine, err := logs.NewJournaldLogLine(bytestr, f.withID)
if err != nil {
return -1, err
}
f.logChannel <- logLine
return len(p), nil
}

View file

@ -163,7 +163,7 @@ var _ = Describe("Podman logs", func() {
})
It("podman logs on a created container should result in 0 exit code: "+log, func() {
session := podmanTest.Podman([]string{"create", "-t", "--name", "log", ALPINE})
session := podmanTest.Podman([]string{"create", "--log-driver", log, "-t", "--name", "log", ALPINE})
session.WaitWithDefaultTimeout()
Expect(session).To(Exit(0))

View file

@ -73,4 +73,56 @@ ${cid[0]} d" "Sequential output from logs"
_log_test_multi journald
}
@test "podman logs - journald log driver requires journald events backend" {
skip_if_remote "remote does not support --events-backend"
# We can't use journald on RHEL as rootless: rhbz#1895105
skip_if_journald_unavailable
run_podman --events-backend=file run --log-driver=journald -d --name test --replace $IMAGE ls /
run_podman --events-backend=file logs test
run_podman 125 --events-backend=file logs --follow test
is "$output" "Error: using --follow with the journald --log-driver but without the journald --events-backend (file) is not supported" "journald logger requires journald eventer"
}
function _log_test_since() {
local driver=$1
s_before="before_$(random_string)_${driver}"
s_after="after_$(random_string)_${driver}"
before=$(date --iso-8601=seconds)
run_podman run --log-driver=$driver -d --name test $IMAGE sh -c \
"echo $s_before; trap 'echo $s_after; exit' SIGTERM; while :; do sleep 1; done"
# sleep a second to make sure the date is after the first echo
sleep 1
after=$(date --iso-8601=seconds)
run_podman stop test
run_podman logs test
is "$output" \
"$s_before
$s_after"
run_podman logs --since $before test
is "$output" \
"$s_before
$s_after"
run_podman logs --since $after test
is "$output" "$s_after"
run_podman rm -f test
}
@test "podman logs - since k8s-file" {
_log_test_since k8s-file
}
@test "podman logs - since journald" {
# We can't use journald on RHEL as rootless: rhbz#1895105
skip_if_journald_unavailable
_log_test_since journald
}
# vim: filetype=sh

View file

@ -8,8 +8,7 @@ load helpers
@test "podman kill - test signal handling in containers" {
# Start a container that will handle all signals by emitting 'got: N'
local -a signals=(1 2 3 4 5 6 8 10 12 13 14 15 16 20 21 22 23 24 25 26 64)
# Force the k8s-file driver until #10323 is fixed.
run_podman run --log-driver=k8s-file -d $IMAGE sh -c \
run_podman run -d $IMAGE sh -c \
"for i in ${signals[*]}; do trap \"echo got: \$i\" \$i; done;
echo READY;
while ! test -e /stop; do sleep 0.05; done;