fix: web handlers to enforce replication (#10249)
This PR also preserves source ETag for replication
This commit is contained in:
@@ -19,6 +19,7 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
miniogo "github.com/minio/minio-go/v7"
|
||||
@@ -75,8 +76,23 @@ func validateReplicationDestination(ctx context.Context, bucket string, rCfg *re
|
||||
return false, BucketRemoteTargetNotFound{Bucket: bucket}
|
||||
}
|
||||
|
||||
func mustReplicateWeb(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string, permErr APIErrorCode) bool {
|
||||
if permErr != ErrNone {
|
||||
return false
|
||||
}
|
||||
return mustReplicater(ctx, r, bucket, object, meta, replStatus)
|
||||
}
|
||||
|
||||
// mustReplicate returns true if object meets replication criteria.
|
||||
func mustReplicate(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string) bool {
|
||||
if s3Err := isPutActionAllowed(getRequestAuthType(r), bucket, object, r, iampolicy.GetReplicationConfigurationAction); s3Err != ErrNone {
|
||||
return false
|
||||
}
|
||||
return mustReplicater(ctx, r, bucket, object, meta, replStatus)
|
||||
}
|
||||
|
||||
// mustReplicater returns true if object meets replication criteria.
|
||||
func mustReplicater(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string) bool {
|
||||
if globalIsGateway {
|
||||
return false
|
||||
}
|
||||
@@ -86,9 +102,6 @@ func mustReplicate(ctx context.Context, r *http.Request, bucket, object string,
|
||||
if replication.StatusType(replStatus) == replication.Replica {
|
||||
return false
|
||||
}
|
||||
if s3Err := isPutActionAllowed(getRequestAuthType(r), bucket, object, r, iampolicy.GetReplicationConfigurationAction); s3Err != ErrNone {
|
||||
return false
|
||||
}
|
||||
cfg, err := getReplicationConfig(ctx, bucket)
|
||||
if err != nil {
|
||||
return false
|
||||
@@ -110,9 +123,11 @@ func putReplicationOpts(dest replication.Destination, objInfo ObjectInfo) (putOp
|
||||
if k == xhttp.AmzBucketReplicationStatus {
|
||||
continue
|
||||
}
|
||||
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
|
||||
continue
|
||||
}
|
||||
meta[k] = v
|
||||
}
|
||||
|
||||
tag, err := tags.ParseObjectTags(objInfo.UserTags)
|
||||
if err != nil {
|
||||
return
|
||||
@@ -130,6 +145,7 @@ func putReplicationOpts(dest replication.Destination, objInfo ObjectInfo) (putOp
|
||||
ReplicationVersionID: objInfo.VersionID,
|
||||
ReplicationStatus: miniogo.ReplicationStatusReplica,
|
||||
ReplicationMTime: objInfo.ModTime,
|
||||
ReplicationETag: objInfo.ETag,
|
||||
}
|
||||
if mode, ok := objInfo.UserDefined[xhttp.AmzObjectLockMode]; ok {
|
||||
rmode := miniogo.RetentionMode(mode)
|
||||
@@ -219,3 +235,26 @@ func replicateObject(ctx context.Context, bucket, object, versionID string, obje
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
}
|
||||
|
||||
// filterReplicationStatusMetadata filters replication status metadata for COPY
|
||||
func filterReplicationStatusMetadata(metadata map[string]string) map[string]string {
|
||||
// Copy on write
|
||||
dst := metadata
|
||||
var copied bool
|
||||
delKey := func(key string) {
|
||||
if _, ok := metadata[key]; !ok {
|
||||
return
|
||||
}
|
||||
if !copied {
|
||||
dst = make(map[string]string, len(metadata))
|
||||
for k, v := range metadata {
|
||||
dst[k] = v
|
||||
}
|
||||
copied = true
|
||||
}
|
||||
delete(dst, key)
|
||||
}
|
||||
|
||||
delKey(xhttp.AmzBucketReplicationStatus)
|
||||
return dst
|
||||
}
|
||||
|
||||
@@ -90,11 +90,14 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
|
||||
return BucketReplicationSourceNotVersioned{Bucket: bucket}
|
||||
}
|
||||
vcfg, err := clnt.GetBucketVersioning(ctx, tgt.TargetBucket)
|
||||
if err != nil || vcfg.Status != string(versioning.Enabled) {
|
||||
if err != nil {
|
||||
if isErrBucketNotFound(err) {
|
||||
return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket}
|
||||
}
|
||||
return BucketReplicationTargetNotVersioned{Bucket: tgt.TargetBucket}
|
||||
if vcfg.Status != string(versioning.Enabled) {
|
||||
return BucketReplicationTargetNotVersioned{Bucket: tgt.TargetBucket}
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -702,6 +702,9 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
||||
|
||||
// Save successfully calculated md5sum.
|
||||
fi.Metadata["etag"] = s3MD5
|
||||
if opts.UserDefined["etag"] != "" { // preserve ETag if set
|
||||
fi.Metadata["etag"] = opts.UserDefined["etag"]
|
||||
}
|
||||
|
||||
// Save the consolidated actual size.
|
||||
fi.Metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10)
|
||||
|
||||
@@ -710,8 +710,9 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
||||
Hash: bitrotWriterSum(w),
|
||||
})
|
||||
}
|
||||
|
||||
opts.UserDefined["etag"] = r.MD5CurrentHexString()
|
||||
if opts.UserDefined["etag"] == "" {
|
||||
opts.UserDefined["etag"] = r.MD5CurrentHexString()
|
||||
}
|
||||
|
||||
// Guess content-type from the extension if possible.
|
||||
if opts.UserDefined["content-type"] == "" {
|
||||
|
||||
@@ -123,6 +123,9 @@ const (
|
||||
|
||||
// Header indicates if the mtime should be preserved by client
|
||||
MinIOSourceMTime = "x-minio-source-mtime"
|
||||
|
||||
// Header indicates if the etag should be preserved by client
|
||||
MinIOSourceETag = "x-minio-source-etag"
|
||||
)
|
||||
|
||||
// Common http query params S3 API
|
||||
|
||||
@@ -164,7 +164,13 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada
|
||||
} else {
|
||||
opts.MTime = UTCNow()
|
||||
}
|
||||
|
||||
etag := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceETag))
|
||||
if etag != "" {
|
||||
if metadata == nil {
|
||||
metadata = make(map[string]string)
|
||||
}
|
||||
metadata["etag"] = etag
|
||||
}
|
||||
// In the case of multipart custom format, the metadata needs to be checked in addition to header to see if it
|
||||
// is SSE-S3 encrypted, primarily because S3 protocol does not require SSE-S3 headers in PutObjectPart calls
|
||||
if GlobalGatewaySSE.SSES3() && (crypto.S3.IsRequested(r.Header) || crypto.S3.IsEncrypted(metadata)) {
|
||||
|
||||
@@ -1155,6 +1155,8 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
if objTags != "" {
|
||||
srcInfo.UserDefined[xhttp.AmzObjectTagging] = objTags
|
||||
}
|
||||
srcInfo.UserDefined = filterReplicationStatusMetadata(srcInfo.UserDefined)
|
||||
|
||||
srcInfo.UserDefined = objectlock.FilterObjectLockMetadata(srcInfo.UserDefined, true, true)
|
||||
retPerms := isPutActionAllowed(getRequestAuthType(r), dstBucket, dstObject, r, iampolicy.PutObjectRetentionAction)
|
||||
holdPerms := isPutActionAllowed(getRequestAuthType(r), dstBucket, dstObject, r, iampolicy.PutObjectLegalHoldAction)
|
||||
@@ -1259,7 +1261,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime)
|
||||
encodedSuccessResponse := encodeResponse(response)
|
||||
if mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String()) {
|
||||
defer replicateObject(ctx, dstBucket, dstObject, objInfo.VersionID, objectAPI, &eventArgs{
|
||||
defer replicateObject(context.Background(), dstBucket, dstObject, objInfo.VersionID, objectAPI, &eventArgs{
|
||||
EventName: event.ObjectCreatedCopy,
|
||||
BucketName: dstBucket,
|
||||
Object: objInfo,
|
||||
@@ -1575,7 +1577,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
|
||||
}
|
||||
}
|
||||
if mustReplicate(ctx, r, bucket, object, metadata, "") {
|
||||
defer replicateObject(ctx, bucket, object, objInfo.VersionID, objectAPI, &eventArgs{
|
||||
defer replicateObject(context.Background(), bucket, object, objInfo.VersionID, objectAPI, &eventArgs{
|
||||
EventName: event.ObjectCreatedPut,
|
||||
BucketName: bucket,
|
||||
Object: objInfo,
|
||||
@@ -2650,7 +2652,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
|
||||
setPutObjHeaders(w, objInfo, false)
|
||||
if mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String()) {
|
||||
defer replicateObject(ctx, bucket, object, objInfo.VersionID, objectAPI, &eventArgs{
|
||||
defer replicateObject(context.Background(), bucket, object, objInfo.VersionID, objectAPI, &eventArgs{
|
||||
EventName: event.ObjectCreatedCompleteMultipartUpload,
|
||||
BucketName: bucket,
|
||||
Object: objInfo,
|
||||
|
||||
+28
-1
@@ -48,6 +48,7 @@ import (
|
||||
"github.com/minio/minio/pkg/auth"
|
||||
objectlock "github.com/minio/minio/pkg/bucket/object/lock"
|
||||
"github.com/minio/minio/pkg/bucket/policy"
|
||||
"github.com/minio/minio/pkg/bucket/replication"
|
||||
"github.com/minio/minio/pkg/event"
|
||||
"github.com/minio/minio/pkg/handlers"
|
||||
"github.com/minio/minio/pkg/hash"
|
||||
@@ -961,6 +962,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
retPerms := ErrAccessDenied
|
||||
holdPerms := ErrAccessDenied
|
||||
replPerms := ErrAccessDenied
|
||||
if authErr != nil {
|
||||
if authErr == errNoAuthToken {
|
||||
// Check if anonymous (non-owner) has access to upload objects.
|
||||
@@ -1016,6 +1018,17 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
|
||||
}) {
|
||||
holdPerms = ErrNone
|
||||
}
|
||||
if globalIAMSys.IsAllowed(iampolicy.Args{
|
||||
AccountName: claims.AccessKey,
|
||||
Action: iampolicy.GetReplicationConfigurationAction,
|
||||
BucketName: bucket,
|
||||
ConditionValues: getConditionValues(r, "", claims.AccessKey, claims.Map()),
|
||||
IsOwner: owner,
|
||||
ObjectName: object,
|
||||
Claims: claims.Map(),
|
||||
}) {
|
||||
replPerms = ErrNone
|
||||
}
|
||||
}
|
||||
|
||||
// Check if bucket is a reserved bucket name or invalid.
|
||||
@@ -1082,6 +1095,10 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
mustReplicate := mustReplicateWeb(ctx, r, bucket, object, metadata, "", replPerms)
|
||||
if mustReplicate {
|
||||
metadata[xhttp.AmzBucketReplicationStatus] = string(replication.Pending)
|
||||
}
|
||||
pReader = NewPutObjReader(hashReader, nil, nil)
|
||||
// get gateway encryption options
|
||||
opts, err := putOpts(ctx, r, bucket, object, metadata)
|
||||
@@ -1155,7 +1172,17 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if mustReplicate {
|
||||
defer replicateObject(context.Background(), bucket, object, objInfo.VersionID, objectAPI, &eventArgs{
|
||||
EventName: event.ObjectCreatedPut,
|
||||
BucketName: bucket,
|
||||
Object: objInfo,
|
||||
ReqParams: extractReqParams(r),
|
||||
RespElements: extractRespElements(w),
|
||||
UserAgent: r.UserAgent(),
|
||||
Host: handlers.GetSourceIP(r),
|
||||
}, false)
|
||||
}
|
||||
// Notify object created event.
|
||||
sendEvent(eventArgs{
|
||||
EventName: event.ObjectCreatedPut,
|
||||
|
||||
Reference in New Issue
Block a user