remove double reads delete versions (#13544)

deleting collection of versions belonging
to same object, we can avoid re-reading
the xl.meta from the disk instead purge
all the requested versions in-memory,

the tradeoff is to allocate a map to de-dup
the versions, allow disks to be read only
once per object.

additionally reduce the data transfer between
nodes by shortening msgp data values.
This commit is contained in:
Harshavardhana
2021-11-01 10:50:07 -07:00
committed by GitHub
parent 15dcacc1fc
commit bb639d9f29
12 changed files with 310 additions and 259 deletions
+72 -49
View File
@@ -1088,37 +1088,64 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
writeQuorums[i] = getWriteQuorum(len(storageDisks))
}
versions := make([]FileInfo, len(objects))
versionsMap := make(map[string]FileInfoVersions, len(objects))
for i := range objects {
if objects[i].VersionID == "" {
modTime := opts.MTime
if opts.MTime.IsZero() {
modTime = UTCNow()
}
uuid := opts.VersionID
if uuid == "" {
uuid = mustGetUUID()
}
if opts.Versioned || opts.VersionSuspended {
versions[i] = FileInfo{
Name: objects[i].ObjectName,
ModTime: modTime,
Deleted: true, // delete marker
ReplicationState: objects[i].ReplicationState(),
}
versions[i].SetTierFreeVersionID(mustGetUUID())
if opts.Versioned {
versions[i].VersionID = uuid
}
continue
}
}
versions[i] = FileInfo{
// Construct the FileInfo data that needs to be preserved on the disk.
vr := FileInfo{
Name: objects[i].ObjectName,
VersionID: objects[i].VersionID,
ReplicationState: objects[i].ReplicationState(),
// save the index to set correct error at this index.
Idx: i,
}
versions[i].SetTierFreeVersionID(mustGetUUID())
vr.SetTierFreeVersionID(mustGetUUID())
// VersionID is not set means delete is not specific about
// any version, look for if the bucket is versioned or not.
if objects[i].VersionID == "" {
if opts.Versioned || opts.VersionSuspended {
// Bucket is versioned and no version was explicitly
// mentioned for deletes, create a delete marker instead.
vr.ModTime = UTCNow()
vr.Deleted = true
// Versioning suspended means that we add a `null` version
// delete marker, if not add a new version for this delete
// marker.
if opts.Versioned {
vr.VersionID = mustGetUUID()
}
}
}
// De-dup same object name to collect multiple versions for same object.
v, ok := versionsMap[objects[i].ObjectName]
if ok {
v.Versions = append(v.Versions, vr)
} else {
v = FileInfoVersions{
Name: vr.Name,
Versions: []FileInfo{vr},
}
}
if vr.Deleted {
dobjects[i] = DeletedObject{
DeleteMarker: vr.Deleted,
DeleteMarkerVersionID: vr.VersionID,
DeleteMarkerMTime: DeleteMarkerMTime{vr.ModTime},
ObjectName: vr.Name,
ReplicationState: vr.ReplicationState,
}
} else {
dobjects[i] = DeletedObject{
ObjectName: vr.Name,
VersionID: vr.VersionID,
ReplicationState: vr.ReplicationState,
}
}
versionsMap[objects[i].ObjectName] = v
}
dedupVersions := make([]FileInfoVersions, 0, len(versionsMap))
for _, version := range versionsMap {
dedupVersions = append(dedupVersions, version)
}
// Initialize list of errors.
@@ -1130,17 +1157,24 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
delObjErrs[index] = make([]error, len(objects))
if disk == nil {
delObjErrs[index] = make([]error, len(versions))
for i := range versions {
for i := range objects {
delObjErrs[index][i] = errDiskNotFound
}
return
}
delObjErrs[index] = disk.DeleteVersions(ctx, bucket, versions)
errs := disk.DeleteVersions(ctx, bucket, dedupVersions)
for i, err := range errs {
if err == nil {
continue
}
for _, v := range dedupVersions[i].Versions {
delObjErrs[index][v.Idx] = err
}
}
}(index, disk)
}
wg.Wait()
// Reduce errors for each object
@@ -1162,28 +1196,17 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
}
if errs[objIndex] == nil {
NSUpdated(bucket, objects[objIndex].ObjectName)
}
if versions[objIndex].Deleted {
dobjects[objIndex] = DeletedObject{
DeleteMarker: versions[objIndex].Deleted,
DeleteMarkerVersionID: versions[objIndex].VersionID,
DeleteMarkerMTime: DeleteMarkerMTime{versions[objIndex].ModTime},
ObjectName: versions[objIndex].Name,
ReplicationState: versions[objIndex].ReplicationState,
}
} else {
dobjects[objIndex] = DeletedObject{
ObjectName: versions[objIndex].Name,
VersionID: versions[objIndex].VersionID,
ReplicationState: versions[objIndex].ReplicationState,
}
defer NSUpdated(bucket, objects[objIndex].ObjectName)
}
}
// Check failed deletes across multiple objects
for _, version := range versions {
for i, dobj := range dobjects {
// This object errored, no need to attempt a heal.
if errs[i] != nil {
continue
}
// Check if there is any offline disk and add it to the MRF list
for _, disk := range storageDisks {
if disk != nil && disk.IsOnline() {
@@ -1193,7 +1216,7 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
// all other direct versionId references we should
// ensure no dangling file is left over.
er.addPartial(bucket, version.Name, version.VersionID, -1)
er.addPartial(bucket, dobj.ObjectName, dobj.VersionID, -1)
break
}
}