journeymidnight / yig

An open source object storage server with Amazon S3 compatible API
Apache License 2.0
361 stars 86 forks source link

Fix versioning. Remove objMap. #243

Open brucen1030 opened 4 years ago

brucen1030 commented 4 years ago

Tested with test/versioning.py Objects in versioned buckets are not appendable.

Versioning is handled as follows.

  1. In PUT (new) object, version in db table is set as follows in meta/types/object.go.

    func (o *Object) GetCreateSql() (string, []interface{}, uint64) {
    version := math.MaxUint64 - uint64(o.LastModifiedTime.UnixNano())

    And responsed to client with converted versionId in meta/client/tidbclient/object.go

    func (t *TidbClient) PutObject(object *Object, tx DB) (err error) {
    if tx == nil {
        tx, err = t.Client.Begin()
        if err != nil {
            return err
        }
        defer func() {
            if err == nil {
                err = tx.(*sql.Tx).Commit()
            }
            if err != nil {
                tx.(*sql.Tx).Rollback()
            }
        }()
    }
    sql, args, iversion := object.GetCreateSql()
    object.VersionId = ConvertRawVersionToS3Version(iversion)
  2. In ListObjectVersions, object versions will be returned in meta/client/tidbclient/bucket.go and meta/client/tidbclient/object.go.

    
    func (t *TidbClient) GetObject(bucketName, objectName, version string) (object *Object, err error) {
    var ibucketname, iname, customattributes, acl, lastModifiedTime string
    var iversion uint64
    
    var row *sql.Row
    sqltext := "select bucketname,name,version,location,pool,ownerid,size,objectid,lastmodifiedtime,etag,contenttype," +
        "customattributes,acl,nullversion,deletemarker,ssetype,encryptionkey,initializationvector,type,storageclass from objects where bucketname=? and name=? "
    if version == "" {
        sqltext += "order by bucketname,name,version limit 1;"
        row = t.Client.QueryRow(sqltext, bucketName, objectName)
    } else if version == ObjectNullVersion {
        sqltext += "and nullversion=1 limit 1;" // There should be only one NullVersion object.
        row = t.Client.QueryRow(sqltext, bucketName, objectName)
    } else {
        sqltext += "and version=?;"
        internalVersion, err := ConvertS3VersionToRawVersion(version)
        if err != nil {
            return nil, ErrInternalError
        }
        row = t.Client.QueryRow(sqltext, bucketName, objectName, internalVersion)
    }
    ...
    object.VersionId = ConvertRawVersionToS3Version(iversion)
    
    return
    }

3. In GET object, objectid for ceph will be get with client specified version Id.
4. In PUT objects in versioned buckets in storage/object.go, old objects will be handled in storage/object.go.

func (yig *YigStorage) checkOldObject(bucketName, objectName, versioning string) (err error) { switch versioning { case meta.VersionDisabled: // Delete all the bucketName + objectName. return yig.removeAllObjectsEntryByName(bucketName, objectName) // TODO: not deleted.

case meta.VersionEnabled:
    // Do nothing.

case meta.VersionSuspended:
    // Delete null version object.
    var object *meta.Object
    if object, err = yig.MetaStorage.GetObjectVersion(bucketName, objectName, meta.ObjectNullVersion, false); err != nil {
        if err == ErrNoSuchKey {
            return nil
        }
        return err
    }

    return yig.MetaStorage.DeleteObject(object, object.DeleteMarker)

default:
    return errors.New("No Such versioning status!")
}

return nil

}


TODO work
1. Lifecycle and versioning adaption. 
https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html#intro-lifecycle-rules-actions