Allow optionally to disable range caching. (#9908)
The default behavior is to cache each range requested to cache drive. Add an environment variable `MINIO_RANGE_CACHE` - when set to off, it disables range caching and instead downloads entire object in the background. Fixes #9870
This commit is contained in:
Vendored
+1
@@ -37,6 +37,7 @@ type Config struct {
|
||||
After int `json:"after"`
|
||||
WatermarkLow int `json:"watermark_low"`
|
||||
WatermarkHigh int `json:"watermark_high"`
|
||||
Range bool `json:"range"`
|
||||
}
|
||||
|
||||
// UnmarshalJSON - implements JSON unmarshal interface for unmarshalling
|
||||
|
||||
Vendored
+6
@@ -68,5 +68,11 @@ var (
|
||||
Optional: true,
|
||||
Type: "number",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: Range,
|
||||
Description: `set to "on" or "off" caching of independent range requests per object, defaults to "on"`,
|
||||
Optional: true,
|
||||
Type: "string",
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
Vendored
+16
@@ -34,6 +34,7 @@ const (
|
||||
After = "after"
|
||||
WatermarkLow = "watermark_low"
|
||||
WatermarkHigh = "watermark_high"
|
||||
Range = "range"
|
||||
|
||||
EnvCacheDrives = "MINIO_CACHE_DRIVES"
|
||||
EnvCacheExclude = "MINIO_CACHE_EXCLUDE"
|
||||
@@ -43,6 +44,7 @@ const (
|
||||
EnvCacheAfter = "MINIO_CACHE_AFTER"
|
||||
EnvCacheWatermarkLow = "MINIO_CACHE_WATERMARK_LOW"
|
||||
EnvCacheWatermarkHigh = "MINIO_CACHE_WATERMARK_HIGH"
|
||||
EnvCacheRange = "MINIO_CACHE_RANGE"
|
||||
|
||||
EnvCacheEncryptionMasterKey = "MINIO_CACHE_ENCRYPTION_MASTER_KEY"
|
||||
|
||||
@@ -84,6 +86,10 @@ var (
|
||||
Key: WatermarkHigh,
|
||||
Value: DefaultWaterMarkHigh,
|
||||
},
|
||||
config.KV{
|
||||
Key: Range,
|
||||
Value: config.EnableOn,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
@@ -195,5 +201,15 @@ func LookupConfig(kvs config.KVS) (Config, error) {
|
||||
err := errors.New("config high watermark value should be greater than low watermark value")
|
||||
return cfg, config.ErrInvalidCacheWatermarkHigh(err)
|
||||
}
|
||||
|
||||
cfg.Range = true // by default range caching is enabled.
|
||||
if rangeStr := env.Get(EnvCacheRange, kvs.Get(Range)); rangeStr != "" {
|
||||
rng, err := config.ParseBool(rangeStr)
|
||||
if err != nil {
|
||||
return cfg, config.ErrInvalidCacheRange(err)
|
||||
}
|
||||
cfg.Range = rng
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
@@ -96,6 +96,12 @@ var (
|
||||
"MINIO_CACHE_ENCRYPTION_MASTER_KEY: For more information, please refer to https://docs.min.io/docs/minio-disk-cache-guide",
|
||||
)
|
||||
|
||||
ErrInvalidCacheRange = newErrFn(
|
||||
"Invalid cache range value",
|
||||
"Please check the passed value",
|
||||
"MINIO_CACHE_RANGE: Valid expected value is `on` or `off`",
|
||||
)
|
||||
|
||||
ErrInvalidRotatingCredentialsBackendEncrypted = newErrFn(
|
||||
"Invalid rotating credentials",
|
||||
"Please set correct rotating credentials in the environment for decryption",
|
||||
|
||||
@@ -33,6 +33,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/djherbis/atime"
|
||||
"github.com/minio/minio/cmd/config/cache"
|
||||
"github.com/minio/minio/cmd/crypto"
|
||||
xhttp "github.com/minio/minio/cmd/http"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
@@ -137,6 +138,7 @@ type diskCache struct {
|
||||
after int // minimum accesses before an object is cached.
|
||||
lowWatermark int
|
||||
highWatermark int
|
||||
enableRange bool
|
||||
// nsMutex namespace lock
|
||||
nsMutex *nsLockMap
|
||||
// Object functions pointing to the corresponding functions of backend implementation.
|
||||
@@ -144,7 +146,12 @@ type diskCache struct {
|
||||
}
|
||||
|
||||
// Inits the disk cache dir if it is not initialized already.
|
||||
func newDiskCache(ctx context.Context, dir string, quotaPct, after, lowWatermark, highWatermark int) (*diskCache, error) {
|
||||
func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCache, error) {
|
||||
quotaPct := config.MaxUse
|
||||
if quotaPct == 0 {
|
||||
quotaPct = config.Quota
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||
return nil, fmt.Errorf("Unable to initialize '%s' dir, %w", dir, err)
|
||||
}
|
||||
@@ -153,9 +160,10 @@ func newDiskCache(ctx context.Context, dir string, quotaPct, after, lowWatermark
|
||||
triggerGC: make(chan struct{}),
|
||||
stats: CacheDiskStats{Dir: dir},
|
||||
quotaPct: quotaPct,
|
||||
after: after,
|
||||
lowWatermark: lowWatermark,
|
||||
highWatermark: highWatermark,
|
||||
after: config.After,
|
||||
lowWatermark: config.WatermarkLow,
|
||||
highWatermark: config.WatermarkHigh,
|
||||
enableRange: config.Range,
|
||||
online: 1,
|
||||
pool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
|
||||
+8
-6
@@ -312,6 +312,10 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
|
||||
|
||||
if rs != nil {
|
||||
go func() {
|
||||
// if range caching is disabled, download entire object.
|
||||
if !dcache.enableRange {
|
||||
rs = nil
|
||||
}
|
||||
// fill cache in the background for range GET requests
|
||||
bReader, bErr := c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
|
||||
if bErr != nil {
|
||||
@@ -321,7 +325,9 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
|
||||
oi, _, _, err := dcache.statRange(ctx, bucket, object, rs)
|
||||
// avoid cache overwrite if another background routine filled cache
|
||||
if err != nil || oi.ETag != bReader.ObjInfo.ETag {
|
||||
dcache.Put(ctx, bucket, object, bReader, bReader.ObjInfo.Size, rs, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, false)
|
||||
// use a new context to avoid locker prematurely timing out operation when the GetObjectNInfo returns.
|
||||
dcache.Put(context.Background(), bucket, object, bReader, bReader.ObjInfo.Size, rs, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, false)
|
||||
return
|
||||
}
|
||||
}()
|
||||
return bkReader, bkErr
|
||||
@@ -542,11 +548,7 @@ func newCache(config cache.Config) ([]*diskCache, bool, error) {
|
||||
return nil, false, errors.New("Atime support required for disk caching")
|
||||
}
|
||||
|
||||
quota := config.MaxUse
|
||||
if quota == 0 {
|
||||
quota = config.Quota
|
||||
}
|
||||
cache, err := newDiskCache(ctx, dir, quota, config.After, config.WatermarkLow, config.WatermarkHigh)
|
||||
cache, err := newDiskCache(ctx, dir, config)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user