completely remove drive caching layer from gateway days (#18217)

This has already been deprecated for close to a year now.
This commit is contained in:
Harshavardhana 2023-10-11 21:18:17 -07:00 committed by GitHub
parent f09756443d
commit 6829ae5b13
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
37 changed files with 3 additions and 5945 deletions

View file

@ -20,7 +20,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.21.1
go-version: 1.21.3
check-latest: true
- name: Get official govulncheck
run: go install golang.org/x/vuln/cmd/govulncheck@latest

View file

@ -28,7 +28,6 @@ import (
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/config"
"github.com/minio/minio/internal/config/cache"
"github.com/minio/minio/internal/config/etcd"
xldap "github.com/minio/minio/internal/config/identity/ldap"
"github.com/minio/minio/internal/config/identity/openid"
@ -500,8 +499,6 @@ func (a adminAPIHandlers) GetConfigHandler(w http.ResponseWriter, r *http.Reques
switch hkv.Key {
case config.EtcdSubSys:
off = !etcd.Enabled(item.Config)
case config.CacheSubSys:
off = !cache.Enabled(item.Config)
case config.StorageClassSubSys:
off = !storageclass.Enabled(item.Config)
case config.PolicyPluginSubSys:

View file

@ -133,11 +133,6 @@ func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, rs *HTTPRangeSp
w.Header().Set(xhttp.Expires, objInfo.Expires.UTC().Format(http.TimeFormat))
}
if globalCacheConfig.Enabled {
w.Header().Set(xhttp.XCache, objInfo.CacheStatus.String())
w.Header().Set(xhttp.XCacheLookup, objInfo.CacheLookupStatus.String())
}
// Set tag count if object has tags
if len(objInfo.UserTags) > 0 {
tags, _ := tags.ParseObjectTags(objInfo.UserTags)

View file

@ -61,18 +61,6 @@ func newObjectLayerFn() ObjectLayer {
return globalObjectAPI
}
func newCachedObjectLayerFn() CacheObjectLayer {
globalObjLayerMutex.RLock()
defer globalObjLayerMutex.RUnlock()
return globalCacheObjectAPI
}
func setCacheObjectLayer(c CacheObjectLayer) {
globalObjLayerMutex.Lock()
globalCacheObjectAPI = c
globalObjLayerMutex.Unlock()
}
func setObjectLayer(o ObjectLayer) {
globalObjLayerMutex.Lock()
globalObjectAPI = o
@ -82,7 +70,6 @@ func setObjectLayer(o ObjectLayer) {
// objectAPIHandler implements and provides http handlers for S3 API.
type objectAPIHandlers struct {
ObjectAPI func() ObjectLayer
CacheAPI func() CacheObjectLayer
}
// getHost tries its best to return the request host.
@ -189,7 +176,6 @@ func registerAPIRouter(router *mux.Router) {
// Initialize API.
api := objectAPIHandlers{
ObjectAPI: newObjectLayerFn,
CacheAPI: newCachedObjectLayerFn,
}
// API Router

View file

@ -474,9 +474,6 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
}
deleteObjectsFn := objectAPI.DeleteObjects
if api.CacheAPI() != nil {
deleteObjectsFn = api.CacheAPI().DeleteObjects
}
// Return Malformed XML as S3 spec if the number of objects is empty
if len(deleteObjectsReq.Objects) == 0 || len(deleteObjectsReq.Objects) > maxDeleteList {
@ -486,9 +483,6 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
objectsToDelete := map[ObjectToDelete]int{}
getObjectInfoFn := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfoFn = api.CacheAPI().GetObjectInfo
}
var (
hasLockEnabled bool

View file

@ -27,7 +27,6 @@ import (
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/config"
"github.com/minio/minio/internal/config/api"
"github.com/minio/minio/internal/config/cache"
"github.com/minio/minio/internal/config/callhome"
"github.com/minio/minio/internal/config/compress"
"github.com/minio/minio/internal/config/dns"
@ -46,7 +45,6 @@ import (
"github.com/minio/minio/internal/config/subnet"
"github.com/minio/minio/internal/crypto"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/kms"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/env"
)
@ -54,7 +52,6 @@ import (
func initHelp() {
kvs := map[string]config.KVS{
config.EtcdSubSys: etcd.DefaultKVS,
config.CacheSubSys: cache.DefaultKVS,
config.CompressionSubSys: compress.DefaultKVS,
config.IdentityLDAPSubSys: xldap.DefaultKVS,
config.IdentityOpenIDSubSys: openid.DefaultKVS,
@ -209,10 +206,6 @@ func initHelp() {
Key: config.EtcdSubSys,
Description: "persist IAM assets externally to etcd",
},
config.HelpKV{
Key: config.CacheSubSys,
Description: "[DEPRECATED] add caching storage tier",
},
}
if globalIsErasure {
@ -232,7 +225,6 @@ func initHelp() {
config.APISubSys: api.Help,
config.StorageClassSubSys: storageclass.Help,
config.EtcdSubSys: etcd.Help,
config.CacheSubSys: cache.Help,
config.CompressionSubSys: compress.Help,
config.HealSubSys: heal.Help,
config.ScannerSubSys: scanner.Help,
@ -302,10 +294,6 @@ func validateSubSysConfig(ctx context.Context, s config.Config, subSys string, o
return err
}
}
case config.CacheSubSys:
if _, err := cache.LookupConfig(s[config.CacheSubSys][config.Default]); err != nil {
return err
}
case config.CompressionSubSys:
if _, err := compress.LookupConfig(s[config.CompressionSubSys][config.Default]); err != nil {
return err
@ -493,20 +481,6 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) {
logger.LogIf(ctx, fmt.Errorf("Invalid site configuration: %w", err))
}
globalCacheConfig, err = cache.LookupConfig(s[config.CacheSubSys][config.Default])
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to setup cache: %w", err))
}
if globalCacheConfig.Enabled {
if cacheEncKey := env.Get(cache.EnvCacheEncryptionKey, ""); cacheEncKey != "" {
globalCacheKMS, err = kms.Parse(cacheEncKey)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to setup encryption cache: %w", err))
}
}
}
globalAutoEncryption = crypto.LookupAutoEncryption() // Enable auto-encryption if enabled
if globalAutoEncryption && GlobalKMS == nil {
logger.Fatal(errors.New("no KMS configured"), "MINIO_KMS_AUTO_ENCRYPTION requires a valid KMS configuration")

View file

@ -29,7 +29,6 @@ import (
"github.com/minio/minio/internal/auth"
"github.com/minio/minio/internal/config"
"github.com/minio/minio/internal/config/cache"
"github.com/minio/minio/internal/config/compress"
xldap "github.com/minio/minio/internal/config/identity/ldap"
"github.com/minio/minio/internal/config/identity/openid"
@ -1997,11 +1996,6 @@ func migrateV22ToV23() error {
srvConfig.StorageClass.RRS = cv22.StorageClass.RRS
srvConfig.StorageClass.Standard = cv22.StorageClass.Standard
// Init cache config.For future migration, Cache config needs to be copied over from previous version.
srvConfig.Cache.Drives = []string{}
srvConfig.Cache.Exclude = []string{}
srvConfig.Cache.Expiry = 90
if err = Save(configFile, srvConfig); err != nil {
return fmt.Errorf("Failed to migrate config from %s to %s. %w", cv22.Version, srvConfig.Version, err)
}
@ -2110,11 +2104,6 @@ func migrateV23ToV24() error {
srvConfig.StorageClass.RRS = cv23.StorageClass.RRS
srvConfig.StorageClass.Standard = cv23.StorageClass.Standard
// Load cache config from existing cache config in the file.
srvConfig.Cache.Drives = cv23.Cache.Drives
srvConfig.Cache.Exclude = cv23.Cache.Exclude
srvConfig.Cache.Expiry = cv23.Cache.Expiry
if err = quick.SaveConfig(srvConfig, configFile, globalEtcdClient); err != nil {
return fmt.Errorf("Failed to migrate config from %s to %s. %w", cv23.Version, srvConfig.Version, err)
}
@ -2228,11 +2217,6 @@ func migrateV24ToV25() error {
srvConfig.StorageClass.RRS = cv24.StorageClass.RRS
srvConfig.StorageClass.Standard = cv24.StorageClass.Standard
// Load cache config from existing cache config in the file.
srvConfig.Cache.Drives = cv24.Cache.Drives
srvConfig.Cache.Exclude = cv24.Cache.Exclude
srvConfig.Cache.Expiry = cv24.Cache.Expiry
if err = quick.SaveConfig(srvConfig, configFile, globalEtcdClient); err != nil {
return fmt.Errorf("Failed to migrate config from %s to %s. %w", cv24.Version, srvConfig.Version, err)
}
@ -2344,14 +2328,6 @@ func migrateV25ToV26() error {
srvConfig.StorageClass.RRS = cv25.StorageClass.RRS
srvConfig.StorageClass.Standard = cv25.StorageClass.Standard
// Load cache config from existing cache config in the file.
srvConfig.Cache.Drives = cv25.Cache.Drives
srvConfig.Cache.Exclude = cv25.Cache.Exclude
srvConfig.Cache.Expiry = cv25.Cache.Expiry
// Add predefined value to new server config.
srvConfig.Cache.MaxUse = 80
if err = quick.SaveConfig(srvConfig, configFile, globalEtcdClient); err != nil {
return fmt.Errorf("Failed to migrate config from %s to %s. %w", cv25.Version, srvConfig.Version, err)
}
@ -2574,7 +2550,6 @@ func readConfigWithoutMigrate(ctx context.Context, objAPI ObjectLayer) (config.C
xldap.SetIdentityLDAP(newCfg, cfg.LDAPServerConfig)
opa.SetPolicyOPAConfig(newCfg, cfg.Policy.OPA)
cache.SetCacheConfig(newCfg, cfg.Cache)
compress.SetCompressionConfig(newCfg, cfg.Compression)
for k, args := range cfg.Notify.AMQP {

View file

@ -22,7 +22,6 @@ import (
"github.com/minio/minio/internal/auth"
"github.com/minio/minio/internal/config"
"github.com/minio/minio/internal/config/cache"
"github.com/minio/minio/internal/config/compress"
xldap "github.com/minio/minio/internal/config/identity/ldap"
"github.com/minio/minio/internal/config/identity/openid"
@ -589,9 +588,6 @@ type serverConfigV23 struct {
// Storage class configuration
StorageClass storageclass.Config `json:"storageclass"`
// Cache configuration
Cache cache.Config `json:"cache"`
// Notification queue configuration.
Notify notifierV3 `json:"notify"`
}
@ -610,9 +606,6 @@ type serverConfigV24 struct {
// Storage class configuration
StorageClass storageclass.Config `json:"storageclass"`
// Cache configuration
Cache cache.Config `json:"cache"`
// Notification queue configuration.
Notify notifierV3 `json:"notify"`
}
@ -634,9 +627,6 @@ type serverConfigV25 struct {
// Storage class configuration
StorageClass storageclass.Config `json:"storageclass"`
// Cache configuration
Cache cache.Config `json:"cache"`
// Notification queue configuration.
Notify notifierV3 `json:"notify"`
}
@ -658,9 +648,6 @@ type serverConfigV26 struct {
// Storage class configuration
StorageClass storageclass.Config `json:"storageclass"`
// Cache configuration
Cache cache.Config `json:"cache"`
// Notification queue configuration.
Notify notifierV3 `json:"notify"`
}
@ -682,9 +669,6 @@ type serverConfigV27 struct {
// Storage class configuration
StorageClass storageclass.Config `json:"storageclass"`
// Cache configuration
Cache cache.Config `json:"cache"`
// Notification queue configuration.
Notify notifierV3 `json:"notify"`
@ -707,9 +691,6 @@ type serverConfigV28 struct {
// Storage class configuration
StorageClass storageclass.Config `json:"storageclass"`
// Cache configuration
Cache cache.Config `json:"cache"`
// Notification queue configuration.
Notify notifierV3 `json:"notify"`
@ -731,9 +712,6 @@ type serverConfigV33 struct {
// Storage class configuration
StorageClass storageclass.Config `json:"storageclass"`
// Cache configuration
Cache cache.Config `json:"cache"`
// Notification queue configuration.
Notify notify.Config `json:"notify"`

File diff suppressed because it is too large Load diff

View file

@ -1,60 +0,0 @@
//go:build windows
// +build windows
/*
* MinIO Object Storage (c) 2021 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"errors"
"os"
"github.com/djherbis/atime"
"golang.org/x/sys/windows/registry"
)
// Return error if Atime is disabled on the O/S
func checkAtimeSupport(dir string) (err error) {
file, err := os.CreateTemp(dir, "prefix")
if err != nil {
return
}
defer os.Remove(file.Name())
defer file.Close()
finfo1, err := os.Stat(file.Name())
if err != nil {
return
}
atime.Get(finfo1)
k, err := registry.OpenKey(registry.LOCAL_MACHINE, `SYSTEM\CurrentControlSet\Control\FileSystem`, registry.QUERY_VALUE)
if err != nil {
return
}
defer k.Close()
setting, _, err := k.GetIntegerValue("NtfsDisableLastAccessUpdate")
if err != nil {
return
}
lowSetting := setting & 0xFFFF
if lowSetting != uint64(0x0000) && lowSetting != uint64(0x0002) {
return errors.New("Atime not supported")
}
return
}

View file

@ -1,57 +0,0 @@
//go:build !windows
// +build !windows
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"errors"
"io"
"os"
"time"
"github.com/djherbis/atime"
)
// Return error if Atime is disabled on the O/S
func checkAtimeSupport(dir string) (err error) {
file, err := os.CreateTemp(dir, "prefix")
if err != nil {
return
}
defer os.Remove(file.Name())
defer file.Close()
finfo1, err := os.Stat(file.Name())
if err != nil {
return
}
// add a sleep to ensure atime change is detected
time.Sleep(10 * time.Millisecond)
if _, err = io.Copy(io.Discard, file); err != nil {
return
}
finfo2, err := os.Stat(file.Name())
if atime.Get(finfo2).Equal(atime.Get(finfo1)) {
return errors.New("Atime not supported")
}
return
}

View file

@ -1,88 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"sync/atomic"
)
// CacheDiskStats represents cache disk statistics
// such as current disk usage and available.
type CacheDiskStats struct {
// used cache size
UsageSize uint64
// total cache disk capacity
TotalCapacity uint64
// indicates if usage is high or low, if high value is '1', if low its '0'
UsageState int32
// indicates the current usage percentage of this cache disk
UsagePercent uint64
Dir string
}
// GetUsageLevelString gets the string representation for the usage level.
func (c *CacheDiskStats) GetUsageLevelString() (u string) {
if atomic.LoadInt32(&c.UsageState) == 0 {
return "low"
}
return "high"
}
// CacheStats - represents bytes served from cache,
// cache hits and cache misses.
type CacheStats struct {
BytesServed uint64
Hits uint64
Misses uint64
GetDiskStats func() []CacheDiskStats
}
// Increase total bytes served from cache
func (s *CacheStats) incBytesServed(n int64) {
atomic.AddUint64(&s.BytesServed, uint64(n))
}
// Increase cache hit by 1
func (s *CacheStats) incHit() {
atomic.AddUint64(&s.Hits, 1)
}
// Increase cache miss by 1
func (s *CacheStats) incMiss() {
atomic.AddUint64(&s.Misses, 1)
}
// Get total bytes served
func (s *CacheStats) getBytesServed() uint64 {
return atomic.LoadUint64(&s.BytesServed)
}
// Get total cache hits
func (s *CacheStats) getHits() uint64 {
return atomic.LoadUint64(&s.Hits)
}
// Get total cache misses
func (s *CacheStats) getMisses() uint64 {
return atomic.LoadUint64(&s.Misses)
}
// Prepare new CacheStats structure
func newCacheStats() *CacheStats {
return &CacheStats{}
}

View file

@ -1,587 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"container/list"
"errors"
"fmt"
"io"
"math"
"os"
"strconv"
"strings"
"time"
"github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/etag"
)
// CacheStatusType - whether the request was served from cache.
type CacheStatusType string
const (
// CacheHit - whether object was served from cache.
CacheHit CacheStatusType = "HIT"
// CacheMiss - object served from backend.
CacheMiss CacheStatusType = "MISS"
)
func (c CacheStatusType) String() string {
if c != "" {
return string(c)
}
return string(CacheMiss)
}
type cacheControl struct {
expiry time.Time
maxAge int
sMaxAge int
minFresh int
maxStale int
noStore bool
onlyIfCached bool
noCache bool
}
func (c *cacheControl) isStale(modTime time.Time) bool {
if c == nil {
return false
}
// response will never be stale if only-if-cached is set
if c.onlyIfCached {
return false
}
// Cache-Control value no-store indicates never cache
if c.noStore {
return true
}
// Cache-Control value no-cache indicates cache entry needs to be revalidated before
// serving from cache
if c.noCache {
return true
}
now := time.Now()
if c.sMaxAge > 0 && c.sMaxAge < int(now.Sub(modTime).Seconds()) {
return true
}
if c.maxAge > 0 && c.maxAge < int(now.Sub(modTime).Seconds()) {
return true
}
if !c.expiry.Equal(time.Time{}) && c.expiry.Before(time.Now().Add(time.Duration(c.maxStale))) {
return true
}
if c.minFresh > 0 && c.minFresh <= int(now.Sub(modTime).Seconds()) {
return true
}
return false
}
// returns struct with cache-control settings from user metadata.
func cacheControlOpts(o ObjectInfo) *cacheControl {
c := cacheControl{}
m := o.UserDefined
if !o.Expires.Equal(timeSentinel) {
c.expiry = o.Expires
}
var headerVal string
for k, v := range m {
if strings.EqualFold(k, "cache-control") {
headerVal = v
}
}
if headerVal == "" {
return nil
}
headerVal = strings.ToLower(headerVal)
headerVal = strings.TrimSpace(headerVal)
vals := strings.Split(headerVal, ",")
for _, val := range vals {
val = strings.TrimSpace(val)
if val == "no-store" {
c.noStore = true
continue
}
if val == "only-if-cached" {
c.onlyIfCached = true
continue
}
if val == "no-cache" {
c.noCache = true
continue
}
p := strings.Split(val, "=")
if len(p) != 2 {
continue
}
if p[0] == "max-age" ||
p[0] == "s-maxage" ||
p[0] == "min-fresh" ||
p[0] == "max-stale" {
i, err := strconv.Atoi(p[1])
if err != nil {
return nil
}
if p[0] == "max-age" {
c.maxAge = i
}
if p[0] == "s-maxage" {
c.sMaxAge = i
}
if p[0] == "min-fresh" {
c.minFresh = i
}
if p[0] == "max-stale" {
c.maxStale = i
}
}
}
return &c
}
// backendDownError returns true if err is due to backend failure or faulty disk if in server mode
func backendDownError(err error) bool {
_, backendDown := err.(BackendDown)
return backendDown || IsErr(err, baseErrs...)
}
// IsCacheable returns if the object should be saved in the cache.
func (o ObjectInfo) IsCacheable() bool {
if globalCacheKMS != nil {
return true
}
_, ok := crypto.IsEncrypted(o.UserDefined)
return !ok
}
// reads file cached on disk from offset upto length
func readCacheFileStream(filePath string, offset, length int64) (io.ReadCloser, error) {
if filePath == "" || offset < 0 {
return nil, errInvalidArgument
}
if err := checkPathLength(filePath); err != nil {
return nil, err
}
fr, err := os.Open(filePath)
if err != nil {
return nil, osErrToFileErr(err)
}
// Stat to get the size of the file at path.
st, err := fr.Stat()
if err != nil {
err = osErrToFileErr(err)
return nil, err
}
if err = os.Chtimes(filePath, time.Now(), st.ModTime()); err != nil {
return nil, err
}
// Verify if its not a regular file, since subsequent Seek is undefined.
if !st.Mode().IsRegular() {
return nil, errIsNotRegular
}
if err = os.Chtimes(filePath, time.Now(), st.ModTime()); err != nil {
return nil, err
}
// Seek to the requested offset.
if offset > 0 {
_, err = fr.Seek(offset, io.SeekStart)
if err != nil {
return nil, err
}
}
return struct {
io.Reader
io.Closer
}{Reader: io.LimitReader(fr, length), Closer: fr}, nil
}
func isCacheEncrypted(meta map[string]string) bool {
_, ok := meta[SSECacheEncrypted]
return ok
}
// decryptCacheObjectETag tries to decrypt the ETag saved in encrypted format using the cache KMS
func decryptCacheObjectETag(info *ObjectInfo) error {
if info.IsDir {
return nil // Directories are never encrypted.
}
// Depending on the SSE type we handle ETags slightly
// differently. ETags encrypted with SSE-S3 must be
// decrypted first, since the client expects that
// a single-part SSE-S3 ETag is equal to the content MD5.
//
// For all other SSE types, the ETag is not the content MD5.
// Therefore, we don't decrypt but only format it.
switch kind, ok := crypto.IsEncrypted(info.UserDefined); {
case ok && kind == crypto.S3 && isCacheEncrypted(info.UserDefined):
ETag, err := etag.Parse(info.ETag)
if err != nil {
return err
}
if !ETag.IsEncrypted() {
info.ETag = ETag.Format().String()
return nil
}
key, err := crypto.S3.UnsealObjectKey(globalCacheKMS, info.UserDefined, info.Bucket, info.Name)
if err != nil {
return err
}
ETag, err = etag.Decrypt(key[:], ETag)
if err != nil {
return err
}
info.ETag = ETag.Format().String()
case ok && (kind == crypto.S3KMS || kind == crypto.SSEC) && isCacheEncrypted(info.UserDefined):
ETag, err := etag.Parse(info.ETag)
if err != nil {
return err
}
info.ETag = ETag.Format().String()
}
return nil
}
// decryptCacheObjectETag tries to decrypt the ETag saved in encrypted format using the cache KMS
func decryptCachePartETags(c *cacheMeta) ([]string, error) {
// Depending on the SSE type we handle ETags slightly
// differently. ETags encrypted with SSE-S3 must be
// decrypted first, since the client expects that
// a single-part SSE-S3 ETag is equal to the content MD5.
//
// For all other SSE types, the ETag is not the content MD5.
// Therefore, we don't decrypt but only format it.
switch kind, ok := crypto.IsEncrypted(c.Meta); {
case ok && kind == crypto.S3 && isCacheEncrypted(c.Meta):
key, err := crypto.S3.UnsealObjectKey(globalCacheKMS, c.Meta, c.Bucket, c.Object)
if err != nil {
return nil, err
}
etags := make([]string, 0, len(c.PartETags))
for i := range c.PartETags {
ETag, err := etag.Parse(c.PartETags[i])
if err != nil {
return nil, err
}
ETag, err = etag.Decrypt(key[:], ETag)
if err != nil {
return nil, err
}
etags = append(etags, ETag.Format().String())
}
return etags, nil
case ok && (kind == crypto.S3KMS || kind == crypto.SSEC) && isCacheEncrypted(c.Meta):
etags := make([]string, 0, len(c.PartETags))
for i := range c.PartETags {
ETag, err := etag.Parse(c.PartETags[i])
if err != nil {
return nil, err
}
etags = append(etags, ETag.Format().String())
}
return etags, nil
default:
return c.PartETags, nil
}
}
func isMetadataSame(m1, m2 map[string]string) bool {
if m1 == nil && m2 == nil {
return true
}
if (m1 == nil && m2 != nil) || (m2 == nil && m1 != nil) {
return false
}
if len(m1) != len(m2) {
return false
}
for k1, v1 := range m1 {
if v2, ok := m2[k1]; !ok || (v1 != v2) {
return false
}
}
return true
}
type fileScorer struct {
saveBytes uint64
now int64
maxHits int
// 1/size for consistent score.
sizeMult float64
// queue is a linked list of files we want to delete.
// The list is kept sorted according to score, highest at top, lowest at bottom.
queue list.List
queuedBytes uint64
seenBytes uint64
}
type queuedFile struct {
name string
versionID string
size uint64
score float64
}
// newFileScorer allows to collect files to save a specific number of bytes.
// Each file is assigned a score based on its age, size and number of hits.
// A list of files is maintained
func newFileScorer(saveBytes uint64, now int64, maxHits int) (*fileScorer, error) {
if saveBytes == 0 {
return nil, errors.New("newFileScorer: saveBytes = 0")
}
if now < 0 {
return nil, errors.New("newFileScorer: now < 0")
}
if maxHits <= 0 {
return nil, errors.New("newFileScorer: maxHits <= 0")
}
f := fileScorer{saveBytes: saveBytes, maxHits: maxHits, now: now, sizeMult: 1 / float64(saveBytes)}
f.queue.Init()
return &f, nil
}
func (f *fileScorer) addFile(name string, accTime time.Time, size int64, hits int) {
f.addFileWithObjInfo(ObjectInfo{
Name: name,
AccTime: accTime,
Size: size,
}, hits)
}
func (f *fileScorer) addFileWithObjInfo(objInfo ObjectInfo, hits int) {
// Calculate how much we want to delete this object.
file := queuedFile{
name: objInfo.Name,
versionID: objInfo.VersionID,
size: uint64(objInfo.Size),
}
f.seenBytes += uint64(objInfo.Size)
var score float64
if objInfo.ModTime.IsZero() {
// Mod time is not available with disk cache use atime.
score = float64(f.now - objInfo.AccTime.Unix())
} else {
// if not used mod time when mod time is available.
score = float64(f.now - objInfo.ModTime.Unix())
}
// Size as fraction of how much we want to save, 0->1.
szWeight := math.Max(0, (math.Min(1, float64(file.size)*f.sizeMult)))
// 0 at f.maxHits, 1 at 0.
hitsWeight := (1.0 - math.Max(0, math.Min(1.0, float64(hits)/float64(f.maxHits))))
file.score = score * (1 + 0.25*szWeight + 0.25*hitsWeight)
// If we still haven't saved enough, just add the file
if f.queuedBytes < f.saveBytes {
f.insertFile(file)
f.trimQueue()
return
}
// If we score less than the worst, don't insert.
worstE := f.queue.Back()
if worstE != nil && file.score < worstE.Value.(queuedFile).score {
return
}
f.insertFile(file)
f.trimQueue()
}
// adjustSaveBytes allows to adjust the number of bytes to save.
// This can be used to adjust the count on the fly.
// Returns true if there still is a need to delete files (n+saveBytes >0),
// false if no more bytes needs to be saved.
func (f *fileScorer) adjustSaveBytes(n int64) bool {
if f == nil {
return false
}
if int64(f.saveBytes)+n <= 0 {
f.saveBytes = 0
f.trimQueue()
return false
}
if n < 0 {
f.saveBytes -= ^uint64(n - 1)
} else {
f.saveBytes += uint64(n)
}
if f.saveBytes == 0 {
f.queue.Init()
f.saveBytes = 0
return false
}
if n < 0 {
f.trimQueue()
}
return true
}
// insertFile will insert a file into the list, sorted by its score.
func (f *fileScorer) insertFile(file queuedFile) {
e := f.queue.Front()
for e != nil {
v := e.Value.(queuedFile)
if v.score < file.score {
break
}
e = e.Next()
}
f.queuedBytes += file.size
// We reached the end.
if e == nil {
f.queue.PushBack(file)
return
}
f.queue.InsertBefore(file, e)
}
// trimQueue will trim the back of queue and still keep below wantSave.
func (f *fileScorer) trimQueue() {
for {
e := f.queue.Back()
if e == nil {
return
}
v := e.Value.(queuedFile)
if f.queuedBytes-v.size < f.saveBytes {
return
}
f.queue.Remove(e)
f.queuedBytes -= v.size
}
}
func (f *fileScorer) purgeFunc(p func(qfile queuedFile)) {
e := f.queue.Front()
for e != nil {
p(e.Value.(queuedFile))
e = e.Next()
}
}
// fileNames returns all queued file names.
func (f *fileScorer) fileNames() []string {
res := make([]string, 0, f.queue.Len())
e := f.queue.Front()
for e != nil {
res = append(res, e.Value.(queuedFile).name)
e = e.Next()
}
return res
}
func (f *fileScorer) reset() {
f.queue.Init()
f.queuedBytes = 0
}
func (f *fileScorer) queueString() string {
var res strings.Builder
e := f.queue.Front()
i := 0
for e != nil {
v := e.Value.(queuedFile)
if i > 0 {
res.WriteByte('\n')
}
res.WriteString(fmt.Sprintf("%03d: %s (score: %.3f, bytes: %d)", i, v.name, v.score, v.size))
i++
e = e.Next()
}
return res.String()
}
// bytesToClear() returns the number of bytes to clear to reach low watermark
// w.r.t quota given disk total and free space, quota in % allocated to cache
// and low watermark % w.r.t allowed quota.
// If the high watermark hasn't been reached 0 will be returned.
func bytesToClear(total, free int64, quotaPct, lowWatermark, highWatermark uint64) uint64 {
used := total - free
quotaAllowed := total * (int64)(quotaPct) / 100
highWMUsage := total * (int64)(highWatermark*quotaPct) / (100 * 100)
if used < highWMUsage {
return 0
}
// Return bytes needed to reach low watermark.
lowWMUsage := total * (int64)(lowWatermark*quotaPct) / (100 * 100)
return (uint64)(math.Min(float64(quotaAllowed), math.Max(0.0, float64(used-lowWMUsage))))
}
type multiWriter struct {
backendWriter io.Writer
cacheWriter *io.PipeWriter
pipeClosed bool
}
// multiWriter writes to backend and cache - if cache write
// fails close the pipe, but continue writing to the backend
func (t *multiWriter) Write(p []byte) (n int, err error) {
n, err = t.backendWriter.Write(p)
if err == nil && n != len(p) {
err = io.ErrShortWrite
return
}
if err != nil {
if !t.pipeClosed {
t.cacheWriter.CloseWithError(err)
}
return
}
// ignore errors writing to cache
if !t.pipeClosed {
_, cerr := t.cacheWriter.Write(p)
if cerr != nil {
t.pipeClosed = true
t.cacheWriter.CloseWithError(cerr)
}
}
return len(p), nil
}
func cacheMultiWriter(w1 io.Writer, w2 *io.PipeWriter) io.Writer {
return &multiWriter{backendWriter: w1, cacheWriter: w2}
}
// writebackInProgress returns true if writeback commit is not complete
func writebackInProgress(m map[string]string) bool {
if v, ok := m[writeBackStatusHeader]; ok {
switch cacheCommitStatus(v) {
case CommitPending, CommitFailed:
return true
}
}
return false
}

View file

@ -1,177 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"net/http"
"reflect"
"testing"
"time"
)
func TestGetCacheControlOpts(t *testing.T) {
expiry, _ := time.Parse(http.TimeFormat, "Wed, 21 Oct 2015 07:28:00 GMT")
testCases := []struct {
cacheControlHeaderVal string
expiryHeaderVal time.Time
expectedCacheControl *cacheControl
expectedErr bool
}{
{"", timeSentinel, nil, false},
{"max-age=2592000, public", timeSentinel, &cacheControl{maxAge: 2592000, sMaxAge: 0, minFresh: 0, expiry: time.Time{}}, false},
{"max-age=2592000, no-store", timeSentinel, &cacheControl{maxAge: 2592000, sMaxAge: 0, noStore: true, minFresh: 0, expiry: time.Time{}}, false},
{"must-revalidate, max-age=600", timeSentinel, &cacheControl{maxAge: 600, sMaxAge: 0, minFresh: 0, expiry: time.Time{}}, false},
{"s-maxAge=2500, max-age=600", timeSentinel, &cacheControl{maxAge: 600, sMaxAge: 2500, minFresh: 0, expiry: time.Time{}}, false},
{"s-maxAge=2500, max-age=600", expiry, &cacheControl{maxAge: 600, sMaxAge: 2500, minFresh: 0, expiry: time.Date(2015, time.October, 21, 0o7, 28, 0o0, 0o0, time.UTC)}, false},
{"s-maxAge=2500, max-age=600s", timeSentinel, &cacheControl{maxAge: 600, sMaxAge: 2500, minFresh: 0, expiry: time.Time{}}, true},
}
for _, testCase := range testCases {
t.Run("", func(t *testing.T) {
m := make(map[string]string)
m["cache-control"] = testCase.cacheControlHeaderVal
if !testCase.expiryHeaderVal.Equal(timeSentinel) {
m["expires"] = testCase.expiryHeaderVal.String()
}
c := cacheControlOpts(ObjectInfo{UserDefined: m, Expires: testCase.expiryHeaderVal})
if testCase.expectedErr && (c != nil) {
t.Errorf("expected err, got <nil>")
}
if !testCase.expectedErr && !reflect.DeepEqual(c, testCase.expectedCacheControl) {
t.Errorf("expected %v, got %v", testCase.expectedCacheControl, c)
}
})
}
}
func TestIsMetadataSame(t *testing.T) {
testCases := []struct {
m1 map[string]string
m2 map[string]string
expected bool
}{
{nil, nil, true},
{nil, map[string]string{}, false},
{map[string]string{"k": "v"}, map[string]string{"k": "v"}, true},
{map[string]string{"k": "v"}, map[string]string{"a": "b"}, false},
{map[string]string{"k1": "v1", "k2": "v2"}, map[string]string{"k1": "v1", "k2": "v1"}, false},
{map[string]string{"k1": "v1", "k2": "v2"}, map[string]string{"k1": "v1", "k2": "v2"}, true},
{map[string]string{"K1": "v1", "k2": "v2"}, map[string]string{"k1": "v1", "k2": "v2"}, false},
{map[string]string{"k1": "v1", "k2": "v2", "k3": "v3"}, map[string]string{"k1": "v1", "k2": "v2"}, false},
}
for i, testCase := range testCases {
actual := isMetadataSame(testCase.m1, testCase.m2)
if testCase.expected != actual {
t.Errorf("test %d expected %v, got %v", i, testCase.expected, actual)
}
}
}
func TestNewFileScorer(t *testing.T) {
fs, err := newFileScorer(1000, time.Now().Unix(), 10)
if err != nil {
t.Fatal(err)
}
if len(fs.fileNames()) != 0 {
t.Fatal("non zero files??")
}
now := time.Now()
fs.addFile("recent", now.Add(-time.Minute), 1000, 10)
fs.addFile("older", now.Add(-time.Hour), 1000, 10)
if !reflect.DeepEqual(fs.fileNames(), []string{"older"}) {
t.Fatal("unexpected file list", fs.queueString())
}
fs.reset()
fs.addFile("bigger", now.Add(-time.Minute), 2000, 10)
fs.addFile("recent", now.Add(-time.Minute), 1000, 10)
if !reflect.DeepEqual(fs.fileNames(), []string{"bigger"}) {
t.Fatal("unexpected file list", fs.queueString())
}
fs.reset()
fs.addFile("less", now.Add(-time.Minute), 1000, 5)
fs.addFile("recent", now.Add(-time.Minute), 1000, 10)
if !reflect.DeepEqual(fs.fileNames(), []string{"less"}) {
t.Fatal("unexpected file list", fs.queueString())
}
fs.reset()
fs.addFile("small", now.Add(-time.Minute), 200, 10)
fs.addFile("medium", now.Add(-time.Minute), 300, 10)
if !reflect.DeepEqual(fs.fileNames(), []string{"medium", "small"}) {
t.Fatal("unexpected file list", fs.queueString())
}
fs.addFile("large", now.Add(-time.Minute), 700, 10)
fs.addFile("xsmol", now.Add(-time.Minute), 7, 10)
if !reflect.DeepEqual(fs.fileNames(), []string{"large", "medium"}) {
t.Fatal("unexpected file list", fs.queueString())
}
fs.reset()
fs.addFile("less", now.Add(-time.Minute), 500, 5)
fs.addFile("recent", now.Add(-time.Minute), 500, 10)
if !fs.adjustSaveBytes(-500) {
t.Fatal("we should still need more bytes, got false")
}
// We should only need 500 bytes now.
if !reflect.DeepEqual(fs.fileNames(), []string{"less"}) {
t.Fatal("unexpected file list", fs.queueString())
}
if fs.adjustSaveBytes(-500) {
t.Fatal("we shouldn't need any more bytes, got true")
}
fs, err = newFileScorer(1000, time.Now().Unix(), 10)
if err != nil {
t.Fatal(err)
}
fs.addFile("bigger", now.Add(-time.Minute), 50, 10)
// sorting should be consistent after adjusting savebytes.
fs.adjustSaveBytes(-800)
fs.addFile("smaller", now.Add(-time.Minute), 40, 10)
if !reflect.DeepEqual(fs.fileNames(), []string{"bigger", "smaller"}) {
t.Fatal("unexpected file list", fs.queueString())
}
}
func TestBytesToClear(t *testing.T) {
testCases := []struct {
total int64
free int64
quotaPct uint64
watermarkLow uint64
watermarkHigh uint64
expected uint64
}{
{total: 1000, free: 800, quotaPct: 40, watermarkLow: 90, watermarkHigh: 90, expected: 0},
{total: 1000, free: 200, quotaPct: 40, watermarkLow: 90, watermarkHigh: 90, expected: 400},
{total: 1000, free: 400, quotaPct: 40, watermarkLow: 90, watermarkHigh: 90, expected: 240},
{total: 1000, free: 600, quotaPct: 40, watermarkLow: 90, watermarkHigh: 90, expected: 40},
{total: 1000, free: 600, quotaPct: 40, watermarkLow: 70, watermarkHigh: 70, expected: 120},
{total: 1000, free: 1000, quotaPct: 90, watermarkLow: 70, watermarkHigh: 70, expected: 0},
// High not yet reached..
{total: 1000, free: 250, quotaPct: 100, watermarkLow: 50, watermarkHigh: 90, expected: 0},
{total: 1000, free: 250, quotaPct: 100, watermarkLow: 50, watermarkHigh: 90, expected: 0},
}
for i, tc := range testCases {
toClear := bytesToClear(tc.total, tc.free, tc.quotaPct, tc.watermarkLow, tc.watermarkHigh)
if tc.expected != toClear {
t.Errorf("test %d expected %v, got %v", i, tc.expected, toClear)
}
}
}

File diff suppressed because it is too large Load diff

View file

@ -1,74 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"testing"
"time"
)
// Tests ToObjectInfo function.
func TestCacheMetadataObjInfo(t *testing.T) {
m := cacheMeta{Meta: nil}
objInfo := m.ToObjectInfo()
if objInfo.Size != 0 {
t.Fatal("Unexpected object info value for Size", objInfo.Size)
}
if !objInfo.ModTime.Equal(time.Time{}) {
t.Fatal("Unexpected object info value for ModTime ", objInfo.ModTime)
}
if objInfo.IsDir {
t.Fatal("Unexpected object info value for IsDir", objInfo.IsDir)
}
if !objInfo.Expires.IsZero() {
t.Fatal("Unexpected object info value for Expires ", objInfo.Expires)
}
}
// test wildcard patterns for excluding entries from cache
func TestCacheExclusion(t *testing.T) {
cobjects := &cacheObjects{
cache: nil,
}
testCases := []struct {
bucketName string
objectName string
excludePattern string
expectedResult bool
}{
{"testbucket", "testobjectmatch", "testbucket/testobj*", true},
{"testbucket", "testobjectnomatch", "testbucet/testobject*", false},
{"testbucket", "testobject/pref1/obj1", "*/*", true},
{"testbucket", "testobject/pref1/obj1", "*/pref1/*", true},
{"testbucket", "testobject/pref1/obj1", "testobject/*", false},
{"photos", "image1.jpg", "*.jpg", true},
{"photos", "europe/paris/seine.jpg", "seine.jpg", false},
{"photos", "europe/paris/seine.jpg", "*/seine.jpg", true},
{"phil", "z/likes/coffee", "*/likes/*", true},
{"failbucket", "no/slash/prefixes", "/failbucket/no/", false},
{"failbucket", "no/slash/prefixes", "/failbucket/no/*", false},
}
for i, testCase := range testCases {
cobjects.exclude = []string{testCase.excludePattern}
if cobjects.isCacheExclude(testCase.bucketName, testCase.objectName) != testCase.expectedResult {
t.Fatal("Cache exclusion test failed for case ", i)
}
}
}

View file

@ -478,10 +478,7 @@ func EncryptRequest(content io.Reader, r *http.Request, bucket, object string, m
func decryptObjectMeta(key []byte, bucket, object string, metadata map[string]string) ([]byte, error) {
switch kind, _ := crypto.IsEncrypted(metadata); kind {
case crypto.S3:
var KMS kms.KMS = GlobalKMS
if isCacheEncrypted(metadata) {
KMS = globalCacheKMS
}
KMS := GlobalKMS
if KMS == nil {
return nil, errKMSNotConfigured
}

View file

@ -1,498 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"reflect"
"strings"
jsoniter "github.com/json-iterator/go"
"github.com/minio/minio/internal/logger"
"github.com/minio/sio"
)
const (
// Represents Cache format json holding details on all other cache drives in use.
formatCache = "cache"
// formatCacheV1.Cache.Version
formatCacheVersionV1 = "1"
formatCacheVersionV2 = "2"
formatMetaVersion1 = "1"
formatCacheV1DistributionAlgo = "CRCMOD"
)
// Represents the current cache structure with list of
// disks comprising the disk cache
// formatCacheV1 - structure holds format config version '1'.
type formatCacheV1 struct {
formatMetaV1
Cache struct {
Version string `json:"version"` // Version of 'cache' format.
This string `json:"this"` // This field carries assigned disk uuid.
// Disks field carries the input disk order generated the first
// time when fresh disks were supplied.
Disks []string `json:"disks"`
// Distribution algorithm represents the hashing algorithm
// to pick the right set index for an object.
DistributionAlgo string `json:"distributionAlgo"`
} `json:"cache"` // Cache field holds cache format.
}
// formatCacheV2 is same as formatCacheV1
type formatCacheV2 = formatCacheV1
// Used to detect the version of "cache" format.
type formatCacheVersionDetect struct {
Cache struct {
Version string `json:"version"`
} `json:"cache"`
}
// Return a slice of format, to be used to format uninitialized disks.
func newFormatCacheV2(drives []string) []*formatCacheV2 {
diskCount := len(drives)
disks := make([]string, diskCount)
formats := make([]*formatCacheV2, diskCount)
for i := 0; i < diskCount; i++ {
format := &formatCacheV2{}
format.Version = formatMetaVersion1
format.Format = formatCache
format.Cache.Version = formatCacheVersionV2
format.Cache.DistributionAlgo = formatCacheV1DistributionAlgo
format.Cache.This = mustGetUUID()
formats[i] = format
disks[i] = formats[i].Cache.This
}
for i := 0; i < diskCount; i++ {
format := formats[i]
format.Cache.Disks = disks
}
return formats
}
// Returns formatCache.Cache.Version
func formatCacheGetVersion(r io.ReadSeeker) (string, error) {
format := &formatCacheVersionDetect{}
if err := jsonLoad(r, format); err != nil {
return "", err
}
return format.Cache.Version, nil
}
// Creates a new cache format.json if unformatted.
func createFormatCache(fsFormatPath string, format *formatCacheV1) error {
// open file using READ & WRITE permission
file, err := os.OpenFile(fsFormatPath, os.O_RDWR|os.O_CREATE, 0o666)
if err != nil {
return err
}
// Close the locked file upon return.
defer file.Close()
fi, err := file.Stat()
if err != nil {
return err
}
if fi.Size() != 0 {
// format.json already got created because of another minio process's createFormatCache()
return nil
}
return jsonSave(file, format)
}
// This function creates a cache format file on disk and returns a slice
// of format cache config
func initFormatCache(ctx context.Context, drives []string) (formats []*formatCacheV2, err error) {
nformats := newFormatCacheV2(drives)
for i, drive := range drives {
if err = os.MkdirAll(pathJoin(drive, minioMetaBucket), 0o777); err != nil {
logger.GetReqInfo(ctx).AppendTags("drive", drive)
logger.LogIf(ctx, err)
return nil, err
}
cacheFormatPath := pathJoin(drive, minioMetaBucket, formatConfigFile)
// Fresh disk - create format.json for this cfs
if err = createFormatCache(cacheFormatPath, nformats[i]); err != nil {
logger.GetReqInfo(ctx).AppendTags("drive", drive)
logger.LogIf(ctx, err)
return nil, err
}
}
return nformats, nil
}
func loadFormatCache(ctx context.Context, drives []string) ([]*formatCacheV2, bool, error) {
formats := make([]*formatCacheV2, len(drives))
var formatV2 *formatCacheV2
migrating := false
for i, drive := range drives {
cacheFormatPath := pathJoin(drive, minioMetaBucket, formatConfigFile)
f, err := os.OpenFile(cacheFormatPath, os.O_RDWR, 0o666)
if err != nil {
if osIsNotExist(err) {
continue
}
logger.LogIf(ctx, err)
return nil, migrating, err
}
defer f.Close()
format, err := formatMetaCacheV1(f)
if err != nil {
continue
}
formatV2 = format
if format.Cache.Version != formatCacheVersionV2 {
migrating = true
}
formats[i] = formatV2
}
return formats, migrating, nil
}
// unmarshalls the cache format.json into formatCacheV1
func formatMetaCacheV1(r io.ReadSeeker) (*formatCacheV1, error) {
format := &formatCacheV1{}
if err := jsonLoad(r, format); err != nil {
return nil, err
}
return format, nil
}
func checkFormatCacheValue(format *formatCacheV2, migrating bool) error {
if format.Format != formatCache {
return fmt.Errorf("Unsupported cache format [%s] found", format.Format)
}
// during migration one or more cache drive(s) formats can be out of sync
if migrating {
// Validate format version and format type.
if format.Version != formatMetaVersion1 {
return fmt.Errorf("Unsupported version of cache format [%s] found", format.Version)
}
if format.Cache.Version != formatCacheVersionV2 && format.Cache.Version != formatCacheVersionV1 {
return fmt.Errorf("Unsupported Cache backend format found [%s]", format.Cache.Version)
}
return nil
}
// Validate format version and format type.
if format.Version != formatMetaVersion1 {
return fmt.Errorf("Unsupported version of cache format [%s] found", format.Version)
}
if format.Cache.Version != formatCacheVersionV2 {
return fmt.Errorf("Unsupported Cache backend format found [%s]", format.Cache.Version)
}
return nil
}
func checkFormatCacheValues(migrating bool, formats []*formatCacheV2) (int, error) {
for i, formatCache := range formats {
if formatCache == nil {
continue
}
if err := checkFormatCacheValue(formatCache, migrating); err != nil {
return i, err
}
if len(formats) != len(formatCache.Cache.Disks) {
return i, fmt.Errorf("Expected number of cache drives %d , got %d",
len(formatCache.Cache.Disks), len(formats))
}
}
return -1, nil
}
// checkCacheDisksConsistency - checks if "This" disk uuid on each disk is consistent with all "Disks" slices
// across disks.
func checkCacheDiskConsistency(formats []*formatCacheV2) error {
disks := make([]string, len(formats))
// Collect currently available disk uuids.
for index, format := range formats {
if format == nil {
disks[index] = ""
continue
}
disks[index] = format.Cache.This
}
for i, format := range formats {
if format == nil {
continue
}
j := findCacheDiskIndex(disks[i], format.Cache.Disks)
if j == -1 {
return fmt.Errorf("UUID on positions %d:%d do not match with , expected %s", i, j, disks[i])
}
if i != j {
return fmt.Errorf("UUID on positions %d:%d do not match with , expected %s got %s", i, j, disks[i], format.Cache.Disks[j])
}
}
return nil
}
// checkCacheDisksSliceConsistency - validate cache Disks order if they are consistent.
func checkCacheDisksSliceConsistency(formats []*formatCacheV2) error {
var sentinelDisks []string
// Extract first valid Disks slice.
for _, format := range formats {
if format == nil {
continue
}
sentinelDisks = format.Cache.Disks
break
}
for _, format := range formats {
if format == nil {
continue
}
currentDisks := format.Cache.Disks
if !reflect.DeepEqual(sentinelDisks, currentDisks) {
return errors.New("inconsistent cache drives found")
}
}
return nil
}
// findCacheDiskIndex returns position of cache disk in JBOD.
func findCacheDiskIndex(disk string, disks []string) int {
for index, uuid := range disks {
if uuid == disk {
return index
}
}
return -1
}
// validate whether cache drives order has changed
func validateCacheFormats(ctx context.Context, migrating bool, formats []*formatCacheV2) error {
count := 0
for _, format := range formats {
if format == nil {
count++
}
}
if count == len(formats) {
return errors.New("Cache format files missing on all drives")
}
if _, err := checkFormatCacheValues(migrating, formats); err != nil {
logger.LogIf(ctx, err)
return err
}
if err := checkCacheDisksSliceConsistency(formats); err != nil {
logger.LogIf(ctx, err)
return err
}
err := checkCacheDiskConsistency(formats)
logger.LogIf(ctx, err)
return err
}
// return true if all of the list of cache drives are
// fresh disks
func cacheDrivesUnformatted(drives []string) bool {
count := 0
for _, drive := range drives {
cacheFormatPath := pathJoin(drive, minioMetaBucket, formatConfigFile)
if _, err := os.Stat(cacheFormatPath); osIsNotExist(err) {
count++
}
}
return count == len(drives)
}
// create format.json for each cache drive if fresh disk or load format from disk
// Then validate the format for all drives in the cache to ensure order
// of cache drives has not changed.
func loadAndValidateCacheFormat(ctx context.Context, drives []string) (formats []*formatCacheV2, migrating bool, err error) {
if cacheDrivesUnformatted(drives) {
formats, err = initFormatCache(ctx, drives)
} else {
formats, migrating, err = loadFormatCache(ctx, drives)
}
if err != nil {
return nil, false, err
}
if err = validateCacheFormats(ctx, migrating, formats); err != nil {
return nil, false, err
}
return formats, migrating, nil
}
// reads cached object on disk and writes it back after adding bitrot
// hashsum per block as per the new disk cache format.
func migrateCacheData(ctx context.Context, c *diskCache, bucket, object, oldfile, destDir string, metadata map[string]string) error {
st, err := os.Stat(oldfile)
if err != nil {
err = osErrToFileErr(err)
return err
}
readCloser, err := readCacheFileStream(oldfile, 0, st.Size())
if err != nil {
return err
}
defer readCloser.Close()
var reader io.Reader = readCloser
actualSize := uint64(st.Size())
if globalCacheKMS != nil {
reader, err = newCacheEncryptReader(ctx, readCloser, bucket, object, metadata)
if err != nil {
return err
}
actualSize, _ = sio.EncryptedSize(uint64(st.Size()))
}
_, _, err = c.bitrotWriteToCache(destDir, cacheDataFile, reader, actualSize)
return err
}
// migrate cache contents from old cacheFS format to new backend format
// new format is flat
//
// sha(bucket,object)/ <== dir name
// - part.1 <== data
// - cache.json <== metadata
func migrateOldCache(ctx context.Context, c *diskCache) error {
oldCacheBucketsPath := path.Join(c.dir, minioMetaBucket, "buckets")
cacheFormatPath := pathJoin(c.dir, minioMetaBucket, formatConfigFile)
if _, err := os.Stat(oldCacheBucketsPath); err != nil {
// remove .minio.sys sub directories
removeAll(path.Join(c.dir, minioMetaBucket, "multipart"))
removeAll(path.Join(c.dir, minioMetaBucket, "tmp"))
removeAll(path.Join(c.dir, minioMetaBucket, "trash"))
removeAll(path.Join(c.dir, minioMetaBucket, "buckets"))
// just migrate cache format
return migrateCacheFormatJSON(cacheFormatPath)
}
buckets, err := readDir(oldCacheBucketsPath)
if err != nil {
return err
}
for _, bucket := range buckets {
bucket = strings.TrimSuffix(bucket, SlashSeparator)
var objMetaPaths []string
root := path.Join(oldCacheBucketsPath, bucket)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if strings.HasSuffix(path, cacheMetaJSONFile) {
objMetaPaths = append(objMetaPaths, path)
}
return nil
})
if err != nil {
return err
}
for _, oMeta := range objMetaPaths {
objSlice := strings.SplitN(oMeta, cacheMetaJSONFile, 2)
object := strings.TrimPrefix(objSlice[0], path.Join(oldCacheBucketsPath, bucket))
object = strings.TrimSuffix(object, "/")
destdir := getCacheSHADir(c.dir, bucket, object)
if err := os.MkdirAll(destdir, 0o777); err != nil {
return err
}
prevCachedPath := path.Join(c.dir, bucket, object)
// get old cached metadata
oldMetaPath := pathJoin(oldCacheBucketsPath, bucket, object, cacheMetaJSONFile)
metaPath := pathJoin(destdir, cacheMetaJSONFile)
metaBytes, err := os.ReadFile(oldMetaPath)
if err != nil {
return err
}
// marshal cache metadata after adding version and stat info
meta := &cacheMeta{}
json := jsoniter.ConfigCompatibleWithStandardLibrary
if err = json.Unmarshal(metaBytes, &meta); err != nil {
return err
}
// move cached object to new cache directory path
// migrate cache data and add bit-rot protection hash sum
// at the start of each block
if err := migrateCacheData(ctx, c, bucket, object, prevCachedPath, destdir, meta.Meta); err != nil {
continue
}
stat, err := os.Stat(prevCachedPath)
if err != nil {
if err == errFileNotFound {
continue
}
logger.LogIf(ctx, err)
return err
}
// old cached file can now be removed
if err := os.Remove(prevCachedPath); err != nil {
return err
}
// move cached metadata after changing cache metadata version
meta.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize}
meta.Version = cacheMetaVersion
meta.Stat.Size = stat.Size()
meta.Stat.ModTime = stat.ModTime()
jsonData, err := json.Marshal(meta)
if err != nil {
return err
}
if err = os.WriteFile(metaPath, jsonData, 0o644); err != nil {
return err
}
}
// delete old bucket from cache, now that all contents are cleared
removeAll(path.Join(c.dir, bucket))
}
// remove .minio.sys sub directories
removeAll(path.Join(c.dir, minioMetaBucket, "multipart"))
removeAll(path.Join(c.dir, minioMetaBucket, "tmp"))
removeAll(path.Join(c.dir, minioMetaBucket, "trash"))
removeAll(path.Join(c.dir, minioMetaBucket, "buckets"))
return migrateCacheFormatJSON(cacheFormatPath)
}
func migrateCacheFormatJSON(cacheFormatPath string) error {
// now migrate format.json
f, err := os.OpenFile(cacheFormatPath, os.O_RDWR, 0o666)
if err != nil {
return err
}
defer f.Close()
formatV1 := formatCacheV1{}
if err := jsonLoad(f, &formatV1); err != nil {
return err
}
formatV2 := &formatCacheV2{}
formatV2.formatMetaV1 = formatV1.formatMetaV1
formatV2.Version = formatMetaVersion1
formatV2.Cache = formatV1.Cache
formatV2.Cache.Version = formatCacheVersionV2
return jsonSave(f, formatV2)
}

View file

@ -1,321 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"os"
"testing"
)
// TestDiskCacheFormat - tests initFormatCache, formatMetaGetFormatBackendCache, formatCacheGetVersion.
func TestDiskCacheFormat(t *testing.T) {
ctx := context.Background()
fsDirs, err := getRandomDisks(1)
if err != nil {
t.Fatal(err)
}
_, err = initFormatCache(ctx, fsDirs)
if err != nil {
t.Fatal(err)
}
// Do the basic sanity checks to check if initFormatCache() did its job.
cacheFormatPath := pathJoin(fsDirs[0], minioMetaBucket, formatConfigFile)
f, err := os.OpenFile(cacheFormatPath, os.O_RDWR|os.O_SYNC, 0)
if err != nil {
t.Fatal(err)
}
defer f.Close()
version, err := formatCacheGetVersion(f)
if err != nil {
t.Fatal(err)
}
if version != formatCacheVersionV2 {
t.Fatalf(`expected: %s, got: %s`, formatCacheVersionV2, version)
}
// Corrupt the format.json file and test the functions.
// formatMetaGetFormatBackendFS, formatFSGetVersion, initFormatFS should return errors.
if err = f.Truncate(0); err != nil {
t.Fatal(err)
}
if _, err = f.WriteString("b"); err != nil {
t.Fatal(err)
}
if _, _, err = loadAndValidateCacheFormat(context.Background(), fsDirs); err == nil {
t.Fatal("expected to fail")
}
// With unknown formatMetaV1.Version formatMetaGetFormatCache, initFormatCache should return error.
if err = f.Truncate(0); err != nil {
t.Fatal(err)
}
// Here we set formatMetaV1.Version to "2"
if _, err = f.WriteString(`{"version":"2","format":"cache","cache":{"version":"1"}}`); err != nil {
t.Fatal(err)
}
if _, _, err = loadAndValidateCacheFormat(context.Background(), fsDirs); err == nil {
t.Fatal("expected to fail")
}
}
// generates a valid format.json for Cache backend.
func genFormatCacheValid() []*formatCacheV2 {
disks := make([]string, 8)
formatConfigs := make([]*formatCacheV2, 8)
for index := range disks {
disks[index] = mustGetUUID()
}
for index := range disks {
format := &formatCacheV1{}
format.Version = formatMetaVersion1
format.Format = formatCache
format.Cache.Version = formatCacheVersionV2
format.Cache.This = disks[index]
format.Cache.Disks = disks
formatConfigs[index] = format
}
return formatConfigs
}
// generates a invalid format.json version for Cache backend.
func genFormatCacheInvalidVersion() []*formatCacheV2 {
disks := make([]string, 8)
formatConfigs := make([]*formatCacheV2, 8)
for index := range disks {
disks[index] = mustGetUUID()
}
for index := range disks {
format := &formatCacheV1{}
format.Version = formatMetaVersion1
format.Format = formatCache
format.Cache.Version = formatCacheVersionV1
format.Cache.This = disks[index]
format.Cache.Disks = disks
formatConfigs[index] = format
}
// Corrupt version numbers.
formatConfigs[0].Version = "2"
formatConfigs[3].Version = "-1"
return formatConfigs
}
// generates a invalid format.json version for Cache backend.
func genFormatCacheInvalidFormat() []*formatCacheV2 {
disks := make([]string, 8)
formatConfigs := make([]*formatCacheV2, 8)
for index := range disks {
disks[index] = mustGetUUID()
}
for index := range disks {
format := &formatCacheV2{}
format.Version = formatMetaVersion1
format.Format = formatCache
format.Cache.Version = formatCacheVersionV1
format.Cache.This = disks[index]
format.Cache.Disks = disks
formatConfigs[index] = format
}
// Corrupt format.
formatConfigs[0].Format = "cach"
formatConfigs[3].Format = "cach"
return formatConfigs
}
// generates a invalid format.json version for Cache backend.
func genFormatCacheInvalidCacheVersion() []*formatCacheV2 {
disks := make([]string, 8)
formatConfigs := make([]*formatCacheV2, 8)
for index := range disks {
disks[index] = mustGetUUID()
}
for index := range disks {
format := &formatCacheV2{}
format.Version = formatMetaVersion1
format.Format = formatCache
format.Cache.Version = formatCacheVersionV1
format.Cache.This = disks[index]
format.Cache.Disks = disks
formatConfigs[index] = format
}
// Corrupt version numbers.
formatConfigs[0].Cache.Version = "10"
formatConfigs[3].Cache.Version = "-1"
return formatConfigs
}
// generates a invalid format.json version for Cache backend.
func genFormatCacheInvalidDisksCount() []*formatCacheV2 {
disks := make([]string, 7)
formatConfigs := make([]*formatCacheV2, 8)
for index := range disks {
disks[index] = mustGetUUID()
}
for index := range disks {
format := &formatCacheV2{}
format.Version = formatMetaVersion1
format.Format = formatCache
format.Cache.Version = formatCacheVersionV2
format.Cache.This = disks[index]
format.Cache.Disks = disks
formatConfigs[index] = format
}
return formatConfigs
}
// generates a invalid format.json Disks for Cache backend.
func genFormatCacheInvalidDisks() []*formatCacheV2 {
disks := make([]string, 8)
formatConfigs := make([]*formatCacheV2, 8)
for index := range disks {
disks[index] = mustGetUUID()
}
for index := range disks {
format := &formatCacheV1{}
format.Version = formatMetaVersion1
format.Format = formatCache
format.Cache.Version = formatCacheVersionV2
format.Cache.This = disks[index]
format.Cache.Disks = disks
formatConfigs[index] = format
}
for index := range disks {
disks[index] = mustGetUUID()
}
// Corrupt Disks entries on disk 6 and disk 8.
formatConfigs[5].Cache.Disks = disks
formatConfigs[7].Cache.Disks = disks
return formatConfigs
}
// generates a invalid format.json This disk UUID for Cache backend.
func genFormatCacheInvalidThis() []*formatCacheV1 {
disks := make([]string, 8)
formatConfigs := make([]*formatCacheV1, 8)
for index := range disks {
disks[index] = mustGetUUID()
}
for index := range disks {
format := &formatCacheV1{}
format.Version = formatMetaVersion1
format.Format = formatCache
format.Cache.Version = formatCacheVersionV2
format.Cache.This = disks[index]
format.Cache.Disks = disks
formatConfigs[index] = format
}
// Make disk 5 and disk 8 have inconsistent disk uuid's.
formatConfigs[4].Cache.This = mustGetUUID()
formatConfigs[7].Cache.This = mustGetUUID()
return formatConfigs
}
// generates a invalid format.json Disk UUID in wrong order for Cache backend.
func genFormatCacheInvalidDisksOrder() []*formatCacheV2 {
disks := make([]string, 8)
formatConfigs := make([]*formatCacheV2, 8)
for index := range disks {
disks[index] = mustGetUUID()
}
for index := range disks {
format := &formatCacheV1{}
format.Version = formatMetaVersion1
format.Format = formatCache
format.Cache.Version = formatCacheVersionV2
format.Cache.This = disks[index]
format.Cache.Disks = disks
formatConfigs[index] = format
}
// Re order disks for failure case.
disks1 := make([]string, 8)
copy(disks1, disks)
disks1[1], disks1[2] = disks[2], disks[1]
formatConfigs[2].Cache.Disks = disks1
return formatConfigs
}
// Wrapper for calling FormatCache tests - validates
// - valid format
// - unrecognized version number
// - unrecognized format tag
// - unrecognized cache version
// - wrong number of Disks entries
// - invalid This uuid
// - invalid Disks order
func TestFormatCache(t *testing.T) {
formatInputCases := [][]*formatCacheV1{
genFormatCacheValid(),
genFormatCacheInvalidVersion(),
genFormatCacheInvalidFormat(),
genFormatCacheInvalidCacheVersion(),
genFormatCacheInvalidDisksCount(),
genFormatCacheInvalidDisks(),
genFormatCacheInvalidThis(),
genFormatCacheInvalidDisksOrder(),
}
testCases := []struct {
formatConfigs []*formatCacheV1
shouldPass bool
}{
{
formatConfigs: formatInputCases[0],
shouldPass: true,
},
{
formatConfigs: formatInputCases[1],
shouldPass: false,
},
{
formatConfigs: formatInputCases[2],
shouldPass: false,
},
{
formatConfigs: formatInputCases[3],
shouldPass: false,
},
{
formatConfigs: formatInputCases[4],
shouldPass: false,
},
{
formatConfigs: formatInputCases[5],
shouldPass: false,
},
{
formatConfigs: formatInputCases[6],
shouldPass: false,
},
{
formatConfigs: formatInputCases[7],
shouldPass: false,
},
}
for i, testCase := range testCases {
err := validateCacheFormats(context.Background(), false, testCase.formatConfigs)
if err != nil && testCase.shouldPass {
t.Errorf("Test %d: Expected to pass but failed with %s", i+1, err)
}
if err == nil && !testCase.shouldPass {
t.Errorf("Test %d: Expected to fail but passed instead", i+1)
}
}
}

View file

@ -38,7 +38,6 @@ import (
"github.com/dustin/go-humanize"
"github.com/minio/minio/internal/auth"
"github.com/minio/minio/internal/config/cache"
"github.com/minio/minio/internal/config/callhome"
"github.com/minio/minio/internal/config/compress"
"github.com/minio/minio/internal/config/dns"
@ -273,12 +272,6 @@ var (
globalBucketQuotaSys *BucketQuotaSys
globalBucketVersioningSys *BucketVersioningSys
// Disk cache drives
globalCacheConfig cache.Config
// Initialized KMS configuration for disk cache
globalCacheKMS kms.KMS
// Allocated etcd endpoint for config and bucket DNS.
globalEtcdClient *etcd.Client

View file

@ -59,7 +59,6 @@ func init() {
}
peerMetricsGroups = []*MetricsGroup{
getCacheMetrics(),
getGoMetrics(),
getHTTPMetrics(false),
getNotificationMetrics(),
@ -85,7 +84,6 @@ func init() {
nodeGroups := []*MetricsGroup{
getNodeHealthMetrics(),
getCacheMetrics(),
getHTTPMetrics(false),
getNetworkMetrics(),
getMinioVersionMetrics(),
@ -238,8 +236,6 @@ const (
latencyMicroSec MetricName = "latency_us"
latencyNanoSec MetricName = "latency_ns"
usagePercent MetricName = "update_percent"
commitInfo MetricName = "commit_info"
usageInfo MetricName = "usage_info"
versionInfo MetricName = "version_info"
@ -1230,76 +1226,6 @@ func getS3RejectedInvalidRequestsTotalMD() MetricDescription {
}
}
func getCacheHitsTotalMD() MetricDescription {
return MetricDescription{
Namespace: minioNamespace,
Subsystem: cacheSubsystem,
Name: hitsTotal,
Help: "Total number of drive cache hits",
Type: counterMetric,
}
}
func getCacheHitsMissedTotalMD() MetricDescription {
return MetricDescription{
Namespace: minioNamespace,
Subsystem: cacheSubsystem,
Name: missedTotal,
Help: "Total number of drive cache misses",
Type: counterMetric,
}
}
func getCacheUsagePercentMD() MetricDescription {
return MetricDescription{
Namespace: minioNamespace,
Subsystem: minioNamespace,
Name: usagePercent,
Help: "Total percentage cache usage",
Type: gaugeMetric,
}
}
func getCacheUsageInfoMD() MetricDescription {
return MetricDescription{
Namespace: minioNamespace,
Subsystem: cacheSubsystem,
Name: usageInfo,
Help: "Total percentage cache usage, value of 1 indicates high and 0 low, label level is set as well",
Type: gaugeMetric,
}
}
func getCacheUsedBytesMD() MetricDescription {
return MetricDescription{
Namespace: minioNamespace,
Subsystem: cacheSubsystem,
Name: usedBytes,
Help: "Current cache usage in bytes",
Type: gaugeMetric,
}
}
func getCacheTotalBytesMD() MetricDescription {
return MetricDescription{
Namespace: minioNamespace,
Subsystem: cacheSubsystem,
Name: totalBytes,
Help: "Total size of cache drive in bytes",
Type: gaugeMetric,
}
}
func getCacheSentBytesMD() MetricDescription {
return MetricDescription{
Namespace: minioNamespace,
Subsystem: cacheSubsystem,
Name: sentBytes,
Help: "Total number of bytes served from cache",
Type: counterMetric,
}
}
func getHealObjectsTotalMD() MetricDescription {
return MetricDescription{
Namespace: healMetricNamespace,
@ -2454,56 +2380,6 @@ func getObjectsScanned(seq *healSequence) (m []Metric) {
return
}
func getCacheMetrics() *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 10 * time.Second,
}
mg.RegisterRead(func(ctx context.Context) (metrics []Metric) {
cacheObjLayer := newCachedObjectLayerFn()
// Service not initialized yet
if cacheObjLayer == nil {
return
}
metrics = make([]Metric, 0, 20)
metrics = append(metrics, Metric{
Description: getCacheHitsTotalMD(),
Value: float64(cacheObjLayer.CacheStats().getHits()),
})
metrics = append(metrics, Metric{
Description: getCacheHitsMissedTotalMD(),
Value: float64(cacheObjLayer.CacheStats().getMisses()),
})
metrics = append(metrics, Metric{
Description: getCacheSentBytesMD(),
Value: float64(cacheObjLayer.CacheStats().getBytesServed()),
})
for _, cdStats := range cacheObjLayer.CacheStats().GetDiskStats() {
metrics = append(metrics, Metric{
Description: getCacheUsagePercentMD(),
Value: float64(cdStats.UsagePercent),
VariableLabels: map[string]string{"drive": cdStats.Dir},
})
metrics = append(metrics, Metric{
Description: getCacheUsageInfoMD(),
Value: float64(cdStats.UsageState),
VariableLabels: map[string]string{"drive": cdStats.Dir, "level": cdStats.GetUsageLevelString()},
})
metrics = append(metrics, Metric{
Description: getCacheUsedBytesMD(),
Value: float64(cdStats.UsageSize),
VariableLabels: map[string]string{"drive": cdStats.Dir},
})
metrics = append(metrics, Metric{
Description: getCacheTotalBytesMD(),
Value: float64(cdStats.TotalCapacity),
VariableLabels: map[string]string{"drive": cdStats.Dir},
})
}
return
})
return mg
}
func getDistLockMetrics() *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 1 * time.Second,

View file

@ -108,7 +108,6 @@ func (c *minioCollector) Collect(ch chan<- prometheus.Metric) {
bucketUsageMetricsPrometheus(ch)
networkMetricsPrometheus(ch)
httpMetricsPrometheus(ch)
cacheMetricsPrometheus(ch)
healingMetricsPrometheus(ch)
}
@ -188,82 +187,6 @@ func healingMetricsPrometheus(ch chan<- prometheus.Metric) {
}
}
// collects cache metrics for MinIO server in Prometheus specific format
// and sends to given channel
func cacheMetricsPrometheus(ch chan<- prometheus.Metric) {
cacheObjLayer := newCachedObjectLayerFn()
// Service not initialized yet
if cacheObjLayer == nil {
return
}
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(cacheNamespace, "hits", "total"),
"Total number of drive cache hits in current MinIO instance",
nil, nil),
prometheus.CounterValue,
float64(cacheObjLayer.CacheStats().getHits()),
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(cacheNamespace, "misses", "total"),
"Total number of drive cache misses in current MinIO instance",
nil, nil),
prometheus.CounterValue,
float64(cacheObjLayer.CacheStats().getMisses()),
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(cacheNamespace, "data", "served"),
"Total number of bytes served from cache of current MinIO instance",
nil, nil),
prometheus.CounterValue,
float64(cacheObjLayer.CacheStats().getBytesServed()),
)
for _, cdStats := range cacheObjLayer.CacheStats().GetDiskStats() {
// Cache disk usage percentage
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(cacheNamespace, "usage", "percent"),
"Total percentage cache usage",
[]string{"disk"}, nil),
prometheus.GaugeValue,
float64(cdStats.UsagePercent),
cdStats.Dir,
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(cacheNamespace, "usage", "high"),
"Indicates cache usage is high or low, relative to current cache 'quota' settings",
[]string{"disk"}, nil),
prometheus.GaugeValue,
float64(cdStats.UsageState),
cdStats.Dir,
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("cache", "usage", "size"),
"Indicates current cache usage in bytes",
[]string{"disk"}, nil),
prometheus.GaugeValue,
float64(cdStats.UsageSize),
cdStats.Dir,
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("cache", "total", "size"),
"Indicates total size of cache drive",
[]string{"disk"}, nil),
prometheus.GaugeValue,
float64(cdStats.TotalCapacity),
cdStats.Dir,
)
}
}
// collects http metrics for MinIO server in Prometheus specific format
// and sends to given channel
func httpMetricsPrometheus(ch chan<- prometheus.Metric) {

View file

@ -52,9 +52,6 @@ var globalObjLayerMutex sync.RWMutex
// Global object layer, only accessed by globalObjectAPI.
var globalObjectAPI ObjectLayer
// Global cacheObjects, only accessed by newCacheObjectsFn().
var globalCacheObjectAPI CacheObjectLayer
type storageOpts struct {
cleanUp bool
healthCheck bool

View file

@ -149,11 +149,6 @@ type ObjectInfo struct {
// Date and time at which the object is no longer able to be cached
Expires time.Time
// CacheStatus sets status of whether this is a cache hit/miss
CacheStatus CacheStatusType
// CacheLookupStatus sets whether a cacheable response is present in the cache
CacheLookupStatus CacheStatusType
// Specify object storage class
StorageClass string
@ -245,8 +240,6 @@ func (o *ObjectInfo) Clone() (cinfo ObjectInfo) {
ContentType: o.ContentType,
ContentEncoding: o.ContentEncoding,
Expires: o.Expires,
CacheStatus: o.CacheStatus,
CacheLookupStatus: o.CacheLookupStatus,
StorageClass: o.StorageClass,
ReplicationStatus: o.ReplicationStatus,
UserTags: o.UserTags,

View file

@ -131,9 +131,6 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
// Check for auth type to return S3 compatible error.
// type to return the correct error (NoSuchKey vs AccessDenied)
@ -192,9 +189,6 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
defer lock.RUnlock(lkctx)
getObjectNInfo := objectAPI.GetObjectNInfo
if api.CacheAPI() != nil {
getObjectNInfo = api.CacheAPI().GetObjectNInfo
}
gopts := opts
gopts.NoLock = true // We already have a lock, we can live with it.
@ -349,9 +343,6 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
IsOwner: false,
}) {
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
_, err = getObjectInfo(ctx, bucket, object, opts)
if toAPIError(ctx, err).Code == "NoSuchKey" {
@ -364,9 +355,6 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
}
getObjectNInfo := objectAPI.GetObjectNInfo
if api.CacheAPI() != nil {
getObjectNInfo = api.CacheAPI().GetObjectNInfo
}
// Get request range.
var rs *HTTPRangeSpec
@ -609,9 +597,6 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
opts, err := getOpts(ctx, r, bucket, object)
if err != nil {
@ -643,9 +628,6 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
IsOwner: false,
}) {
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
_, err = getObjectInfo(ctx, bucket, object, opts)
if toAPIError(ctx, err).Code == "NoSuchKey" {
@ -1082,9 +1064,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
getObjectNInfo := objectAPI.GetObjectNInfo
if api.CacheAPI() != nil {
getObjectNInfo = api.CacheAPI().GetObjectNInfo
}
checkCopyPrecondFn := func(o ObjectInfo) bool {
if _, err := DecryptObjectInfo(&o, r); err != nil {
@ -1367,9 +1346,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
retPerms := isPutActionAllowed(ctx, getRequestAuthType(r), dstBucket, dstObject, r, policy.PutObjectRetentionAction)
holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), dstBucket, dstObject, r, policy.PutObjectLegalHoldAction)
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
// apply default bucket configuration/governance headers for dest side.
retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, dstBucket, dstObject, getObjectInfo, retPerms, holdPerms)
@ -1512,9 +1488,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
}
copyObjectFn := objectAPI.CopyObject
if api.CacheAPI() != nil {
copyObjectFn = api.CacheAPI().CopyObject
}
// Copy source object to destination, if source and destination
// object is same then only metadata is updated.
@ -1800,17 +1773,10 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
}
}
if api.CacheAPI() != nil {
putObject = api.CacheAPI().PutObject
}
retPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, policy.PutObjectRetentionAction)
holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, policy.PutObjectLegalHoldAction)
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms)
if s3Err == ErrNone && retentionMode.Valid() {
@ -2104,14 +2070,7 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
retPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, policy.PutObjectRetentionAction)
holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, policy.PutObjectLegalHoldAction)
if api.CacheAPI() != nil {
putObject = api.CacheAPI().PutObject
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
// These are static for all objects extracted.
reqParams := extractReqParams(r)
@ -2401,9 +2360,6 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
})
deleteObject := objectAPI.DeleteObject
if api.CacheAPI() != nil {
deleteObject = api.CacheAPI().DeleteObject
}
// http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETE.html
objInfo, err := deleteObject(ctx, bucket, object, opts)
@ -2594,9 +2550,6 @@ func (api objectAPIHandlers) GetObjectLegalHoldHandler(w http.ResponseWriter, r
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
if rcfg, _ := globalBucketObjectLockSys.Get(bucket); !rcfg.LockEnabled {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidBucketObjectLockConfiguration), r.URL)
@ -2762,9 +2715,6 @@ func (api objectAPIHandlers) GetObjectRetentionHandler(w http.ResponseWriter, r
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
if rcfg, _ := globalBucketObjectLockSys.Get(bucket); !rcfg.LockEnabled {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidBucketObjectLockConfiguration), r.URL)
@ -3062,9 +3012,6 @@ func (api objectAPIHandlers) PostRestoreObjectHandler(w http.ResponseWriter, r *
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
// Check for auth type to return S3 compatible error.
if s3Error := checkRequestAuthType(ctx, r, policy.RestoreObjectAction, bucket, object); s3Error != ErrNone {

View file

@ -148,9 +148,6 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, policy.PutObjectLegalHoldAction)
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms)
if s3Err == ErrNone && retentionMode.Valid() {
@ -210,9 +207,6 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
}
newMultipartUpload := objectAPI.NewMultipartUpload
if api.CacheAPI() != nil {
newMultipartUpload = api.CacheAPI().NewMultipartUpload
}
res, err := newMultipartUpload(ctx, bucket, object, opts)
if err != nil {
@ -329,9 +323,6 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
}
getObjectNInfo := objectAPI.GetObjectNInfo
if api.CacheAPI() != nil {
getObjectNInfo = api.CacheAPI().GetObjectNInfo
}
// Get request range.
var rs *HTTPRangeSpec
@ -542,9 +533,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
srcInfo.PutObjReader = pReader
copyObjectPart := objectAPI.CopyObjectPart
if api.CacheAPI() != nil {
copyObjectPart = api.CacheAPI().CopyObjectPart
}
// Copy source object to destination, if source and destination
// object is same then only metadata is updated.
partInfo, err := copyObjectPart(ctx, srcBucket, srcObject, dstBucket, dstObject, uploadID, partID,
@ -821,9 +810,6 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
opts.IndexCB = idxCb
putObjectPart := objectAPI.PutObjectPart
if api.CacheAPI() != nil {
putObjectPart = api.CacheAPI().PutObjectPart
}
partInfo, err := putObjectPart(ctx, bucket, object, uploadID, partID, pReader, opts)
if err != nil {
@ -934,9 +920,6 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
}
completeMultiPartUpload := objectAPI.CompleteMultipartUpload
if api.CacheAPI() != nil {
completeMultiPartUpload = api.CacheAPI().CompleteMultipartUpload
}
versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object)
suspended := globalBucketVersioningSys.PrefixSuspended(bucket, object)
@ -1058,9 +1041,6 @@ func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter,
return
}
abortMultipartUpload := objectAPI.AbortMultipartUpload
if api.CacheAPI() != nil {
abortMultipartUpload = api.CacheAPI().AbortMultipartUpload
}
if s3Error := checkRequestAuthType(ctx, r, policy.AbortMultipartUploadAction, bucket, object); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)

View file

@ -79,9 +79,6 @@ func (api objectAPIHandlers) getObjectInArchiveFileHandler(ctx context.Context,
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
// Check for auth type to return S3 compatible error.
// type to return the correct error (NoSuchKey vs AccessDenied)
@ -375,9 +372,6 @@ func (api objectAPIHandlers) headObjectInArchiveFileHandler(ctx context.Context,
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
opts, err := getOpts(ctx, r, bucket, zipPath)
if err != nil {

View file

@ -853,16 +853,6 @@ func serverMain(ctx *cli.Context) {
})
}()
// initialize the new disk cache objects.
if globalCacheConfig.Enabled {
logger.Info(color.Yellow("WARNING: Drive caching is deprecated for single/multi drive MinIO setups."))
var cacheAPI CacheObjectLayer
cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
logger.FatalIf(err, "Unable to initialize drive caching")
setCacheObjectLayer(cacheAPI)
}
// Initialize bucket notification system.
bootstrapTrace("initBucketTargets", func() {
logger.LogIf(GlobalContext, globalEventNotifier.InitBucketTargets(GlobalContext, newObject))

View file

@ -23,7 +23,6 @@ import (
"net/url"
"strings"
"github.com/dustin/go-humanize"
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/logger"
@ -52,11 +51,6 @@ func printStartupMessage(apiEndpoints []string, err error) {
}
strippedAPIEndpoints := stripStandardPorts(apiEndpoints, globalMinioHost)
// If cache layer is enabled, print cache capacity.
cachedObjAPI := newCachedObjectLayerFn()
if cachedObjAPI != nil {
printCacheStorageInfo(cachedObjAPI.StorageInfo(GlobalContext))
}
// Object layer is initialized then print StorageInfo.
objAPI := newObjectLayerFn()
@ -226,10 +220,3 @@ func printStorageInfo(storageInfo StorageInfo) {
logger.Info(msg)
}
}
func printCacheStorageInfo(storageInfo CacheStorageInfo) {
msg := fmt.Sprintf("%s %s Free, %s Total", color.Blue("Cache Capacity:"),
humanize.IBytes(storageInfo.Free),
humanize.IBytes(storageInfo.Total))
logger.Info(msg)
}

View file

@ -2049,9 +2049,6 @@ func registerAPIFunctions(muxRouter *mux.Router, objLayer ObjectLayer, apiFuncti
ObjectAPI: func() ObjectLayer {
return globalObjectAPI
},
CacheAPI: func() CacheObjectLayer {
return globalCacheObjectAPI
},
}
// Register ListBuckets handler.

View file

@ -702,37 +702,6 @@ func NewRemoteTargetHTTPTransport(insecure bool) func() *http.Transport {
}.NewRemoteTargetHTTPTransport(insecure)
}
// Load the json (typically from disk file).
func jsonLoad(r io.ReadSeeker, data interface{}) error {
if _, err := r.Seek(0, io.SeekStart); err != nil {
return err
}
return json.NewDecoder(r).Decode(data)
}
// Save to disk file in json format.
func jsonSave(f interface {
io.WriteSeeker
Truncate(int64) error
}, data interface{},
) error {
b, err := json.Marshal(data)
if err != nil {
return err
}
if err = f.Truncate(0); err != nil {
return err
}
if _, err = f.Seek(0, io.SeekStart); err != nil {
return err
}
_, err = f.Write(b)
if err != nil {
return err
}
return nil
}
// ceilFrac takes a numerator and denominator representing a fraction
// and returns its ceiling. If denominator is 0, it returns 0 instead
// of crashing.

View file

@ -1,171 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cache
import (
"encoding/json"
"errors"
"path/filepath"
"strings"
"github.com/minio/minio/internal/config"
"github.com/minio/pkg/v2/ellipses"
)
const (
// WriteBack allows staging and write back of cached content for single object uploads
WriteBack = "writeback"
// WriteThrough allows caching multipart uploads to disk synchronously
WriteThrough = "writethrough"
)
// Config represents cache config settings
type Config struct {
Enabled bool `json:"-"`
Drives []string `json:"drives"`
Expiry int `json:"expiry"`
MaxUse int `json:"maxuse"`
Quota int `json:"quota"`
Exclude []string `json:"exclude"`
After int `json:"after"`
WatermarkLow int `json:"watermark_low"`
WatermarkHigh int `json:"watermark_high"`
Range bool `json:"range"`
CacheCommitMode string `json:"commit"`
}
// UnmarshalJSON - implements JSON unmarshal interface for unmarshalling
// json entries for CacheConfig.
func (cfg *Config) UnmarshalJSON(data []byte) (err error) {
type Alias Config
_cfg := &struct {
*Alias
}{
Alias: (*Alias)(cfg),
}
if err = json.Unmarshal(data, _cfg); err != nil {
return err
}
if _cfg.Expiry < 0 {
return errors.New("config expiry value should not be negative")
}
if _cfg.MaxUse < 0 {
return errors.New("config max use value should not be null or negative")
}
if _cfg.Quota < 0 {
return errors.New("config quota value should not be null or negative")
}
if _cfg.After < 0 {
return errors.New("cache after value should not be less than 0")
}
if _cfg.WatermarkLow < 0 || _cfg.WatermarkLow > 100 {
return errors.New("config low watermark value should be between 0 and 100")
}
if _cfg.WatermarkHigh < 0 || _cfg.WatermarkHigh > 100 {
return errors.New("config high watermark value should be between 0 and 100")
}
if _cfg.WatermarkLow > 0 && (_cfg.WatermarkLow >= _cfg.WatermarkHigh) {
return errors.New("config low watermark value should be less than high watermark")
}
return nil
}
// Parses given cacheDrivesEnv and returns a list of cache drives.
func parseCacheDrives(drives string) ([]string, error) {
var drivesSlice []string
if len(drives) == 0 {
return drivesSlice, nil
}
drivesSlice = strings.Split(drives, cacheDelimiterLegacy)
if len(drivesSlice) == 1 && drivesSlice[0] == drives {
drivesSlice = strings.Split(drives, cacheDelimiter)
}
var endpoints []string
for _, d := range drivesSlice {
if len(d) == 0 {
return nil, config.ErrInvalidCacheDrivesValue(nil).Msg("cache dir cannot be an empty path")
}
if ellipses.HasEllipses(d) {
s, err := parseCacheDrivePaths(d)
if err != nil {
return nil, err
}
endpoints = append(endpoints, s...)
} else {
endpoints = append(endpoints, d)
}
}
for _, d := range endpoints {
if !filepath.IsAbs(d) {
return nil, config.ErrInvalidCacheDrivesValue(nil).Msg("cache dir should be absolute path: %s", d)
}
}
return endpoints, nil
}
// Parses all arguments and returns a slice of drive paths following the ellipses pattern.
func parseCacheDrivePaths(arg string) (ep []string, err error) {
patterns, perr := ellipses.FindEllipsesPatterns(arg)
if perr != nil {
return []string{}, config.ErrInvalidCacheDrivesValue(nil).Msg(perr.Error())
}
for _, lbls := range patterns.Expand() {
ep = append(ep, strings.Join(lbls, ""))
}
return ep, nil
}
// Parses given cacheExcludesEnv and returns a list of cache exclude patterns.
func parseCacheExcludes(excludes string) ([]string, error) {
var excludesSlice []string
if len(excludes) == 0 {
return excludesSlice, nil
}
excludesSlice = strings.Split(excludes, cacheDelimiterLegacy)
if len(excludesSlice) == 1 && excludesSlice[0] == excludes {
excludesSlice = strings.Split(excludes, cacheDelimiter)
}
for _, e := range excludesSlice {
if len(e) == 0 {
return nil, config.ErrInvalidCacheExcludesValue(nil).Msg("cache exclude path (%s) cannot be empty", e)
}
if strings.HasPrefix(e, "/") {
return nil, config.ErrInvalidCacheExcludesValue(nil).Msg("cache exclude pattern (%s) cannot start with / as prefix", e)
}
}
return excludesSlice, nil
}
func parseCacheCommitMode(commitStr string) (string, error) {
switch strings.ToLower(commitStr) {
case WriteBack, WriteThrough:
return strings.ToLower(commitStr), nil
default:
return "", config.ErrInvalidCacheCommitValue(nil).Msg("cache commit value must be `writeback` or `writethrough`")
}
}

View file

@ -1,128 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cache
import (
"reflect"
"runtime"
"testing"
)
// Tests cache drive parsing.
func TestParseCacheDrives(t *testing.T) {
testCases := []struct {
driveStr string
expectedPatterns []string
success bool
}{
// Invalid input
{"bucket1/*;*.png;images/trip/barcelona/*", []string{}, false},
{"bucket1", []string{}, false},
{";;;", []string{}, false},
{",;,;,;", []string{}, false},
}
// Valid inputs
if runtime.GOOS == "windows" {
testCases = append(testCases, struct {
driveStr string
expectedPatterns []string
success bool
}{"C:/home/drive1;C:/home/drive2;C:/home/drive3", []string{"C:/home/drive1", "C:/home/drive2", "C:/home/drive3"}, true})
testCases = append(testCases, struct {
driveStr string
expectedPatterns []string
success bool
}{"C:/home/drive{1...3}", []string{"C:/home/drive1", "C:/home/drive2", "C:/home/drive3"}, true})
testCases = append(testCases, struct {
driveStr string
expectedPatterns []string
success bool
}{"C:/home/drive{1..3}", []string{}, false})
} else {
testCases = append(testCases, struct {
driveStr string
expectedPatterns []string
success bool
}{"/home/drive1;/home/drive2;/home/drive3", []string{"/home/drive1", "/home/drive2", "/home/drive3"}, true})
testCases = append(testCases, struct {
driveStr string
expectedPatterns []string
success bool
}{"/home/drive1,/home/drive2,/home/drive3", []string{"/home/drive1", "/home/drive2", "/home/drive3"}, true})
testCases = append(testCases, struct {
driveStr string
expectedPatterns []string
success bool
}{"/home/drive{1...3}", []string{"/home/drive1", "/home/drive2", "/home/drive3"}, true})
testCases = append(testCases, struct {
driveStr string
expectedPatterns []string
success bool
}{"/home/drive{1..3}", []string{}, false})
}
for i, testCase := range testCases {
drives, err := parseCacheDrives(testCase.driveStr)
if err != nil && testCase.success {
t.Errorf("Test %d: Expected success but failed instead %s", i+1, err)
}
if err == nil && !testCase.success {
t.Errorf("Test %d: Expected failure but passed instead", i+1)
}
if err == nil {
if !reflect.DeepEqual(drives, testCase.expectedPatterns) {
t.Errorf("Test %d: Expected %v, got %v", i+1, testCase.expectedPatterns, drives)
}
}
}
}
// Tests cache exclude parsing.
func TestParseCacheExclude(t *testing.T) {
testCases := []struct {
excludeStr string
expectedPatterns []string
success bool
}{
// Invalid input
{"/home/drive1;/home/drive2;/home/drive3", []string{}, false},
{"/", []string{}, false},
{";;;", []string{}, false},
// valid input
{"bucket1/*;*.png;images/trip/barcelona/*", []string{"bucket1/*", "*.png", "images/trip/barcelona/*"}, true},
{"bucket1/*,*.png,images/trip/barcelona/*", []string{"bucket1/*", "*.png", "images/trip/barcelona/*"}, true},
{"bucket1", []string{"bucket1"}, true},
}
for i, testCase := range testCases {
excludes, err := parseCacheExcludes(testCase.excludeStr)
if err != nil && testCase.success {
t.Errorf("Test %d: Expected success but failed instead %s", i+1, err)
}
if err == nil && !testCase.success {
t.Errorf("Test %d: Expected failure but passed instead", i+1)
}
if err == nil {
if !reflect.DeepEqual(excludes, testCase.expectedPatterns) {
t.Errorf("Test %d: Expected %v, got %v", i+1, testCase.expectedPatterns, excludes)
}
}
}
}

View file

@ -1,89 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cache
import "github.com/minio/minio/internal/config"
// Help template for caching feature.
var (
defaultHelpPostfix = func(key string) string {
return config.DefaultHelpPostfix(DefaultKVS, key)
}
Help = config.HelpKVS{
config.HelpKV{
Key: Drives,
Description: `comma separated mountpoints e.g. "/optane1,/optane2"` + defaultHelpPostfix(Drives),
Type: "csv",
},
config.HelpKV{
Key: Expiry,
Description: `cache expiry duration in days` + defaultHelpPostfix(Expiry),
Optional: true,
Type: "number",
},
config.HelpKV{
Key: Quota,
Description: `limit cache drive usage in percentage` + defaultHelpPostfix(Quota),
Optional: true,
Type: "number",
},
config.HelpKV{
Key: Exclude,
Description: `exclude cache for following patterns e.g. "bucket/*.tmp,*.exe"` + defaultHelpPostfix(Exclude),
Optional: true,
Type: "csv",
},
config.HelpKV{
Key: After,
Description: `minimum number of access before caching an object` + defaultHelpPostfix(After),
Optional: true,
Type: "number",
},
config.HelpKV{
Key: WatermarkLow,
Description: `% of cache use at which to stop cache eviction` + defaultHelpPostfix(WatermarkLow),
Optional: true,
Type: "number",
},
config.HelpKV{
Key: WatermarkHigh,
Description: `% of cache use at which to start cache eviction` + defaultHelpPostfix(WatermarkHigh),
Optional: true,
Type: "number",
},
config.HelpKV{
Key: Range,
Description: `set to "on" or "off" caching of independent range requests per object` + defaultHelpPostfix(Range),
Optional: true,
Type: "string",
},
config.HelpKV{
Key: Commit,
Description: `set to control cache commit behavior` + defaultHelpPostfix(Commit),
Optional: true,
Type: "string",
},
config.HelpKV{
Key: config.Comment,
Description: config.DefaultComment,
Optional: true,
Type: "sentence",
},
}
)

View file

@ -1,55 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cache
import (
"fmt"
"strings"
"github.com/minio/minio/internal/config"
)
const (
cacheDelimiterLegacy = ";"
)
// SetCacheConfig - One time migration code needed, for migrating from older config to new for Cache.
func SetCacheConfig(s config.Config, cfg Config) {
if len(cfg.Drives) == 0 {
// Do not save cache if no settings available.
return
}
s[config.CacheSubSys][config.Default] = config.KVS{
config.KV{
Key: Drives,
Value: strings.Join(cfg.Drives, cacheDelimiter),
},
config.KV{
Key: Exclude,
Value: strings.Join(cfg.Exclude, cacheDelimiter),
},
config.KV{
Key: Expiry,
Value: fmt.Sprintf("%d", cfg.Expiry),
},
config.KV{
Key: Quota,
Value: fmt.Sprintf("%d", cfg.MaxUse),
},
}
}

View file

@ -1,232 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cache
import (
"errors"
"strconv"
"github.com/minio/minio/internal/config"
"github.com/minio/pkg/v2/env"
)
// Cache ENVs
const (
Drives = "drives"
Exclude = "exclude"
Expiry = "expiry"
MaxUse = "maxuse"
Quota = "quota"
After = "after"
WatermarkLow = "watermark_low"
WatermarkHigh = "watermark_high"
Range = "range"
Commit = "commit"
EnvCacheDrives = "MINIO_CACHE_DRIVES"
EnvCacheExclude = "MINIO_CACHE_EXCLUDE"
EnvCacheExpiry = "MINIO_CACHE_EXPIRY"
EnvCacheMaxUse = "MINIO_CACHE_MAXUSE"
EnvCacheQuota = "MINIO_CACHE_QUOTA"
EnvCacheAfter = "MINIO_CACHE_AFTER"
EnvCacheWatermarkLow = "MINIO_CACHE_WATERMARK_LOW"
EnvCacheWatermarkHigh = "MINIO_CACHE_WATERMARK_HIGH"
EnvCacheRange = "MINIO_CACHE_RANGE"
EnvCacheCommit = "MINIO_CACHE_COMMIT"
EnvCacheEncryptionKey = "MINIO_CACHE_ENCRYPTION_SECRET_KEY"
DefaultExpiry = "90"
DefaultQuota = "80"
DefaultAfter = "0"
DefaultWaterMarkLow = "70"
DefaultWaterMarkHigh = "80"
)
// DefaultKVS - default KV settings for caching.
var (
DefaultKVS = config.KVS{
config.KV{
Key: Drives,
Value: "",
},
config.KV{
Key: Exclude,
Value: "",
},
config.KV{
Key: Expiry,
Value: DefaultExpiry,
},
config.KV{
Key: Quota,
Value: DefaultQuota,
},
config.KV{
Key: After,
Value: DefaultAfter,
},
config.KV{
Key: WatermarkLow,
Value: DefaultWaterMarkLow,
},
config.KV{
Key: WatermarkHigh,
Value: DefaultWaterMarkHigh,
},
config.KV{
Key: Range,
Value: config.EnableOn,
},
config.KV{
Key: Commit,
Value: "",
},
}
)
const (
cacheDelimiter = ","
)
// Enabled returns if cache is enabled.
func Enabled(kvs config.KVS) bool {
drives := kvs.Get(Drives)
return drives != ""
}
// LookupConfig - extracts cache configuration provided by environment
// variables and merge them with provided CacheConfiguration.
func LookupConfig(kvs config.KVS) (Config, error) {
cfg := Config{}
if err := config.CheckValidKeys(config.CacheSubSys, kvs, DefaultKVS); err != nil {
return cfg, err
}
drives := env.Get(EnvCacheDrives, kvs.Get(Drives))
if len(drives) == 0 {
return cfg, nil
}
var err error
cfg.Drives, err = parseCacheDrives(drives)
if err != nil {
return cfg, err
}
cfg.Enabled = true
if excludes := env.Get(EnvCacheExclude, kvs.Get(Exclude)); excludes != "" {
cfg.Exclude, err = parseCacheExcludes(excludes)
if err != nil {
return cfg, err
}
}
if expiryStr := env.Get(EnvCacheExpiry, kvs.Get(Expiry)); expiryStr != "" {
cfg.Expiry, err = strconv.Atoi(expiryStr)
if err != nil {
return cfg, config.ErrInvalidCacheExpiryValue(err)
}
}
if maxUseStr := env.Get(EnvCacheMaxUse, kvs.Get(MaxUse)); maxUseStr != "" {
cfg.MaxUse, err = strconv.Atoi(maxUseStr)
if err != nil {
return cfg, config.ErrInvalidCacheQuota(err)
}
// maxUse should be a valid percentage.
if cfg.MaxUse < 0 || cfg.MaxUse > 100 {
err := errors.New("config max use value should not be null or negative")
return cfg, config.ErrInvalidCacheQuota(err)
}
cfg.Quota = cfg.MaxUse
} else if quotaStr := env.Get(EnvCacheQuota, kvs.Get(Quota)); quotaStr != "" {
cfg.Quota, err = strconv.Atoi(quotaStr)
if err != nil {
return cfg, config.ErrInvalidCacheQuota(err)
}
// quota should be a valid percentage.
if cfg.Quota < 0 || cfg.Quota > 100 {
err := errors.New("config quota value should not be null or negative")
return cfg, config.ErrInvalidCacheQuota(err)
}
cfg.MaxUse = cfg.Quota
}
if afterStr := env.Get(EnvCacheAfter, kvs.Get(After)); afterStr != "" {
cfg.After, err = strconv.Atoi(afterStr)
if err != nil {
return cfg, config.ErrInvalidCacheAfter(err)
}
// after should be a valid value >= 0.
if cfg.After < 0 {
err := errors.New("cache after value cannot be less than 0")
return cfg, config.ErrInvalidCacheAfter(err)
}
}
if lowWMStr := env.Get(EnvCacheWatermarkLow, kvs.Get(WatermarkLow)); lowWMStr != "" {
cfg.WatermarkLow, err = strconv.Atoi(lowWMStr)
if err != nil {
return cfg, config.ErrInvalidCacheWatermarkLow(err)
}
// WatermarkLow should be a valid percentage.
if cfg.WatermarkLow < 0 || cfg.WatermarkLow > 100 {
err := errors.New("config min watermark value should be between 0 and 100")
return cfg, config.ErrInvalidCacheWatermarkLow(err)
}
}
if highWMStr := env.Get(EnvCacheWatermarkHigh, kvs.Get(WatermarkHigh)); highWMStr != "" {
cfg.WatermarkHigh, err = strconv.Atoi(highWMStr)
if err != nil {
return cfg, config.ErrInvalidCacheWatermarkHigh(err)
}
// MaxWatermark should be a valid percentage.
if cfg.WatermarkHigh < 0 || cfg.WatermarkHigh > 100 {
err := errors.New("config high watermark value should be between 0 and 100")
return cfg, config.ErrInvalidCacheWatermarkHigh(err)
}
}
if cfg.WatermarkLow > cfg.WatermarkHigh {
err := errors.New("config high watermark value should be greater than low watermark value")
return cfg, config.ErrInvalidCacheWatermarkHigh(err)
}
cfg.Range = true // by default range caching is enabled.
if rangeStr := env.Get(EnvCacheRange, kvs.Get(Range)); rangeStr != "" {
rng, err := config.ParseBool(rangeStr)
if err != nil {
return cfg, config.ErrInvalidCacheRange(err)
}
cfg.Range = rng
}
if commit := env.Get(EnvCacheCommit, kvs.Get(Commit)); commit != "" {
cfg.CacheCommitMode, err = parseCacheCommitMode(commit)
if err != nil {
return cfg, err
}
if cfg.After > 0 && cfg.CacheCommitMode != WriteThrough {
err := errors.New("cache after cannot be used with commit writeback")
return cfg, config.ErrInvalidCacheSetting(err)
}
}
return cfg, nil
}

View file

@ -61,66 +61,6 @@ var (
"WORM can only accept `on` and `off` values. To enable WORM, set this value to `on`",
)
ErrInvalidCacheDrivesValue = newErrFn(
"Invalid cache drive value",
"Please check the value in this ENV variable",
"MINIO_CACHE_DRIVES: Mounted drives or directories are delimited by `,`",
)
ErrInvalidCacheExcludesValue = newErrFn(
"Invalid cache excludes value",
"Please check the passed value",
"MINIO_CACHE_EXCLUDE: Cache exclusion patterns are delimited by `,`",
)
ErrInvalidCacheExpiryValue = newErrFn(
"Invalid cache expiry value",
"Please check the passed value",
"MINIO_CACHE_EXPIRY: Valid cache expiry duration must be in days",
)
ErrInvalidCacheQuota = newErrFn(
"Invalid cache quota value",
"Please check the passed value",
"MINIO_CACHE_QUOTA: Valid cache quota value must be between 0-100",
)
ErrInvalidCacheAfter = newErrFn(
"Invalid cache after value",
"Please check the passed value",
"MINIO_CACHE_AFTER: Valid cache after value must be 0 or greater",
)
ErrInvalidCacheWatermarkLow = newErrFn(
"Invalid cache low watermark value",
"Please check the passed value",
"MINIO_CACHE_WATERMARK_LOW: Valid cache low watermark value must be between 0-100",
)
ErrInvalidCacheWatermarkHigh = newErrFn(
"Invalid cache high watermark value",
"Please check the passed value",
"MINIO_CACHE_WATERMARK_HIGH: Valid cache high watermark value must be between 0-100",
)
ErrInvalidCacheRange = newErrFn(
"Invalid cache range value",
"Please check the passed value",
"MINIO_CACHE_RANGE: Valid expected value is `on` or `off`",
)
ErrInvalidCacheCommitValue = newErrFn(
"Invalid cache commit value",
"Please check the passed value",
"MINIO_CACHE_COMMIT: Valid expected value is `writeback` or `writethrough`",
)
ErrInvalidCacheSetting = newErrFn(
"Incompatible cache setting",
"Please check the passed value",
"MINIO_CACHE_AFTER cannot be used with MINIO_CACHE_COMMIT setting",
)
ErrInvalidConfigDecryptionKey = newErrFn(
"Incorrect encryption key to decrypt internal data",
"Please set the correct default KMS key value or the correct root credentials for older MinIO versions.",