
fix: Prometheus metrics to re-use storage disks (#11647)

Also re-use storage disks for all `mc admin server info`
calls, and implement a new LocalStorageInfo() API call at
the ObjectLayer to look up local disks' StorageInfo.

Also fixes bugs where StorageInfo() was being called twice.
Harshavardhana 2021-03-02 17:28:04 -08:00 committed by GitHub
parent cd9e30c0f4
commit c6a120df0e
15 changed files with 169 additions and 66 deletions
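
In short: rather than each Prometheus scrape and each `mc admin server info`
request initializing (and tearing down) its own set of storage disks, callers
now ask the already-initialized object layer for disk statistics through the
new LocalStorageInfo() API. A minimal sketch of the resulting call pattern,
distilled from the getLocalStorageMetrics() change below (the wrapper function
collectLocalDiskUsage is illustrative only; newObjectLayerFn, globalIsGateway,
LocalStorageInfo and disk.UsedSpace all appear in this commit's diffs):

	// Illustrative sketch: gather per-disk usage by re-using the object
	// layer's already-open disks instead of re-initializing storage.
	func collectLocalDiskUsage(ctx context.Context) (used []float64) {
		objLayer := newObjectLayerFn()
		if objLayer == nil || globalIsGateway {
			// Object layer not ready yet, or running as a gateway
			// (no local erasure disks) - nothing to report.
			return nil
		}
		// Queries only this node's disks; errors are ignored here,
		// mirroring how the metrics paths consume this API.
		storageInfo, _ := objLayer.LocalStorageInfo(ctx)
		for _, disk := range storageInfo.Disks {
			used = append(used, float64(disk.UsedSpace))
		}
		return used
	}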

View File

@@ -1551,13 +1551,13 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Request) {
 	if globalLDAPConfig.Enabled {
 		ldapConn, err := globalLDAPConfig.Connect()
 		if err != nil {
-			ldap.Status = "offline"
+			ldap.Status = string(madmin.ItemOffline)
 		} else if ldapConn == nil {
 			ldap.Status = "Not Configured"
 		} else {
 			// Close ldap connection to avoid leaks.
 			ldapConn.Close()
-			ldap.Status = "online"
+			ldap.Status = string(madmin.ItemOnline)
 		}
 	}

@@ -1573,7 +1573,7 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Request) {
 	assignPoolNumbers(servers)

 	var backend interface{}
-	mode := madmin.ObjectLayerInitializing
+	mode := madmin.ItemInitializing

 	buckets := madmin.Buckets{}
 	objects := madmin.Objects{}
@@ -1581,7 +1581,7 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Request) {

 	objectAPI := newObjectLayerFn()
 	if objectAPI != nil {
-		mode = madmin.ObjectLayerOnline
+		mode = madmin.ItemOnline
 		// Load data usage
 		dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI)
 		if err == nil {
@@ -1628,7 +1628,7 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Request) {
 	}

 	infoMsg := madmin.InfoMessage{
-		Mode: mode,
+		Mode: string(mode),
 		Domain: domain,
 		Region: globalServerRegion,
 		SQSARN: globalNotificationSys.GetARNList(false),
@@ -1678,9 +1678,9 @@ func fetchLambdaInfo() []map[string][]madmin.TargetIDStatus {
 			active, _ := tgt.IsActive()
 			targetID := tgt.ID()
 			if active {
-				targetIDStatus[targetID.ID] = madmin.Status{Status: "Online"}
+				targetIDStatus[targetID.ID] = madmin.Status{Status: string(madmin.ItemOnline)}
 			} else {
-				targetIDStatus[targetID.ID] = madmin.Status{Status: "Offline"}
+				targetIDStatus[targetID.ID] = madmin.Status{Status: string(madmin.ItemOffline)}
 			}
 			list := lambdaMap[targetID.Name]
 			list = append(list, targetIDStatus)
@@ -1692,9 +1692,9 @@ func fetchLambdaInfo() []map[string][]madmin.TargetIDStatus {
 			active, _ := tgt.IsActive()
 			targetID := tgt.ID()
 			if active {
-				targetIDStatus[targetID.ID] = madmin.Status{Status: "Online"}
+				targetIDStatus[targetID.ID] = madmin.Status{Status: string(madmin.ItemOnline)}
 			} else {
-				targetIDStatus[targetID.ID] = madmin.Status{Status: "Offline"}
+				targetIDStatus[targetID.ID] = madmin.Status{Status: string(madmin.ItemOffline)}
 			}
 			list := lambdaMap[targetID.Name]
 			list = append(list, targetIDStatus)
@@ -1727,9 +1727,9 @@ func fetchKMSStatus() madmin.KMS {
 	}

 	if err := checkConnection(kmsInfo.Endpoints[0], 15*time.Second); err != nil {
-		kmsStat.Status = "offline"
+		kmsStat.Status = string(madmin.ItemOffline)
 	} else {
-		kmsStat.Status = "online"
+		kmsStat.Status = string(madmin.ItemOnline)

 		kmsContext := crypto.Context{"MinIO admin API": "ServerInfoHandler"} // Context for a test key operation
 		// 1. Generate a new key using the KMS.
@@ -1737,7 +1737,7 @@ func fetchKMSStatus() madmin.KMS {
 		if err != nil {
 			kmsStat.Encrypt = fmt.Sprintf("Encryption failed: %v", err)
 		} else {
-			kmsStat.Encrypt = "Ok"
+			kmsStat.Encrypt = "success"
 		}

 		// 2. Verify that we can indeed decrypt the (encrypted) key
@@ -1748,7 +1748,7 @@ func fetchKMSStatus() madmin.KMS {
 		case subtle.ConstantTimeCompare(key[:], decryptedKey[:]) != 1:
 			kmsStat.Decrypt = "Decryption failed: decrypted key does not match generated key"
 		default:
-			kmsStat.Decrypt = "Ok"
+			kmsStat.Decrypt = "success"
 		}
 	}
 	return kmsStat
@@ -1764,11 +1764,11 @@ func fetchLoggerInfo() ([]madmin.Logger, []madmin.Audit) {
 		err := checkConnection(target.Endpoint(), 15*time.Second)
 		if err == nil {
 			mapLog := make(map[string]madmin.Status)
-			mapLog[tgt] = madmin.Status{Status: "Online"}
+			mapLog[tgt] = madmin.Status{Status: string(madmin.ItemOnline)}
 			loggerInfo = append(loggerInfo, mapLog)
 		} else {
 			mapLog := make(map[string]madmin.Status)
-			mapLog[tgt] = madmin.Status{Status: "offline"}
+			mapLog[tgt] = madmin.Status{Status: string(madmin.ItemOffline)}
 			loggerInfo = append(loggerInfo, mapLog)
 		}
 	}
@@ -1780,11 +1780,11 @@ func fetchLoggerInfo() ([]madmin.Logger, []madmin.Audit) {
 		err := checkConnection(target.Endpoint(), 15*time.Second)
 		if err == nil {
 			mapAudit := make(map[string]madmin.Status)
-			mapAudit[tgt] = madmin.Status{Status: "Online"}
+			mapAudit[tgt] = madmin.Status{Status: string(madmin.ItemOnline)}
 			auditloggerInfo = append(auditloggerInfo, mapAudit)
 		} else {
 			mapAudit := make(map[string]madmin.Status)
-			mapAudit[tgt] = madmin.Status{Status: "Offline"}
+			mapAudit[tgt] = madmin.Status{Status: string(madmin.ItemOffline)}
 			auditloggerInfo = append(auditloggerInfo, mapAudit)
 		}
 	}

View File

@@ -42,16 +42,16 @@ func getLocalServerProperty(endpointServerPools EndpointServerPools, r *http.Request) madmin.ServerProperties {
 			}
 			if endpoint.IsLocal {
 				// Only proceed for local endpoints
-				network[nodeName] = "online"
+				network[nodeName] = string(madmin.ItemOnline)
 				localEndpoints = append(localEndpoints, endpoint)
 				continue
 			}
 			_, present := network[nodeName]
 			if !present {
 				if err := isServerResolvable(endpoint, 2*time.Second); err == nil {
-					network[nodeName] = "online"
+					network[nodeName] = string(madmin.ItemOnline)
 				} else {
-					network[nodeName] = "offline"
+					network[nodeName] = string(madmin.ItemOffline)
 					// log once the error
 					logger.LogOnceIf(context.Background(), err, nodeName)
 				}
@@ -59,35 +59,21 @@ func getLocalServerProperty(endpointServerPools EndpointServerPools, r *http.Request) madmin.ServerProperties {
 		}
 	}

-	localDisks, _ := initStorageDisksWithErrors(localEndpoints)
-	defer closeStorageDisks(localDisks)
-
-	storageInfo, _ := getStorageInfo(localDisks, localEndpoints.GetAllStrings())
-
-	return madmin.ServerProperties{
-		State: "ok",
+	props := madmin.ServerProperties{
+		State: string(madmin.ItemInitializing),
 		Endpoint: addr,
 		Uptime: UTCNow().Unix() - globalBootTime.Unix(),
 		Version: Version,
 		CommitID: CommitID,
 		Network: network,
-		Disks: storageInfo.Disks,
 	}
-}
-
-func getLocalDisks(endpointServerPools EndpointServerPools) []madmin.Disk {
-	var localEndpoints Endpoints
-	for _, ep := range endpointServerPools {
-		for _, endpoint := range ep.Endpoints {
-			if !endpoint.IsLocal {
-				continue
-			}
-			localEndpoints = append(localEndpoints, endpoint)
-		}
-	}
-	localDisks, _ := initStorageDisksWithErrors(localEndpoints)
-	defer closeStorageDisks(localDisks)
-	storageInfo, _ := getStorageInfo(localDisks, localEndpoints.GetAllStrings())
-	return storageInfo.Disks
+
+	objLayer := newObjectLayerFn()
+	if objLayer != nil {
+		storageInfo, _ := objLayer.LocalStorageInfo(GlobalContext)
+		props.State = string(madmin.ItemOnline)
+		props.Disks = storageInfo.Disks
+	}
+
+	return props
 }

View File

@@ -23,6 +23,16 @@ import (
 	"github.com/minio/minio/pkg/sync/errgroup"
 )

+func (er erasureObjects) getLocalDisks() (localDisks []StorageAPI) {
+	disks := er.getDisks()
+	for _, disk := range disks {
+		if disk != nil && disk.IsLocal() {
+			localDisks = append(localDisks, disk)
+		}
+	}
+	return localDisks
+}
+
 func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
 	disks := er.getDisks()
 	// Based on the random shuffling return back randomized disks.

View File

@@ -331,6 +331,35 @@ func (z *erasureServerPools) BackendInfo() (b BackendInfo) {
 	return
 }

+func (z *erasureServerPools) LocalStorageInfo(ctx context.Context) (StorageInfo, []error) {
+	var storageInfo StorageInfo
+
+	storageInfos := make([]StorageInfo, len(z.serverPools))
+	storageInfosErrs := make([][]error, len(z.serverPools))
+	g := errgroup.WithNErrs(len(z.serverPools))
+	for index := range z.serverPools {
+		index := index
+		g.Go(func() error {
+			storageInfos[index], storageInfosErrs[index] = z.serverPools[index].LocalStorageInfo(ctx)
+			return nil
+		}, index)
+	}
+
+	// Wait for the go routines.
+	g.Wait()
+
+	storageInfo.Backend = z.BackendInfo()
+	for _, lstorageInfo := range storageInfos {
+		storageInfo.Disks = append(storageInfo.Disks, lstorageInfo.Disks...)
+	}
+
+	var errs []error
+	for i := range z.serverPools {
+		errs = append(errs, storageInfosErrs[i]...)
+	}
+	return storageInfo, errs
+}
+
 func (z *erasureServerPools) StorageInfo(ctx context.Context) (StorageInfo, []error) {
 	var storageInfo StorageInfo

View File

@@ -601,6 +601,37 @@ func (s *erasureSets) StorageInfo(ctx context.Context) (StorageInfo, []error) {
 	return storageInfo, errs
 }

+// LocalStorageInfo - combines output of LocalStorageInfo across all erasure coded object sets.
+func (s *erasureSets) LocalStorageInfo(ctx context.Context) (StorageInfo, []error) {
+	var storageInfo StorageInfo
+
+	storageInfos := make([]StorageInfo, len(s.sets))
+	storageInfoErrs := make([][]error, len(s.sets))
+	g := errgroup.WithNErrs(len(s.sets))
+	for index := range s.sets {
+		index := index
+		g.Go(func() error {
+			storageInfos[index], storageInfoErrs[index] = s.sets[index].LocalStorageInfo(ctx)
+			return nil
+		}, index)
+	}
+
+	// Wait for the go routines.
+	g.Wait()
+
+	for _, lstorageInfo := range storageInfos {
+		storageInfo.Disks = append(storageInfo.Disks, lstorageInfo.Disks...)
+	}
+
+	var errs []error
+	for i := range s.sets {
+		errs = append(errs, storageInfoErrs[i]...)
+	}
+	return storageInfo, errs
+}
+
 // Shutdown shuts down all erasure coded sets in parallel
 // returns error upon first error.
 func (s *erasureSets) Shutdown(ctx context.Context) error {

View File

@@ -229,6 +229,18 @@ func (er erasureObjects) StorageInfo(ctx context.Context) (StorageInfo, []error) {
 	return getStorageInfo(disks, endpoints)
 }

+// LocalStorageInfo - returns underlying local storage statistics.
+func (er erasureObjects) LocalStorageInfo(ctx context.Context) (StorageInfo, []error) {
+	disks := er.getLocalDisks()
+	endpoints := make([]string, len(disks))
+	for i, disk := range disks {
+		if disk != nil {
+			endpoints[i] = disk.String()
+		}
+	}
+	return getStorageInfo(disks, endpoints)
+}
+
 func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, healing bool) {
 	var wg sync.WaitGroup
 	disks := er.getDisks()

View File

@@ -206,6 +206,11 @@ func (fs *FSObjects) BackendInfo() BackendInfo {
 	return BackendInfo{Type: BackendFS}
 }

+// LocalStorageInfo - returns underlying storage statistics.
+func (fs *FSObjects) LocalStorageInfo(ctx context.Context) (StorageInfo, []error) {
+	return fs.StorageInfo(ctx)
+}
+
 // StorageInfo - returns underlying storage statistics.
 func (fs *FSObjects) StorageInfo(ctx context.Context) (StorageInfo, []error) {
 	atomic.AddInt64(&fs.activeIOCount, 1)

View File

@@ -39,6 +39,13 @@ func (a GatewayUnsupported) BackendInfo() BackendInfo {
 	return BackendInfo{Type: BackendGateway}
 }

+// LocalStorageInfo returns the local disks information, mainly used
+// in Prometheus - for gateway this is just a no-op.
+func (a GatewayUnsupported) LocalStorageInfo(ctx context.Context) (StorageInfo, []error) {
+	logger.CriticalIf(ctx, errors.New("not implemented"))
+	return StorageInfo{}, nil
+}
+
 // NSScanner - scanner is not implemented for gateway
 func (a GatewayUnsupported) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error {
 	logger.CriticalIf(ctx, errors.New("not implemented"))

View File

@@ -232,6 +232,10 @@ func (n *hdfsObjects) Shutdown(ctx context.Context) error {
 	return n.clnt.Close()
 }

+func (n *hdfsObjects) LocalStorageInfo(ctx context.Context) (si minio.StorageInfo, errs []error) {
+	return n.StorageInfo(ctx)
+}
+
 func (n *hdfsObjects) StorageInfo(ctx context.Context) (si minio.StorageInfo, errs []error) {
 	fsInfo, err := n.clnt.StatFs()
 	if err != nil {

View File

@@ -1023,7 +1023,7 @@ func getBucketUsageMetrics() MetricsGroup {
 		initialize: func(ctx context.Context, metrics *MetricsGroup) {
 			objLayer := newObjectLayerFn()
 			// Service not initialized yet
-			if objLayer == nil || globalIsGateway {
+			if objLayer == nil {
 				return
 			}
@@ -1031,7 +1031,7 @@
 				return
 			}

-			dataUsageInfo, err := loadDataUsageFromBackend(GlobalContext, objLayer)
+			dataUsageInfo, err := loadDataUsageFromBackend(ctx, objLayer)
 			if err != nil {
 				return
 			}
@@ -1040,8 +1040,8 @@
 			if dataUsageInfo.LastUpdate.IsZero() {
 				return
 			}

-			for bucket, usage := range dataUsageInfo.BucketsUsage {
+			for bucket, usage := range dataUsageInfo.BucketsUsage {
 				metrics.Metrics = append(metrics.Metrics, Metric{
 					Description: getBucketUsageTotalBytesMD(),
 					Value:       float64(usage.Size),
@@ -1092,9 +1092,18 @@ func getLocalStorageMetrics() MetricsGroup {
 	return MetricsGroup{
 		Metrics: []Metric{},
 		initialize: func(ctx context.Context, metrics *MetricsGroup) {
-			disks := getLocalDisks(globalEndpoints)
-			for _, disk := range disks {
+			objLayer := newObjectLayerFn()
+			// Service not initialized yet
+			if objLayer == nil {
+				return
+			}
+
+			if globalIsGateway {
+				return
+			}
+
+			storageInfo, _ := objLayer.LocalStorageInfo(ctx)
+			for _, disk := range storageInfo.Disks {
 				metrics.Metrics = append(metrics.Metrics, Metric{
 					Description: getNodeDiskUsedBytesMD(),
 					Value:       float64(disk.UsedSpace),
@@ -1120,15 +1129,18 @@
 	return MetricsGroup{
 		Metrics: []Metric{},
 		initialize: func(ctx context.Context, metrics *MetricsGroup) {
 			objLayer := newObjectLayerFn()
 			// Service not initialized yet
 			if objLayer == nil {
 				return
 			}

+			if globalIsGateway {
+				return
+			}
+
 			// Fetch disk space info, ignore errors
-			storageInfo, _ := objLayer.StorageInfo(GlobalContext)
+			storageInfo, _ := objLayer.StorageInfo(ctx)
 			onlineDisks, offlineDisks := getOnlineOfflineDisksStats(storageInfo.Disks)
 			totalDisks := offlineDisks.Merge(onlineDisks)
@@ -1142,15 +1154,14 @@
 				Value:       float64(GetTotalCapacityFree(storageInfo.Disks)),
 			})

-			s, _ := objLayer.StorageInfo(GlobalContext)
 			metrics.Metrics = append(metrics.Metrics, Metric{
 				Description: getClusterCapacityUsageBytesMD(),
-				Value:       GetTotalUsableCapacity(storageInfo.Disks, s),
+				Value:       GetTotalUsableCapacity(storageInfo.Disks, storageInfo),
 			})

 			metrics.Metrics = append(metrics.Metrics, Metric{
 				Description: getClusterCapacityUsageFreeBytesMD(),
-				Value:       GetTotalUsableCapacityFree(storageInfo.Disks, s),
+				Value:       GetTotalUsableCapacityFree(storageInfo.Disks, storageInfo),
 			})

 			metrics.Metrics = append(metrics.Metrics, Metric{

View File

@@ -521,6 +521,10 @@ func storageMetricsPrometheus(ch chan<- prometheus.Metric) {
 		return
 	}

+	if globalIsGateway {
+		return
+	}
+
 	server := getLocalServerProperty(globalEndpoints, &http.Request{
 		Host: GetLocalPeer(globalEndpoints),
 	})

View File

@@ -1231,9 +1231,9 @@ func (sys *NotificationSys) ServerInfo() []madmin.ServerProperties {
 			info, err := client.ServerInfo()
 			if err != nil {
 				info.Endpoint = client.host.String()
-				info.State = "offline"
+				info.State = string(madmin.ItemOffline)
 			} else {
-				info.State = "ok"
+				info.State = string(madmin.ItemOnline)
 			}
 			reply[idx] = info
 		}(client, i)
@@ -1306,7 +1306,7 @@ func GetPeerOnlineCount() (nodesOnline, nodesOffline int) {
 	nodesOffline = 0
 	servers := globalNotificationSys.ServerInfo()
 	for _, s := range servers {
-		if s.State == "ok" {
+		if s.State == string(madmin.ItemOnline) {
 			nodesOnline++
 			continue
 		}

View File

@@ -90,7 +90,8 @@ type ObjectLayer interface {
 	NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error
 	BackendInfo() BackendInfo
-	StorageInfo(ctx context.Context) (StorageInfo, []error) // local queries only local disks
+	StorageInfo(ctx context.Context) (StorageInfo, []error)
+	LocalStorageInfo(ctx context.Context) (StorageInfo, []error)

 	// Bucket operations.
 	MakeBucketWithLocation(ctx context.Context, bucket string, opts BucketOptions) error

View File

@@ -295,7 +295,7 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) {
 			return p, err
 		}
		
 		// error is unsupported disk, turn-off directIO for reads
-		logger.Info(fmt.Sprintf("Drive %s does not support O_DIRECT for reads, proceeding to use the drive without O_DIRECT", ep))
+		logger.LogOnceIf(GlobalContext, fmt.Errorf("Drive %s does not support O_DIRECT for reads, proceeding to use the drive without O_DIRECT", ep), ep.String())
 		p.readODirectSupported = false
 	}

View File

@@ -39,14 +39,17 @@ const (
 	// Add your own backend.
 )

-// ObjectLayerState - represents the status of the object layer
-type ObjectLayerState string
+// ItemState - represents the status of any item in offline, init, online state
+type ItemState string

 const (
-	// ObjectLayerInitializing indicates that the object layer is still in initialization phase
-	ObjectLayerInitializing = ObjectLayerState("initializing")
-	// ObjectLayerOnline indicates that the object layer is ready
-	ObjectLayerOnline = ObjectLayerState("online")
+	// ItemOffline indicates that the item is offline
+	ItemOffline = ItemState("offline")
+	// ItemInitializing indicates that the item is still in initialization phase
+	ItemInitializing = ItemState("initializing")
+	// ItemOnline indicates that the item is online
+	ItemOnline = ItemState("online")
 )

 // StorageInfo - represents total capacity of underlying storage.
@@ -171,7 +174,7 @@ func (adm *AdminClient) DataUsageInfo(ctx context.Context) (DataUsageInfo, error) {
 // InfoMessage container to hold server admin related information.
 type InfoMessage struct {
-	Mode   ObjectLayerState `json:"mode,omitempty"`
+	Mode   string           `json:"mode,omitempty"`
 	Domain []string         `json:"domain,omitempty"`
 	Region string           `json:"region,omitempty"`
 	SQSARN []string         `json:"sqsARN,omitempty"`
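
A note on the wire format: InfoMessage.Mode is now declared as a plain string,
and producers convert from the typed ItemState constants at the boundary, so
the JSON payload is unchanged. A micro-example (identifiers are the ones
introduced above; the literal itself is illustrative only):

	// Sketch: callers use the typed constants, the struct carries a string.
	infoMsg := madmin.InfoMessage{
		// was: Mode: madmin.ObjectLayerOnline
		Mode: string(madmin.ItemOnline),
	}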