maintenance window API (#22850)

Forrest 2023-04-10 17:23:03 -07:00 committed by GitHub
parent fe0810b5cb
commit ceb61f53d2
22 changed files with 5588 additions and 2079 deletions


@ -3256,6 +3256,37 @@ func (c *Client) DeleteAllUserGroups(ctx context.Context) error {
return nil
}
// ExportUpgradeWindows is used to load derived upgrade window values for agents that
// need to export schedules to external upgraders.
func (c *Client) ExportUpgradeWindows(ctx context.Context, req proto.ExportUpgradeWindowsRequest) (proto.ExportUpgradeWindowsResponse, error) {
rsp, err := c.grpc.ExportUpgradeWindows(ctx, &req)
if err != nil {
return proto.ExportUpgradeWindowsResponse{}, trail.FromGRPC(err)
}
return *rsp, nil
}
// GetClusterMaintenanceConfig gets the current maintenance window config singleton.
func (c *Client) GetClusterMaintenanceConfig(ctx context.Context) (types.ClusterMaintenanceConfig, error) {
rsp, err := c.grpc.GetClusterMaintenanceConfig(ctx, &emptypb.Empty{})
if err != nil {
return nil, trail.FromGRPC(err)
}
return rsp, nil
}
// UpdateClusterMaintenanceConfig updates the current maintenance window config singleton.
func (c *Client) UpdateClusterMaintenanceConfig(ctx context.Context, cmc types.ClusterMaintenanceConfig) error {
req, ok := cmc.(*types.ClusterMaintenanceConfigV1)
if !ok {
return trace.BadParameter("unexpected maintenance config type: %T", cmc)
}
_, err := c.grpc.UpdateClusterMaintenanceConfig(ctx, req)
return trail.FromGRPC(err)
}
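Taken together, the three methods above give API consumers a read-modify-write flow over the maintenance config singleton. Below is a minimal illustrative sketch (not part of this commit) of how a caller might configure an agent upgrade window through them; the client value is assumed to be an already-connected api client, and error handling is kept terse:

import (
	"context"

	"github.com/gravitational/trace"

	"github.com/gravitational/teleport/api/client"
	"github.com/gravitational/teleport/api/types"
)

// setAgentUpgradeWindow configures a 02:00 UTC agent upgrade window limited to
// Mon/Wed/Fri, creating the maintenance config singleton if it does not exist yet.
func setAgentUpgradeWindow(ctx context.Context, clt *client.Client) error {
	cmc, err := clt.GetClusterMaintenanceConfig(ctx)
	if err != nil {
		if !trace.IsNotFound(err) {
			return trace.Wrap(err)
		}
		// no config exists yet; start from an empty singleton
		cmc = types.NewClusterMaintenanceConfig()
	}
	cmc.SetAgentUpgradeWindow(types.AgentUpgradeWindow{
		UTCStartHour: 2,
		Weekdays:     []string{"Mon", "Wed", "Fri"},
	})
	// updates are nonce-protected; a CompareFailed error indicates a concurrent
	// modification, in which case the caller should re-pull and retry.
	return trace.Wrap(clt.UpdateClusterMaintenanceConfig(ctx, cmc))
}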
// PluginsClient returns an unadorned Plugins client, using the underlying
// Auth gRPC connection.
// Clients connecting to non-Enterprise clusters, or older Teleport versions,

File diff suppressed because it is too large.


@ -2233,6 +2233,31 @@ message UpdateHeadlessAuthenticationStateRequest {
MFAAuthenticateResponse mfa_response = 3;
}
// ExportUpgradeWindowsRequest encodes parameters for loading the
// upgrader-facing representations of upcoming agent maintenance windows.
message ExportUpgradeWindowsRequest {
// TeleportVersion is the version of the teleport client making the request.
string TeleportVersion = 1;
// UpgraderKind represents the kind of upgrader the schedule is intended for.
string UpgraderKind = 2;
}
// ExportUpgradeWindowsResponse encodes an upgrader-facing representation
// of upcoming agent maintenance windows. Teleport agents periodically export these
// schedules to external upgraders as part of the externally-managed upgrade system.
message ExportUpgradeWindowsResponse {
// CanonicalSchedule is the teleport-facing schedule repr.
types.AgentUpgradeSchedule CanonicalSchedule = 1;
// KubeControllerSchedule encodes upcoming upgrade windows in a format known
// to the kube upgrade controller. Teleport agents should treat this value as an
// opaque blob.
string KubeControllerSchedule = 2;
// SystemdUnitSchedule encodes the upcoming upgrade windows in a format known to
// the teleport-upgrade systemd unit. Teleport agents should treat this value as an
// opaque blob.
string SystemdUnitSchedule = 3;
}
// AuthService is authentication/authorization service implementation
service AuthService {
// InventoryControlStream is the per-instance stream used to advertise teleport instance
@ -2881,4 +2906,14 @@ service AuthService {
// UpdateHeadlessAuthenticationState is a request to update a headless authentication's state.
rpc UpdateHeadlessAuthenticationState(UpdateHeadlessAuthenticationStateRequest) returns (google.protobuf.Empty);
// ExportUpgradeWindows is used to load derived maintenance window values for agents that
// need to export schedules to external upgraders.
rpc ExportUpgradeWindows(ExportUpgradeWindowsRequest) returns (ExportUpgradeWindowsResponse);
// GetClusterMaintenanceConfig gets the current maintenance window config singleton.
rpc GetClusterMaintenanceConfig(google.protobuf.Empty) returns (types.ClusterMaintenanceConfigV1);
// UpdateClusterMaintenanceConfig updates the current maintenance window config singleton.
rpc UpdateClusterMaintenanceConfig(types.ClusterMaintenanceConfigV1) returns (google.protobuf.Empty);
}
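As a rough sketch of how the upgrader-facing path is meant to be driven (assumptions: an already-connected api client, a placeholder version string, and an illustrative destination path), an agent would periodically call ExportUpgradeWindows for its upgrader kind and write out the opaque schedule blob:

import (
	"context"
	"os"

	"github.com/gravitational/trace"

	"github.com/gravitational/teleport/api/client"
	"github.com/gravitational/teleport/api/client/proto"
	"github.com/gravitational/teleport/api/types"
)

// exportUnitSchedule fetches the systemd-unit flavor of the schedule and writes it
// to the location consumed by the external upgrader (the path here is illustrative).
func exportUnitSchedule(ctx context.Context, clt *client.Client) error {
	rsp, err := clt.ExportUpgradeWindows(ctx, proto.ExportUpgradeWindowsRequest{
		TeleportVersion: "13.0.0", // placeholder; agents send their own version
		UpgraderKind:    types.UpgraderKindSystemdUnit,
	})
	if err != nil {
		return trace.Wrap(err)
	}
	// the schedule is an opaque blob from the agent's perspective
	return trace.Wrap(os.WriteFile("/var/lib/teleport/upgrade-schedule", []byte(rsp.SystemdUnitSchedule), 0o644))
}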


@ -5132,6 +5132,69 @@ message KubernetesResourceSpecV1 {
string Namespace = 1 [(gogoproto.jsontag) = "namespace"];
}
// ClusterMaintenanceConfigV1 is a config singleton used to configure infrequent
// cluster maintenance operations.
message ClusterMaintenanceConfigV1 {
ResourceHeader Header = 1 [
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "",
(gogoproto.embed) = true
];
ClusterMaintenanceConfigSpecV1 Spec = 2 [
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "spec"
];
// Nonce is used to protect against concurrent modification of the maintenance
// window. Clients should treat nonces as opaque.
uint64 Nonce = 3 [(gogoproto.jsontag) = "nonce,omitempty"];
}
// ClusterMaintenanceConfigSpecV1 encodes the parameters of the upgrade window config object.
message ClusterMaintenanceConfigSpecV1 {
// AgentUpgrades encodes the agent upgrade window.
AgentUpgradeWindow AgentUpgrades = 1 [(gogoproto.jsontag) = "agent_upgrades,omitempty"];
}
// AgentUpgradeWindow is the config object used to determine upcoming agent
// upgrade windows.
message AgentUpgradeWindow {
// UTCStartHour is the start hour of the maintenance window in UTC.
uint32 UTCStartHour = 1 [(gogoproto.jsontag) = "utc_start_hour"];
// Weekdays is an optional list of weekdays. If not specified, an agent upgrade window
// occurs every day.
repeated string Weekdays = 2 [(gogoproto.jsontag) = "weekdays,omitempty"];
}
// ScheduledAgentUpgradeWindow is a derived value representing a single
// upgrade window. Upgraders deal with discrete start/end times, so we use the
// agent upgrade window configuration object to generate a sequence of specific
// scheduled windows.
message ScheduledAgentUpgradeWindow {
// Start is the start time of the upgrade window.
google.protobuf.Timestamp Start = 1 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "start"
];
// Stop is the stop time of the upgrade window.
google.protobuf.Timestamp Stop = 2 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "stop"
];
}
// AgentUpgradeSchedule is the canonical representation of upcoming
// agent upgrade windows as generated by the AgentUpgradeWindow config object.
message AgentUpgradeSchedule {
// Windows is the list of upcoming windows.
repeated ScheduledAgentUpgradeWindow Windows = 1 [
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "windows"
];
}
// UserGroupV1 is a representation of an externally sourced user group.
message UserGroupV1 {
option (gogoproto.goproto_stringer) = false;


@ -341,6 +341,13 @@ const (
// KindIntegration is a connection to a 3rd party system API.
KindIntegration = "integration"
// KindClusterMaintenanceConfig determines maintenance times for the cluster.
KindClusterMaintenanceConfig = "cluster_maintenance_config"
// MetaNameClusterMaintenanceConfig is the only allowed metadata.name value for the maintenance
// window singleton resource.
MetaNameClusterMaintenanceConfig = "cluster-maintenance-config"
// V6 is the sixth version of resources.
V6 = "v6"

api/types/maintenance.go (new file, 230 lines)

@ -0,0 +1,230 @@
/*
Copyright 2022 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import (
"strings"
"time"
"github.com/gogo/protobuf/proto"
"github.com/gravitational/trace"
)
const (
// UpgraderKindKubeController is a short name used to identify the kube-controller-based
// external upgrader variant.
UpgraderKindKubeController = "kube"
// UpgraderKindSystemdUnit is a short name used to identify the systemd-unit-based
// external upgrader variant.
UpgraderKindSystemdUnit = "unit"
)
var validWeekdays = [7]time.Weekday{
time.Sunday,
time.Monday,
time.Tuesday,
time.Wednesday,
time.Thursday,
time.Friday,
time.Saturday,
}
// parseWeekday attempts to interpret a string as a time.Weekday. In the interest of flexibility,
// parsing is case-insensitive and supports the common three-letter shorthand accepted by many
// common scheduling utilities (e.g. crontab, systemd timers).
func parseWeekday(s string) (day time.Weekday, ok bool) {
for _, w := range validWeekdays {
if strings.EqualFold(w.String(), s) || strings.EqualFold(w.String()[:3], s) {
return w, true
}
}
return time.Sunday, false
}
// generator builds a closure that iterates valid maintenance windows from the current day onward. Used in
// schedule export logic and tests.
func (w *AgentUpgradeWindow) generator(from time.Time) func() (start time.Time, end time.Time) {
from = from.UTC()
next := time.Date(
from.Year(),
from.Month(),
from.Day(),
int(w.UTCStartHour%24),
0, // min
0, // sec
0, // nsec
time.UTC,
)
var weekdays []time.Weekday
for _, d := range w.Weekdays {
if p, ok := parseWeekday(d); ok {
weekdays = append(weekdays, p)
}
}
return func() (start time.Time, end time.Time) {
for { // safe because invalid weekdays have been filtered out
start = next
end = start.Add(time.Hour)
next = next.AddDate(0, 0, 1)
if len(weekdays) == 0 {
return
}
for _, day := range weekdays {
if start.Weekday() == day {
return
}
}
}
}
}
// Export exports the next `n` upgrade windows as a schedule object, starting from `from`.
func (w *AgentUpgradeWindow) Export(from time.Time, n int) AgentUpgradeSchedule {
gen := w.generator(from)
sched := AgentUpgradeSchedule{
Windows: make([]ScheduledAgentUpgradeWindow, 0, n),
}
for i := 0; i < n; i++ {
start, stop := gen()
sched.Windows = append(sched.Windows, ScheduledAgentUpgradeWindow{
Start: start.UTC(),
Stop: stop.UTC(),
})
}
return sched
}
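To make the relationship between the config object and the derived schedule concrete, here is a small illustrative use of Export (the window values are arbitrary, not taken from this commit):

import (
	"fmt"
	"time"

	"github.com/gravitational/teleport/api/types"
)

func printUpcomingWindows() {
	win := types.AgentUpgradeWindow{
		UTCStartHour: 2,
		Weekdays:     []string{"Mon", "Wed", "Fri"},
	}
	// export the next 3 one-hour windows matching the weekday filter, starting from now
	sched := win.Export(time.Now(), 3)
	for _, w := range sched.Windows {
		fmt.Printf("window: %s -> %s\n", w.Start.Format(time.RFC3339), w.Stop.Format(time.RFC3339))
	}
}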
func (s *AgentUpgradeSchedule) Clone() *AgentUpgradeSchedule {
return proto.Clone(s).(*AgentUpgradeSchedule)
}
// NewClusterMaintenanceConfig creates a new maintenance config with no parameters set.
func NewClusterMaintenanceConfig() ClusterMaintenanceConfig {
var cmc ClusterMaintenanceConfigV1
cmc.setStaticFields()
return &cmc
}
// ClusterMaintenanceConfig represents a singleton config object used to schedule maintenance
// windows. Currently this config object's only purpose is to configure a global agent
// upgrade window, used to coordinate upgrade timing for non-control-plane agents.
type ClusterMaintenanceConfig interface {
Resource
// GetNonce gets the nonce of the maintenance config.
GetNonce() uint64
// WithNonce creates a shallow copy with a new nonce.
WithNonce(nonce uint64) any
// GetAgentUpgradeWindow gets the agent upgrade window.
GetAgentUpgradeWindow() (win AgentUpgradeWindow, ok bool)
// SetAgentUpgradeWindow sets the agent upgrade window.
SetAgentUpgradeWindow(win AgentUpgradeWindow)
CheckAndSetDefaults() error
}
func (m *ClusterMaintenanceConfigV1) setStaticFields() {
if m.Version == "" {
m.Version = V1
}
if m.Kind == "" {
m.Kind = KindClusterMaintenanceConfig
}
if m.Metadata.Name == "" {
m.Metadata.Name = MetaNameClusterMaintenanceConfig
}
}
func (m *ClusterMaintenanceConfigV1) CheckAndSetDefaults() error {
m.setStaticFields()
if err := m.ResourceHeader.CheckAndSetDefaults(); err != nil {
return trace.Wrap(err)
}
if m.Version != V1 {
return trace.BadParameter("unexpected maintenance config resource version %q (expected %q)", m.Version, V1)
}
if m.Kind == MetaNameClusterMaintenanceConfig {
// normalize easy mixup
m.Kind = KindClusterMaintenanceConfig
}
if m.Kind != KindClusterMaintenanceConfig {
return trace.BadParameter("unexpected maintenance config kind %q (expected %q)", m.Kind, KindClusterMaintenanceConfig)
}
if m.Metadata.Name == KindClusterMaintenanceConfig {
// normalize easy mixup
m.Metadata.Name = MetaNameClusterMaintenanceConfig
}
if m.Metadata.Name != MetaNameClusterMaintenanceConfig {
return trace.BadParameter("unexpected maintenance config name %q (expected %q)", m.Metadata.Name, MetaNameClusterMaintenanceConfig)
}
if m.Spec.AgentUpgrades != nil {
if h := m.Spec.AgentUpgrades.UTCStartHour; h > 23 {
return trace.BadParameter("agent upgrade window utc start hour must be in range 0..23, got %d", h)
}
for _, day := range m.Spec.AgentUpgrades.Weekdays {
if _, ok := parseWeekday(day); !ok {
return trace.BadParameter("invalid weekday in agent upgrade window: %q", day)
}
}
}
return nil
}
func (m *ClusterMaintenanceConfigV1) GetNonce() uint64 {
return m.Nonce
}
func (m *ClusterMaintenanceConfigV1) WithNonce(nonce uint64) any {
shallowCopy := *m
shallowCopy.Nonce = nonce
return &shallowCopy
}
func (m *ClusterMaintenanceConfigV1) GetAgentUpgradeWindow() (win AgentUpgradeWindow, ok bool) {
if m.Spec.AgentUpgrades == nil {
return AgentUpgradeWindow{}, false
}
return *m.Spec.AgentUpgrades, true
}
func (m *ClusterMaintenanceConfigV1) SetAgentUpgradeWindow(win AgentUpgradeWindow) {
m.Spec.AgentUpgrades = &win
}


@ -0,0 +1,216 @@
/*
Copyright 2022 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestAgentUpgradeWindow(t *testing.T) {
newTime := func(day int, hour int) time.Time {
return time.Date(
2000,
time.January,
day,
hour,
0, // min
0, // sec
0, // nsec
time.UTC,
)
}
from := newTime(1, 12)
require.Equal(t, time.Saturday, from.Weekday()) // verify that newTime starts from expected pos
conf := AgentUpgradeWindow{
UTCStartHour: 2,
}
tts := []struct{ start, stop time.Time }{
{newTime(1, 2), newTime(1, 3)},
{newTime(2, 2), newTime(2, 3)},
{newTime(3, 2), newTime(3, 3)},
{newTime(4, 2), newTime(4, 3)},
{newTime(5, 2), newTime(5, 3)},
{newTime(6, 2), newTime(6, 3)},
{newTime(7, 2), newTime(7, 3)},
{newTime(8, 2), newTime(8, 3)},
{newTime(9, 2), newTime(9, 3)},
}
gen := conf.generator(from)
for _, tt := range tts {
start, stop := gen()
require.Equal(t, tt.start, start)
require.Equal(t, tt.stop, stop)
}
// set weekdays filter so that windows are limited to Mon-Fri.
conf.Weekdays = []string{
"Monday",
"tue",
"Wed",
"thursday",
"Friday",
}
tts = []struct{ start, stop time.Time }{
// sat {newTime(1, 2), newTime(1, 3)},
// sun {newTime(2, 2), newTime(2, 3)},
{newTime(3, 2), newTime(3, 3)},
{newTime(4, 2), newTime(4, 3)},
{newTime(5, 2), newTime(5, 3)},
{newTime(6, 2), newTime(6, 3)},
{newTime(7, 2), newTime(7, 3)},
// sat {newTime(8, 2), newTime(8, 3)},
// sun {newTime(9, 2), newTime(9, 3)},
}
gen = conf.generator(from)
for _, tt := range tts {
start, stop := gen()
require.Equal(t, tt.start, start)
require.Equal(t, tt.stop, stop)
}
// verify that invalid weekdays are omitted from filter.
conf.Weekdays = []string{
"Monday",
"tues", // invalid
"Wed",
"Th", // invalid
"Friday",
}
tts = []struct{ start, stop time.Time }{
// sat {newTime(1, 2), newTime(1, 3)},
// sun {newTime(2, 2), newTime(2, 3)},
{newTime(3, 2), newTime(3, 3)},
// tue {newTime(4, 2), newTime(4, 3)},
{newTime(5, 2), newTime(5, 3)},
// thu {newTime(6, 2), newTime(6, 3)},
{newTime(7, 2), newTime(7, 3)},
// sat {newTime(8, 2), newTime(8, 3)},
// sun {newTime(9, 2), newTime(9, 3)},
}
gen = conf.generator(from)
for _, tt := range tts {
start, stop := gen()
require.Equal(t, tt.start, start)
require.Equal(t, tt.stop, stop)
}
// if all weekdays are invalid, revert to firing every day
conf.Weekdays = []string{
"Mo",
"Tu",
"We",
"Th",
"Fr",
}
tts = []struct{ start, stop time.Time }{
{newTime(1, 2), newTime(1, 3)},
{newTime(2, 2), newTime(2, 3)},
{newTime(3, 2), newTime(3, 3)},
{newTime(4, 2), newTime(4, 3)},
{newTime(5, 2), newTime(5, 3)},
{newTime(6, 2), newTime(6, 3)},
{newTime(7, 2), newTime(7, 3)},
{newTime(8, 2), newTime(8, 3)},
{newTime(9, 2), newTime(9, 3)},
}
gen = conf.generator(from)
for _, tt := range tts {
start, stop := gen()
require.Equal(t, tt.start, start)
require.Equal(t, tt.stop, stop)
}
}
// verify that the default (empty) maintenance window value is valid.
func TestClusterMaintenanceConfigDefault(t *testing.T) {
t.Parallel()
mw := NewClusterMaintenanceConfig()
require.NoError(t, mw.CheckAndSetDefaults())
}
func TestWeekdayParser(t *testing.T) {
t.Parallel()
tts := []struct {
input string
expect time.Weekday
fail bool
}{
{
input: "Tue",
expect: time.Tuesday,
},
{
input: "tue",
expect: time.Tuesday,
},
{
input: "tues",
fail: true, // only 3-letter shorthand is accepted
},
{
input: "Saturday",
expect: time.Saturday,
},
{
input: "saturday",
expect: time.Saturday,
},
{
input: "sun",
expect: time.Sunday,
},
{
input: "sundae", // containing a valid prefix is insufficient
fail: true,
},
{
input: "",
fail: true,
},
}
for _, tt := range tts {
day, ok := parseWeekday(tt.input)
if tt.fail {
require.False(t, ok)
continue
}
require.Equal(t, tt.expect, day)
}
}

File diff suppressed because it is too large.


@ -96,6 +96,7 @@ import (
"github.com/gravitational/teleport/lib/utils/interval"
vc "github.com/gravitational/teleport/lib/versioncontrol"
"github.com/gravitational/teleport/lib/versioncontrol/github"
uw "github.com/gravitational/teleport/lib/versioncontrol/upgradewindow"
)
const (
@ -487,6 +488,8 @@ type Server struct {
sshca.Authority
upgradeWindowStartHourGetter func(context.Context) (int64, error)
// AuthServiceName is a human-readable name of this CA. If several Auth services are running
// (managing multiple teleport clusters) this field is used to tell them apart in UIs
// It usually defaults to the hostname of the machine the Auth service runs on.
@ -601,6 +604,20 @@ func (a *Server) SetReleaseService(svc release.Client) {
a.releaseService = svc
}
// SetUpgradeWindowStartHourGetter sets the getter used to sync the ClusterMaintenanceConfig resource
// with the cloud UpgradeWindowStartHour value.
func (a *Server) SetUpgradeWindowStartHourGetter(fn func(context.Context) (int64, error)) {
a.lock.Lock()
defer a.lock.Unlock()
a.upgradeWindowStartHourGetter = fn
}
func (a *Server) getUpgradeWindowStartHourGetter() func(context.Context) (int64, error) {
a.lock.Lock()
defer a.lock.Unlock()
return a.upgradeWindowStartHourGetter
}
// SetLoginRuleEvaluator sets the login rule evaluator.
func (a *Server) SetLoginRuleEvaluator(l loginrule.Evaluator) {
a.loginRuleEvaluator = l
@ -643,6 +660,66 @@ func (a *Server) SetHeadlessAuthenticationWatcher(headlessAuthenticationWatcher
a.headlessAuthenticationWatcher = headlessAuthenticationWatcher
}
// syncUpgradeWindowStartHour attempts to load the cloud UpgradeWindowStartHour value and set
// the ClusterMaintenanceConfig resource's AgentUpgrades.UTCStartHour field to match it.
func (a *Server) syncUpgradeWindowStartHour(ctx context.Context) error {
getter := a.getUpgradeWindowStartHourGetter()
if getter == nil {
return trace.Errorf("getter has not been registered")
}
startHour, err := getter(ctx)
if err != nil {
return trace.Wrap(err)
}
cmc, err := a.GetClusterMaintenanceConfig(ctx)
if err != nil {
if !trace.IsNotFound(err) {
return trace.Wrap(err)
}
// create an empty maintenance config resource on NotFound
cmc = types.NewClusterMaintenanceConfig()
}
agentWindow, _ := cmc.GetAgentUpgradeWindow()
agentWindow.UTCStartHour = uint32(startHour)
cmc.SetAgentUpgradeWindow(agentWindow)
if err := a.UpdateClusterMaintenanceConfig(ctx, cmc); err != nil {
return trace.Wrap(err)
}
return nil
}
func (a *Server) periodicSyncUpgradeWindowStartHour() {
checkInterval := interval.New(interval.Config{
Duration: time.Minute * 3,
FirstDuration: utils.FullJitter(time.Second * 30),
Jitter: retryutils.NewSeventhJitter(),
})
defer checkInterval.Stop()
for {
select {
case <-checkInterval.Next():
if err := a.syncUpgradeWindowStartHour(a.closeCtx); err != nil {
if a.closeCtx.Err() == nil {
// we run this periodic at a fairly high frequency, so errors are just
// logged but otherwise ignored.
log.Warnf("Failed to sync upgrade window start hour: %v", err)
}
}
case <-a.closeCtx.Done():
return
}
}
}
// runPeriodicOperations runs some periodic bookkeeping operations
// performed by auth server
func (a *Server) runPeriodicOperations() {
@ -717,6 +794,12 @@ func (a *Server) runPeriodicOperations() {
}
}()
// cloud auth servers need to periodically sync the upgrade window
// from the cloud db.
if modules.GetModules().Features().Cloud {
go a.periodicSyncUpgradeWindowStartHour()
}
for {
select {
case <-a.closeCtx.Done():
@ -4358,6 +4441,87 @@ func (a *Server) SubmitUsageEvent(ctx context.Context, req *proto.SubmitUsageEve
return nil
}
type maintenanceWindowCacheKey struct {
key string
}
// agentWindowLookahead is the number of upgrade windows, starting from 'today', that we export
// when compiling agent upgrade schedules. The choice is arbitrary. We must export at least 2, because upgraders
// treat a schedule value whose windows all end in the past as stale and therefore a sign that the agent is
// unhealthy. 3 was picked to give us some leeway in terms of how long an agent can be turned off before its
// upgrader starts complaining of a stale schedule.
const agentWindowLookahead = 3
// exportUpgradeWindowsCached generates the export value of all upgrade window schedule types. Since schedules
// are reloaded frequently in large clusters and export incurs string/json encoding, we use the ttl cache to store
// the encoded schedule values for a few seconds.
func (a *Server) exportUpgradeWindowsCached(ctx context.Context) (proto.ExportUpgradeWindowsResponse, error) {
return utils.FnCacheGet(ctx, a.ttlCache, maintenanceWindowCacheKey{"export"}, func(ctx context.Context) (proto.ExportUpgradeWindowsResponse, error) {
var rsp proto.ExportUpgradeWindowsResponse
cmc, err := a.GetClusterMaintenanceConfig(ctx)
if err != nil {
if trace.IsNotFound(err) {
// "not found" is treated as an empty schedule value
return rsp, nil
}
return rsp, trace.Wrap(err)
}
agentWindow, ok := cmc.GetAgentUpgradeWindow()
if !ok {
// "unconfigured" is treated as an empty schedule value
return rsp, nil
}
sched := agentWindow.Export(time.Now(), agentWindowLookahead)
rsp.CanonicalSchedule = &sched
rsp.KubeControllerSchedule, err = uw.EncodeKubeControllerSchedule(sched)
if err != nil {
log.Warnf("Failed to encode kube controller maintenance schedule: %v", err)
}
rsp.SystemdUnitSchedule, err = uw.EncodeSystemdUnitSchedule(sched)
if err != nil {
log.Warnf("Failed to encode systemd unit maintenance schedule: %v", err)
}
return rsp, nil
})
}
func (a *Server) ExportUpgradeWindows(ctx context.Context, req proto.ExportUpgradeWindowsRequest) (proto.ExportUpgradeWindowsResponse, error) {
var rsp proto.ExportUpgradeWindowsResponse
// get the cached collection of all export values
cached, err := a.exportUpgradeWindowsCached(ctx)
if err != nil {
return rsp, nil
}
switch req.UpgraderKind {
case "":
rsp.CanonicalSchedule = cached.CanonicalSchedule.Clone()
case types.UpgraderKindKubeController:
rsp.KubeControllerSchedule = cached.KubeControllerSchedule
if sched := os.Getenv("TELEPORT_UNSTABLE_KUBE_UPGRADE_SCHEDULE"); sched != "" {
rsp.KubeControllerSchedule = sched
}
case types.UpgraderKindSystemdUnit:
rsp.SystemdUnitSchedule = cached.SystemdUnitSchedule
if sched := os.Getenv("TELEPORT_UNSTABLE_SYSTEMD_UPGRADE_SCHEDULE"); sched != "" {
rsp.SystemdUnitSchedule = sched
}
default:
return rsp, trace.NotImplemented("unsupported upgrader kind %q in upgrade window export request", req.UpgraderKind)
}
return rsp, nil
}
func (a *Server) isMFARequired(ctx context.Context, checker services.AccessChecker, req *proto.IsMFARequiredRequest) (*proto.IsMFARequiredResponse, error) {
authPref, err := a.GetAuthPreference(ctx)
if err != nil {


@ -5839,6 +5839,42 @@ func (a *ServerWithRoles) CloneHTTPClient(params ...roundtrip.ClientParam) (*HTT
return nil, trace.NotImplemented("not implemented")
}
// ExportUpgradeWindows is used to load derived upgrade window values for agents that
// need to export schedules to external upgraders.
func (a *ServerWithRoles) ExportUpgradeWindows(ctx context.Context, req proto.ExportUpgradeWindowsRequest) (proto.ExportUpgradeWindowsResponse, error) {
// Ensure that caller is a teleport server
role, ok := a.context.Identity.(authz.BuiltinRole)
if !ok || !role.IsServer() {
return proto.ExportUpgradeWindowsResponse{}, trace.AccessDenied("agent maintenance schedule is only accessible to teleport built-in servers")
}
return a.authServer.ExportUpgradeWindows(ctx, req)
}
// GetClusterMaintenanceConfig gets the current maintenance config singleton.
func (a *ServerWithRoles) GetClusterMaintenanceConfig(ctx context.Context) (types.ClusterMaintenanceConfig, error) {
if err := a.action(apidefaults.Namespace, types.KindClusterMaintenanceConfig, types.VerbRead); err != nil {
return nil, trace.Wrap(err)
}
return a.authServer.GetClusterMaintenanceConfig(ctx)
}
// UpdateClusterMaintenanceConfig updates the current maintenance config singleton.
func (a *ServerWithRoles) UpdateClusterMaintenanceConfig(ctx context.Context, cmc types.ClusterMaintenanceConfig) error {
if err := a.action(apidefaults.Namespace, types.KindClusterMaintenanceConfig, types.VerbCreate, types.VerbUpdate); err != nil {
return trace.Wrap(err)
}
if modules.GetModules().Features().Cloud {
// maintenance configuration in cloud is derived from values stored in
// an external cloud-specific database.
return trace.NotImplemented("cloud clusters do not support custom cluster maintenance resources")
}
return a.authServer.UpdateClusterMaintenanceConfig(ctx, cmc)
}
// NewAdminAuthServer returns auth server authorized as admin,
// used for auth server cached access
func NewAdminAuthServer(authServer *Server, alog events.AuditLogSessionStreamer) (ClientI, error) {


@ -4899,6 +4899,56 @@ func (g *GRPCServer) GetHeadlessAuthentication(ctx context.Context, req *proto.G
return authReq, trace.Wrap(err)
}
// ExportUpgradeWindows is used to load derived upgrade window values for agents that
// need to export schedules to external upgraders.
func (g *GRPCServer) ExportUpgradeWindows(ctx context.Context, req *proto.ExportUpgradeWindowsRequest) (*proto.ExportUpgradeWindowsResponse, error) {
auth, err := g.authenticate(ctx)
if err != nil {
return nil, trace.Wrap(err)
}
rsp, err := auth.ExportUpgradeWindows(ctx, *req)
if err != nil {
return nil, trace.Wrap(err)
}
return &rsp, nil
}
// GetClusterMaintenanceConfig gets the current maintenance config singleton.
func (g *GRPCServer) GetClusterMaintenanceConfig(ctx context.Context, _ *emptypb.Empty) (*types.ClusterMaintenanceConfigV1, error) {
auth, err := g.authenticate(ctx)
if err != nil {
return nil, trace.Wrap(err)
}
cmc, err := auth.GetClusterMaintenanceConfig(ctx)
if err != nil {
return nil, trace.Wrap(err)
}
rsp, ok := cmc.(*types.ClusterMaintenanceConfigV1)
if !ok {
return nil, trace.BadParameter("unexpected maintenance config type %T", cmc)
}
return rsp, nil
}
// UpdateClusterMaintenanceConfig updates the current maintenance config singleton.
func (g *GRPCServer) UpdateClusterMaintenanceConfig(ctx context.Context, cmc *types.ClusterMaintenanceConfigV1) (*emptypb.Empty, error) {
auth, err := g.authenticate(ctx)
if err != nil {
return nil, trace.Wrap(err)
}
if err := auth.UpdateClusterMaintenanceConfig(ctx, cmc); err != nil {
return nil, trace.Wrap(err)
}
return &emptypb.Empty{}, nil
}
// GetBackend returns the backend from the underlying auth server.
func (g *GRPCServer) GetBackend() backend.Backend {
return g.AuthServer.bk


@ -18,7 +18,10 @@ package auth
import (
"context"
"fmt"
"math"
"strings"
"sync"
"testing"
"time"
@ -1090,6 +1093,145 @@ func resourceDiff(res1, res2 types.Resource) string {
cmpopts.EquateEmpty())
}
// TestSyncUpgradeWindowStartHour verifies the core logic of the upgrade window start
// hour behavior.
func TestSyncUpgradeWindowStartHour(t *testing.T) {
ctx := context.Background()
conf := setupConfig(t)
authServer, err := Init(conf)
require.NoError(t, err)
t.Cleanup(func() { authServer.Close() })
// no getter is registered, sync should fail
require.Error(t, authServer.syncUpgradeWindowStartHour(ctx))
// maintenance config does not exist yet
cmc, err := authServer.GetClusterMaintenanceConfig(ctx)
require.Error(t, err)
require.True(t, trace.IsNotFound(err))
require.Nil(t, cmc)
// set up fake getter
var mu sync.Mutex
var fakeHour int64
var fakeError error
authServer.SetUpgradeWindowStartHourGetter(func(ctx context.Context) (int64, error) {
mu.Lock()
defer mu.Unlock()
return fakeHour, fakeError
})
// sync should now succeed
require.NoError(t, authServer.syncUpgradeWindowStartHour(ctx))
cmc, err = authServer.GetClusterMaintenanceConfig(ctx)
require.NoError(t, err)
agentWindow, ok := cmc.GetAgentUpgradeWindow()
require.True(t, ok)
require.Equal(t, uint32(0), agentWindow.UTCStartHour)
// change the served hour
mu.Lock()
fakeHour = 16
mu.Unlock()
require.NoError(t, authServer.syncUpgradeWindowStartHour(ctx))
cmc, err = authServer.GetClusterMaintenanceConfig(ctx)
require.NoError(t, err)
agentWindow, ok = cmc.GetAgentUpgradeWindow()
require.True(t, ok)
require.Equal(t, uint32(16), agentWindow.UTCStartHour)
// set sync to fail with out of range hour
mu.Lock()
fakeHour = 36
mu.Unlock()
require.Error(t, authServer.syncUpgradeWindowStartHour(ctx))
cmc, err = authServer.GetClusterMaintenanceConfig(ctx)
require.NoError(t, err)
agentWindow, ok = cmc.GetAgentUpgradeWindow()
require.True(t, ok)
// verify that the old hour value persists since the sync failed
require.Equal(t, uint32(16), agentWindow.UTCStartHour)
// set sync to fail with impossible int type-cast
mu.Lock()
fakeHour = math.MaxInt64
mu.Unlock()
require.Error(t, authServer.syncUpgradeWindowStartHour(ctx))
cmc, err = authServer.GetClusterMaintenanceConfig(ctx)
require.NoError(t, err)
agentWindow, ok = cmc.GetAgentUpgradeWindow()
require.True(t, ok)
// verify that the old hour value persists since the sync failed
require.Equal(t, uint32(16), agentWindow.UTCStartHour)
mu.Lock()
fakeHour = 18
mu.Unlock()
// sync should now succeed again
require.NoError(t, authServer.syncUpgradeWindowStartHour(ctx))
cmc, err = authServer.GetClusterMaintenanceConfig(ctx)
require.NoError(t, err)
agentWindow, ok = cmc.GetAgentUpgradeWindow()
require.True(t, ok)
// verify that we got the new hour value
require.Equal(t, uint32(18), agentWindow.UTCStartHour)
// set sync to fail with error
mu.Lock()
fakeHour = 12
fakeError = fmt.Errorf("uh-oh")
mu.Unlock()
require.Error(t, authServer.syncUpgradeWindowStartHour(ctx))
cmc, err = authServer.GetClusterMaintenanceConfig(ctx)
require.NoError(t, err)
agentWindow, ok = cmc.GetAgentUpgradeWindow()
require.True(t, ok)
// verify that the old hour value persists since the sync failed
require.Equal(t, uint32(18), agentWindow.UTCStartHour)
// recover and set hour to zero
mu.Lock()
fakeHour = 0
fakeError = nil
mu.Unlock()
// sync should now succeed again
require.NoError(t, authServer.syncUpgradeWindowStartHour(ctx))
cmc, err = authServer.GetClusterMaintenanceConfig(ctx)
require.NoError(t, err)
agentWindow, ok = cmc.GetAgentUpgradeWindow()
require.True(t, ok)
// verify that we got the new hour value
require.Equal(t, uint32(0), agentWindow.UTCStartHour)
}
// TestIdentityChecker verifies auth identity properly validates host
// certificates when connecting to an SSH server.
func TestIdentityChecker(t *testing.T) {


@ -24,6 +24,7 @@ import (
"sync"
"github.com/gravitational/trace"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -35,7 +36,10 @@ import (
)
const (
// secretIdentifierName is the suffix used to construct the per-agent store.
secretIdentifierName = "state"
// sharedSecretIdentifierName is the suffix used to construct the shared store.
sharedSecretIdentifierName = "shared-state"
// NamespaceEnv is the env variable defined by the Helm chart that contains the
// namespace value.
NamespaceEnv = "KUBE_NAMESPACE"
@ -61,12 +65,16 @@ type Config struct {
// Namespace is the Agent's namespace
// Field is required
Namespace string
// SecretName is unique secret per agent where state and identity will be stored.
// SecretName is the name of the kubernetes secret resource that backs this store. Conventionally
// this will be set to '<replica-name>-state' for the per-agent secret store, and '<release-name>-shared-state'
// for the shared release-level store.
// Field is required
SecretName string
// ReplicaName is the Agent's pod name
// Field is required
ReplicaName string
// FieldManager is the name used to identify the "owner" of fields within
// the store. This is the replica name in the per-agent state store, and
// helm release name (or 'teleport') in the shared store.
// Field is required.
FieldManager string
// ReleaseName is the HELM release name
// Field is optional
ReleaseName string
@ -84,8 +92,8 @@ func (c Config) Check() error {
return trace.BadParameter("missing secret name")
}
if len(c.ReplicaName) == 0 {
return trace.BadParameter("missing replica name")
if len(c.FieldManager) == 0 {
return trace.BadParameter("missing field manager")
}
if c.KubeClient == nil {
@ -95,7 +103,8 @@ func (c Config) Check() error {
return nil
}
// Backend uses Kubernetes Secrets to store identities.
// Backend implements a subset of the teleport backend API, backed by a kubernetes secret resource
// that stores backend items as entries in the secret's 'data' map.
type Backend struct {
Config
@ -131,9 +140,50 @@ func NewWithClient(restClient kubernetes.Interface) (*Backend, error) {
os.Getenv(teleportReplicaNameEnv),
secretIdentifierName,
),
ReplicaName: os.Getenv(teleportReplicaNameEnv),
ReleaseName: os.Getenv(ReleaseNameEnv),
KubeClient: restClient,
FieldManager: os.Getenv(teleportReplicaNameEnv),
ReleaseName: os.Getenv(ReleaseNameEnv),
KubeClient: restClient,
},
)
}
// NewShared returns a new instance of the kubernetes shared secret store (equivalent to New() except that
// this backend can be written to by any teleport agent within the helm release; used for propagating relevant state
// to controllers).
func NewShared() (*Backend, error) {
restClient, _, err := kubeutils.GetKubeClient("")
if err != nil {
return nil, trace.Wrap(err)
}
return NewSharedWithClient(restClient)
}
// NewSharedWithClient returns a new instance of the shared kubernetes secret store with the provided client (equivalent
// to NewWithClient() except that this backend can be written to by any teleport agent within the helm release; used for propagating
// relevant state to controllers).
func NewSharedWithClient(restClient kubernetes.Interface) (*Backend, error) {
if os.Getenv(NamespaceEnv) == "" {
return nil, trace.BadParameter("environment variable %q not set or empty", NamespaceEnv)
}
ident := os.Getenv(ReleaseNameEnv)
if ident == "" {
ident = "teleport"
log.Warnf("Var %q is not set, falling back to default identifier %q for shared store.", ReleaseNameEnv, ident)
}
return NewWithConfig(
Config{
Namespace: os.Getenv(NamespaceEnv),
SecretName: fmt.Sprintf(
"%s-%s",
ident,
sharedSecretIdentifierName,
),
FieldManager: ident,
ReleaseName: os.Getenv(ReleaseNameEnv),
KubeClient: restClient,
},
)
}
@ -272,7 +322,7 @@ func (b *Backend) upsertSecret(ctx context.Context, secret *corev1.Secret) error
_, err := b.KubeClient.
CoreV1().
Secrets(b.Namespace).
Apply(ctx, secretApply, metav1.ApplyOptions{FieldManager: b.ReplicaName})
Apply(ctx, secretApply, metav1.ApplyOptions{FieldManager: b.FieldManager})
return trace.Wrap(err)
}


@ -88,4 +88,9 @@ type ClusterConfiguration interface {
DeleteInstaller(ctx context.Context, name string) error
// DeleteAllInstallers removes all installer script resources from the backend
DeleteAllInstallers(context.Context) error
// GetClusterMaintenanceConfig loads the current maintenance config singleton.
GetClusterMaintenanceConfig(ctx context.Context) (types.ClusterMaintenanceConfig, error)
// UpdateClusterMaintenanceConfig updates the maintenance config singleton.
UpdateClusterMaintenanceConfig(ctx context.Context, cfg types.ClusterMaintenanceConfig) error
}


@ -18,6 +18,7 @@ package local
import (
"context"
"errors"
"github.com/gravitational/trace"
"github.com/prometheus/client_golang/prometheus"
@ -28,6 +29,8 @@ import (
"github.com/gravitational/teleport/lib/modules"
"github.com/gravitational/teleport/lib/observability/metrics"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/services/local/generic"
"github.com/gravitational/teleport/lib/utils"
)
var clusterNameNotFound = prometheus.NewCounter(
@ -444,6 +447,44 @@ func (s *ClusterConfigurationService) DeleteAllInstallers(ctx context.Context) e
return nil
}
// GetClusterMaintenanceConfig loads the maintenance config singleton resource.
func (s *ClusterConfigurationService) GetClusterMaintenanceConfig(ctx context.Context) (types.ClusterMaintenanceConfig, error) {
item, err := s.Get(ctx, backend.Key(clusterConfigPrefix, maintenancePrefix))
if err != nil {
if trace.IsNotFound(err) {
return nil, trace.NotFound("no maintenance config has been created")
}
return nil, trace.Wrap(err)
}
var cmc types.ClusterMaintenanceConfigV1
if err := utils.FastUnmarshal(item.Value, &cmc); err != nil {
return nil, trace.Wrap(err)
}
return &cmc, nil
}
// UpdateClusterMaintenanceConfig performs a nonce-protected update of the maintenance config singleton resource.
func (s *ClusterConfigurationService) UpdateClusterMaintenanceConfig(ctx context.Context, cmc types.ClusterMaintenanceConfig) error {
if err := cmc.CheckAndSetDefaults(); err != nil {
return trace.Wrap(err)
}
err := generic.FastUpdateNonceProtectedResource(
ctx,
s.Backend,
backend.Key(clusterConfigPrefix, maintenancePrefix),
cmc,
)
if errors.Is(err, generic.ErrNonceViolation) {
return trace.CompareFailed("maintenance config was concurrently modified, please re-pull and work from latest state")
}
return trace.Wrap(err)
}
const (
clusterConfigPrefix = "cluster_configuration"
namePrefix = "name"
@ -457,4 +498,5 @@ const (
scriptsPrefix = "scripts"
uiPrefix = "ui"
installerPrefix = "installer"
maintenancePrefix = "maintenance"
)


@ -0,0 +1,167 @@
/*
Copyright 2023 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generic
import (
"context"
"errors"
"math"
"time"
"github.com/gravitational/trace"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/utils"
)
// nonceProtectedResourceShim is a helper for quickly extracting the nonce
type nonceProtectedResourceShim struct {
Nonce uint64 `json:"nonce"`
}
// ErrNonceViolation is the error returned by FastUpdateNonceProtectedResource when a nonce-protected
// update fails due to concurrent modification. This error should be caught and re-mapped into an
// appropriate user-facing message for the given resource type.
var ErrNonceViolation = errors.New("nonce-violation")
// nonceProtectedResource describes the expected methods for a resource that is protected
// from concurrent modification by a nonce.
type nonceProtectedResource interface {
Expiry() time.Time
GetNonce() uint64
WithNonce(uint64) any
}
// FastUpdateNonceProtectedResource is a helper for updating a resource that is protected by a nonce. The target resource must store
// its nonce value in a top-level 'nonce' field in order for correct nonce semantics to be observed.
func FastUpdateNonceProtectedResource[T nonceProtectedResource](ctx context.Context, bk backend.Backend, key []byte, resource T) error {
if resource.GetNonce() == math.MaxUint64 {
return fastUpsertNonceProtectedResource(ctx, bk, key, resource)
}
val, err := utils.FastMarshal(resource.WithNonce(resource.GetNonce() + 1))
if err != nil {
return trace.Errorf("failed to marshal resource at %q: %v", key, err)
}
item := backend.Item{
Key: key,
Value: val,
Expires: resource.Expiry(),
}
if resource.GetNonce() == 0 {
_, err := bk.Create(ctx, item)
if err != nil {
if trace.IsAlreadyExists(err) {
return ErrNonceViolation
}
return trace.Wrap(err)
}
return nil
}
prev, err := bk.Get(ctx, item.Key)
if err != nil {
if trace.IsNotFound(err) {
return ErrNonceViolation
}
return trace.Wrap(err)
}
var shim nonceProtectedResourceShim
if err := utils.FastUnmarshal(prev.Value, &shim); err != nil {
return trace.Errorf("failed to read nonce of resource at %q", item.Key)
}
if shim.Nonce != resource.GetNonce() {
return ErrNonceViolation
}
_, err = bk.CompareAndSwap(ctx, *prev, item)
if err != nil {
if trace.IsCompareFailed(err) {
return ErrNonceViolation
}
return trace.Wrap(err)
}
return nil
}
// fastUpsertNonceProtectedResource performs an "upsert" while preserving correct nonce ordering. This is necessary to prevent upserts
// from breaking concurrent protected updates.
func fastUpsertNonceProtectedResource[T nonceProtectedResource](ctx context.Context, bk backend.Backend, key []byte, resource T) error {
const maxRetries = 16
for i := 0; i < maxRetries; i++ {
prev, err := bk.Get(ctx, key)
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
}
var prevNonce uint64
if prev != nil {
var shim nonceProtectedResourceShim
if err := utils.FastUnmarshal(prev.Value, &shim); err != nil {
return trace.Wrap(err)
}
prevNonce = shim.Nonce
}
nextNonce := prevNonce + 1
if nextNonce == 0 {
nextNonce = 1
}
val, err := utils.FastMarshal(resource.WithNonce(nextNonce))
if err != nil {
return trace.Errorf("failed to marshal resource at %q: %v", key, err)
}
item := backend.Item{
Key: key,
Value: val,
Expires: resource.Expiry(),
}
if prev == nil {
_, err := bk.Create(ctx, item)
if err != nil {
if trace.IsAlreadyExists(err) {
continue
}
return trace.Wrap(err)
}
return nil
}
_, err = bk.CompareAndSwap(ctx, *prev, item)
if err != nil {
if trace.IsCompareFailed(err) {
continue
}
return trace.Wrap(err)
}
return nil
}
return trace.LimitExceeded("failed to update resource at %q, too many concurrent updates", key)
}
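The nonce semantics above (a nonce of 0 creates, MaxUint64 forces an upsert, and any other value must match the stored nonce for the compare-and-swap to succeed) are exercised by the tests that follow; as an orientation aid, here is a hedged sketch of the retry loop an external caller might run against this helper. The counter type and key handling are purely illustrative and not part of this commit:

import (
	"context"
	"errors"
	"time"

	"github.com/gravitational/trace"

	"github.com/gravitational/teleport/lib/backend"
	"github.com/gravitational/teleport/lib/services/local/generic"
	"github.com/gravitational/teleport/lib/utils"
)

// counter is a toy nonce-protected value used only for this sketch.
type counter struct {
	Count uint64 `json:"count"`
	Nonce uint64 `json:"nonce"`
}

func (c *counter) Expiry() time.Time { return time.Time{} }
func (c *counter) GetNonce() uint64  { return c.Nonce }
func (c *counter) WithNonce(nonce uint64) any {
	out := *c
	out.Nonce = nonce
	return &out
}

// increment performs a nonce-protected read-modify-write, retrying on concurrent modification.
func increment(ctx context.Context, bk backend.Backend, key []byte) error {
	for {
		cur := &counter{} // a zero nonce acts as a "create" if nothing is stored yet
		if item, err := bk.Get(ctx, key); err == nil {
			if err := utils.FastUnmarshal(item.Value, cur); err != nil {
				return trace.Wrap(err)
			}
		} else if !trace.IsNotFound(err) {
			return trace.Wrap(err)
		}
		cur.Count++
		err := generic.FastUpdateNonceProtectedResource(ctx, bk, key, cur)
		if errors.Is(err, generic.ErrNonceViolation) {
			continue // lost the race; re-read and try again
		}
		return trace.Wrap(err)
	}
}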


@ -0,0 +1,230 @@
/*
Copyright 2023 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generic
import (
"context"
"errors"
"math"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/gravitational/trace"
"github.com/stretchr/testify/require"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/memory"
"github.com/gravitational/teleport/lib/utils"
)
type noncedResource struct {
types.ResourceHeader
Nonce uint64 `json:"nonce"`
}
func (r *noncedResource) GetNonce() uint64 {
return r.Nonce
}
func (r *noncedResource) WithNonce(nonce uint64) any {
c := *r
c.Nonce = nonce
return &c
}
func newNoncedResource(name string, nonce uint64) *noncedResource {
return &noncedResource{
ResourceHeader: types.ResourceHeader{
Metadata: types.Metadata{
Name: name,
},
},
Nonce: nonce,
}
}
func fastGetResource[T types.Resource](ctx context.Context, bk backend.Backend, key []byte) (T, error) {
var value T
item, err := bk.Get(ctx, key)
if err != nil {
return value, trace.Wrap(err)
}
if err := utils.FastUnmarshal(item.Value, &value); err != nil {
return value, trace.Errorf("failed to unmarshal resource at %q: %v", key, err)
}
if item.Expires.IsZero() {
value.SetExpiry(time.Time{})
} else {
value.SetExpiry(item.Expires.UTC())
}
return value, nil
}
// TestNonceBasics verifies basic nonce behaviors.
func TestNonceBasics(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bk, err := memory.New(memory.Config{
Context: ctx,
})
require.NoError(t, err)
// nonce of 1 is an "update", but resource does not exist yet
err = FastUpdateNonceProtectedResource(ctx, bk, []byte("k1"), newNoncedResource("r1", 1))
require.ErrorIs(t, err, ErrNonceViolation)
// nonce of 0 is a valid "create".
err = FastUpdateNonceProtectedResource(ctx, bk, []byte("k1"), newNoncedResource("r1", 0))
require.NoError(t, err)
// subsequent calls with nonce of 0 fail.
err = FastUpdateNonceProtectedResource(ctx, bk, []byte("k1"), newNoncedResource("r1", 0))
require.ErrorIs(t, err, ErrNonceViolation)
// nonce of 1 is now a valid update
err = FastUpdateNonceProtectedResource(ctx, bk, []byte("k1"), newNoncedResource("r1", 1))
require.NoError(t, err)
// loading and then re-inserting should always work since nonce is incremented internally
for i := 0; i < 10; i++ {
rsc, err := fastGetResource[*noncedResource](ctx, bk, []byte("k1"))
require.NoError(t, err)
err = FastUpdateNonceProtectedResource(ctx, bk, []byte("k1"), rsc)
require.NoError(t, err)
}
// sanity check: nonce incremented expected number of times
err = FastUpdateNonceProtectedResource(ctx, bk, []byte("k1"), newNoncedResource("r1", 12))
require.NoError(t, err)
// max uint64 "forces" update
err = FastUpdateNonceProtectedResource(ctx, bk, []byte("k1"), newNoncedResource("r1", math.MaxUint64))
require.NoError(t, err)
// forced update correctly conflicts with what would normally be the "next" valid nonce.
err = FastUpdateNonceProtectedResource(ctx, bk, []byte("k1"), newNoncedResource("r1", 13))
require.ErrorIs(t, err, ErrNonceViolation)
// forced update correctly incremented nonce by 1
err = FastUpdateNonceProtectedResource(ctx, bk, []byte("k1"), newNoncedResource("r1", 14))
require.NoError(t, err)
// max uint64 "forces" update for nonexistent resources too
err = FastUpdateNonceProtectedResource(ctx, bk, []byte("k2"), newNoncedResource("r2", math.MaxUint64))
require.NoError(t, err)
// forced update correctly sets new nonce to 1
err = FastUpdateNonceProtectedResource(ctx, bk, []byte("k2"), newNoncedResource("r2", 1))
require.NoError(t, err)
}
// TestNonceParallelism verifies expected nonce behavior under high contention.
func TestNonceParallelism(t *testing.T) {
// note: in theory a higher number of goroutines with a lower number of updates per goroutine
// would be a better test case. unfortunately, that configuration seems to cause some serious perf degradation
// on resource-starved test machines. possibly because the mutex goes into starvation mode,
// which makes it "round robin" across its waiters, which is sub-optimal for operations like
// compare-and-swap which need to acquire the backend mutex multiple times in quick succession (this
// is just a guess based on examining tracebacks).
const routines = 4
const updates = 512
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bk, err := memory.New(memory.Config{
Context: ctx,
})
require.NoError(t, err)
errch := make(chan error, 1)
fail := func(err error) {
select {
case errch <- err:
default:
}
}
var wg sync.WaitGroup
var violations atomic.Uint64
key := "key"
name := "rsc"
for r := 0; r < routines; r++ {
wg.Add(1)
go func(r int) {
defer wg.Done()
rem := updates
for rem > 0 {
rsc, err := fastGetResource[*noncedResource](ctx, bk, []byte(key))
if err != nil && !trace.IsNotFound(err) {
fail(err)
return
}
if rsc == nil {
// resource does not exist yet, start from 0
rsc = newNoncedResource(name, 0)
}
err = FastUpdateNonceProtectedResource(ctx, bk, []byte(key), rsc)
if err != nil {
if errors.Is(err, ErrNonceViolation) {
violations.Add(1)
// concurrently modified, try again
continue
}
fail(err)
return
}
rem--
}
}(r)
}
wg.Wait()
// verify that none of the writer goroutines hit an unexpected error
close(errch)
require.NoError(t, <-errch)
// load resource and verify that we hit our exact expected number of updates
rsc, err := fastGetResource[*noncedResource](ctx, bk, []byte(key))
require.NoError(t, err)
require.Equal(t, routines*updates, int(rsc.Nonce))
// sanity-check: test *must* have hit some nonce violations
require.Greater(t, int(violations.Load()), 0)
}


@ -193,6 +193,8 @@ func ParseShortcut(in string) (string, error) {
return types.KindOktaImportRule, nil
case types.KindOktaAssignment, types.KindOktaAssignment + "s", "oktaassignment", "oktaassignments":
return types.KindOktaAssignment, nil
case types.KindClusterMaintenanceConfig, "cmc":
return types.KindClusterMaintenanceConfig, nil
}
return "", trace.BadParameter("unsupported resource: %q - resources should be expressed as 'type/name', for example 'connector/github'", in)
}


@ -0,0 +1,59 @@
/*
Copyright 2023 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package upgradewindow
import (
"fmt"
"strings"
"github.com/gravitational/trace"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/utils"
)
// EncodeKubeControllerSchedule converts an agent upgrade schedule to the file format
// expected by the kubernetes upgrade controller.
func EncodeKubeControllerSchedule(schedule types.AgentUpgradeSchedule) (string, error) {
b, err := utils.FastMarshal(&schedule)
if err != nil {
return "", trace.Errorf("failed to encode kube controller schedule: %v", err)
}
return string(b), nil
}
// unitScheduleHeader is the first line in the systemd unit upgrader schedule. The teleport-upgrade
// script invoked by the unit ignores all lines starting with '# '.
const unitScheduleHeader = "# generated by teleport\n"
// EncodeSystemdUnitSchedule converts an agent upgrade schedule to the file format
// expected by the teleport-upgrade script.
func EncodeSystemdUnitSchedule(schedule types.AgentUpgradeSchedule) (string, error) {
if len(schedule.Windows) == 0 {
return "", trace.BadParameter("cannot encode empty schedule")
}
var builder strings.Builder
builder.WriteString(unitScheduleHeader)
for _, window := range schedule.Windows {
// upgrade windows are encoded as a pair of space-separated unix timestamps.
fmt.Fprintf(&builder, "%d %d\n", window.Start.Unix(), window.Stop.Unix())
}
return builder.String(), nil
}
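For a sense of the end-to-end flow, the sketch below (window values are arbitrary; output goes to stdout) runs a config window through Export and then through the systemd-unit encoder, yielding the header line plus one "<start-unix> <stop-unix>" pair per window, as shown in the tests that follow:

import (
	"fmt"
	"time"

	"github.com/gravitational/teleport/api/types"
	"github.com/gravitational/teleport/lib/versioncontrol/upgradewindow"
)

func printUnitSchedule() error {
	win := types.AgentUpgradeWindow{UTCStartHour: 2}
	// encode the next 3 derived windows in the teleport-upgrade script format
	out, err := upgradewindow.EncodeSystemdUnitSchedule(win.Export(time.Now(), 3))
	if err != nil {
		return err
	}
	fmt.Print(out)
	return nil
}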


@ -0,0 +1,173 @@
/*
Copyright 2023 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package upgradewindow
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/utils"
)
type kubeScheduleRepr struct {
Windows []windowRepr `json:"windows"`
}
type windowRepr struct {
Start time.Time `json:"start"`
Stop time.Time `json:"stop"`
}
// windowRef is a known-good reference value used in checking consistent agent window encoding.
const windowRef = `{"windows":[{"start":"2023-03-28T02:00:00Z","stop":"2023-03-28T03:00:00Z"},{"start":"2023-03-29T02:00:00Z","stop":"2023-03-29T03:00:00Z"},{"start":"2023-03-30T02:00:00Z","stop":"2023-03-30T03:00:00Z"}]}`
// TestKubeControllerScheduleBasics performs a basic check comparing the agent upgrade schedule and our locally
// defined expected format in terms of a known good schedule value.
func TestKubeControllerScheduleBasics(t *testing.T) {
t.Parallel()
var sched types.AgentUpgradeSchedule
err := utils.FastUnmarshal([]byte(windowRef), &sched)
require.NoError(t, err)
require.Len(t, sched.Windows, 3)
var repr kubeScheduleRepr
err = utils.FastUnmarshal([]byte(windowRef), &repr)
require.NoError(t, err)
require.Len(t, repr.Windows, 3)
// ensure that times are equivalent
for i := 0; i < 3; i++ {
require.False(t, repr.Windows[i].Start.IsZero())
require.False(t, repr.Windows[i].Stop.IsZero())
require.Equal(t, repr.Windows[i].Start, sched.Windows[i].Start)
require.Equal(t, repr.Windows[i].Stop, sched.Windows[i].Stop)
}
// encode the schedule value
s, err := EncodeKubeControllerSchedule(sched)
require.NoError(t, err)
// decode as expected repr
var repr2 kubeScheduleRepr
err = utils.FastUnmarshal([]byte(s), &repr2)
require.NoError(t, err)
// ensure equality holds
require.Equal(t, repr, repr2)
}
// FuzzKubeControllerSchedule does some fuzzy verification of schedule encoding/decoding
// to ensure that EncodeKubeControllerSchedule maintains compatibility with expected json repr.
func FuzzKubeControllerSchedule(f *testing.F) {
f.Fuzz(func(t *testing.T, start, stop uint32) {
startTime := time.Unix(int64(start), 0)
stopTime := time.Unix(int64(stop), 0)
sched := types.AgentUpgradeSchedule{
Windows: []types.ScheduledAgentUpgradeWindow{
{
Start: startTime,
Stop: stopTime,
},
},
}
expect := kubeScheduleRepr{
Windows: []windowRepr{
{
Start: startTime,
Stop: stopTime,
},
},
}
s, err := EncodeKubeControllerSchedule(sched)
require.NoError(t, err)
var repr kubeScheduleRepr
err = utils.FastUnmarshal([]byte(s), &repr)
require.NoError(t, err)
require.Equal(t, expect, repr)
})
}
// TestSystemdUnitSchedule verifies basic behavior of systemd unit schedule encoding.
func TestSystemdUnitSchedule(t *testing.T) {
t.Parallel()
tts := []struct {
desc string
schedule types.AgentUpgradeSchedule
expect string
err bool
}{
{
desc: "empty window case",
err: true,
},
{
desc: "basic single-window case",
schedule: types.AgentUpgradeSchedule{
Windows: []types.ScheduledAgentUpgradeWindow{
{
Start: time.Unix(100, 0),
Stop: time.Unix(200, 0),
},
},
},
expect: "# generated by teleport\n100 200\n",
},
{
desc: "normal multi-window case",
schedule: types.AgentUpgradeSchedule{
Windows: []types.ScheduledAgentUpgradeWindow{
{
Start: time.Unix(500000, 0),
Stop: time.Unix(600000, 0),
},
{
Start: time.Unix(700000, 0),
Stop: time.Unix(800000, 0),
},
{
Start: time.Unix(900000, 0),
Stop: time.Unix(1000000, 0),
},
},
},
expect: "# generated by teleport\n500000 600000\n700000 800000\n900000 1000000\n",
},
}
for _, tt := range tts {
s, err := EncodeSystemdUnitSchedule(tt.schedule)
if tt.err {
require.Error(t, err, "desc=%q", tt.desc)
continue
}
require.NoError(t, err, "desc=%q", tt.desc)
require.Equal(t, tt.expect, s, "desc=%q", tt.desc)
}
}


@ -590,6 +590,37 @@ func (c *netConfigCollection) writeText(w io.Writer) error {
return trace.Wrap(err)
}
type maintenanceWindowCollection struct {
cmc types.ClusterMaintenanceConfig
}
func (c *maintenanceWindowCollection) resources() (r []types.Resource) {
if c.cmc == nil {
return nil
}
return []types.Resource{c.cmc}
}
func (c *maintenanceWindowCollection) writeText(w io.Writer) error {
t := asciitable.MakeTable([]string{"Type", "Params"})
agentUpgradeParams := "none"
if c.cmc != nil {
if win, ok := c.cmc.GetAgentUpgradeWindow(); ok {
agentUpgradeParams = fmt.Sprintf("utc_start_hour=%d", win.UTCStartHour)
if len(win.Weekdays) != 0 {
agentUpgradeParams = fmt.Sprintf("%s, weekdays=%s", agentUpgradeParams, strings.Join(win.Weekdays, ","))
}
}
}
t.AddRow([]string{"Agent Upgrades", agentUpgradeParams})
_, err := t.AsBuffer().WriteTo(w)
return trace.Wrap(err)
}
type recConfigCollection struct {
recConfig types.SessionRecordingConfig
}


@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"math"
"os"
"sort"
"time"
@ -100,29 +101,30 @@ Same as above, but using JSON output:
// Initialize allows ResourceCommand to plug itself into the CLI parser
func (rc *ResourceCommand) Initialize(app *kingpin.Application, config *servicecfg.Config) {
rc.CreateHandlers = map[ResourceKind]ResourceCreateHandler{
types.KindUser: rc.createUser,
types.KindRole: rc.createRole,
types.KindTrustedCluster: rc.createTrustedCluster,
types.KindGithubConnector: rc.createGithubConnector,
types.KindCertAuthority: rc.createCertAuthority,
types.KindClusterAuthPreference: rc.createAuthPreference,
types.KindClusterNetworkingConfig: rc.createClusterNetworkingConfig,
types.KindSessionRecordingConfig: rc.createSessionRecordingConfig,
types.KindUIConfig: rc.createUIConfig,
types.KindLock: rc.createLock,
types.KindNetworkRestrictions: rc.createNetworkRestrictions,
types.KindApp: rc.createApp,
types.KindDatabase: rc.createDatabase,
types.KindKubernetesCluster: rc.createKubeCluster,
types.KindToken: rc.createToken,
types.KindInstaller: rc.createInstaller,
types.KindNode: rc.createNode,
types.KindOIDCConnector: rc.createOIDCConnector,
types.KindSAMLConnector: rc.createSAMLConnector,
types.KindLoginRule: rc.createLoginRule,
types.KindSAMLIdPServiceProvider: rc.createSAMLIdPServiceProvider,
types.KindDevice: rc.createDevice,
types.KindOktaImportRule: rc.createOktaImportRule,
types.KindUser: rc.createUser,
types.KindRole: rc.createRole,
types.KindTrustedCluster: rc.createTrustedCluster,
types.KindGithubConnector: rc.createGithubConnector,
types.KindCertAuthority: rc.createCertAuthority,
types.KindClusterAuthPreference: rc.createAuthPreference,
types.KindClusterNetworkingConfig: rc.createClusterNetworkingConfig,
types.KindClusterMaintenanceConfig: rc.createClusterMaintenanceConfig,
types.KindSessionRecordingConfig: rc.createSessionRecordingConfig,
types.KindUIConfig: rc.createUIConfig,
types.KindLock: rc.createLock,
types.KindNetworkRestrictions: rc.createNetworkRestrictions,
types.KindApp: rc.createApp,
types.KindDatabase: rc.createDatabase,
types.KindKubernetesCluster: rc.createKubeCluster,
types.KindToken: rc.createToken,
types.KindInstaller: rc.createInstaller,
types.KindNode: rc.createNode,
types.KindOIDCConnector: rc.createOIDCConnector,
types.KindSAMLConnector: rc.createSAMLConnector,
types.KindLoginRule: rc.createLoginRule,
types.KindSAMLIdPServiceProvider: rc.createSAMLIdPServiceProvider,
types.KindDevice: rc.createDevice,
types.KindOktaImportRule: rc.createOktaImportRule,
}
rc.config = config
@ -491,6 +493,29 @@ func (rc *ResourceCommand) createClusterNetworkingConfig(ctx context.Context, cl
return nil
}
func (rc *ResourceCommand) createClusterMaintenanceConfig(ctx context.Context, client auth.ClientI, raw services.UnknownResource) error {
var cmc types.ClusterMaintenanceConfigV1
if err := utils.FastUnmarshal(raw.Raw, &cmc); err != nil {
return trace.Wrap(err)
}
if err := cmc.CheckAndSetDefaults(); err != nil {
return trace.Wrap(err)
}
if rc.force {
// max nonce forces "upsert" behavior
cmc.Nonce = math.MaxUint64
}
if err := client.UpdateClusterMaintenanceConfig(ctx, &cmc); err != nil {
return trace.Wrap(err)
}
fmt.Println("maintenance window has been updated")
return nil
}
// createSessionRecordingConfig implements `tctl create recconfig.yaml` command.
func (rc *ResourceCommand) createSessionRecordingConfig(ctx context.Context, client auth.ClientI, raw services.UnknownResource) error {
newRecConfig, err := services.UnmarshalSessionRecordingConfig(raw.Raw)
@ -1459,6 +1484,17 @@ func (rc *ResourceCommand) getCollection(ctx context.Context, client auth.Client
return nil, trace.Wrap(err)
}
return &netConfigCollection{netConfig}, nil
case types.KindClusterMaintenanceConfig:
if rc.ref.Name != "" {
return nil, trace.BadParameter("only simple `tctl get %v` can be used", types.KindClusterMaintenanceConfig)
}
cmc, err := client.GetClusterMaintenanceConfig(ctx)
if err != nil && !trace.IsNotFound(err) {
return nil, trace.Wrap(err)
}
return &maintenanceWindowCollection{cmc}, nil
case types.KindSessionRecordingConfig:
if rc.ref.Name != "" {
return nil, trace.BadParameter("only simple `tctl get %v` can be used", types.KindSessionRecordingConfig)