irods / irods_capability_storage_tiering

BSD 3-Clause "New" or "Revised" License
5 stars 10 forks source link

implement trimming of objects in tier0 resource when preserve_replicas is set to true #253

Open cookie33 opened 6 months ago

cookie33 commented 6 months ago

FEATURE REQUEST

VERSIONS

iRODS 4.3.1

proposed BEHAVIOUR

A system with tier 0 and tier 1. The parameter irods::storage_tiering::preserve_replicas is set to true for all tiers.

A new parameter irods::storage_tiering::preserve_replicas_size is set to the number of bytes to keep on tier0

When the storage tiering rule fires of it also checks for the object size in the tier0. If it is too much it trims files based on date (oldest first) till it reaches the limit. This might also be configarable like the query to find files to migrate. It only does it for files which are in both tiers. During the operation it also updates the attribute irods::storage_tiering::group with a new unit if it is needed

An example of how we now implement it in a python iRODS rule

def py_execute_itrim_on_cache_objects(rule_args, callback, rei):
    """perform execution of itrim on cache objects because of space constraints
       input:
                   interactive
                   zoneName
                   CacheResc
                   archiveResc
                   maxSpaceAllowed
       output: -      
     """
    (interactive, zoneName, cacheResc, archiveResc, maxSpaceAllowed) = rule_args
    MaxSpAlwd = int(maxSpaceAllowed)
    usedSpace = int(0)
    zone = "/" + str(zoneName) + "%"
    callback.writeLine("serverLog","Start  : The collection [{0}] in cache resource [{1}] is being trimmed to [{2}] bytes".format(zone, cacheResc, str(MaxSpAlwd)))
    condition = "DATA_RESC_HIER = '" + cacheResc + "' AND COLL_NAME like '" + zone + "'"
    # query for row of objects in the cache resource
    for row in row_iterator(
            ["sum(DATA_SIZE)"],        # requested columns
            condition,                 # condition for query
            AS_LIST,                   # retrieve as list structure
            callback):
        if row[0] != '':
            usedSpace = int(row[0])
    callback.writeLine("serverLog","The cache resource [{0}] has [{1}] bytes".format(cacheResc, str(usedSpace)))
    # check if used more space than allowed
    if usedSpace > MaxSpAlwd:
        loop_must_break = False
        callback.writeLine("serverLog","The used space [{0}] > max allowed space [{1}]".format(str(usedSpace), str(MaxSpAlwd)))
        condition = "DATA_RESC_HIER = '" + cacheResc + "' AND COLL_NAME like '" + zone + "'"
        # query for page of objects in the cache resource
        for page in paged_iterator(
                ["DATA_NAME", "COLL_NAME", "DATA_REPL_NUM", "DATA_SIZE", "order(DATA_CREATE_TIME)"],        # requested columns
                condition,                 # condition for query
                AS_LIST,                   # retrieve as list structure
                callback):
            # loop over rows in page of objects in the cache resource
            for row in page:
                data_name = row[0]
                coll_name = row[1]
                data_repl_num = row[2]
                data_size = int(row[3])
                object = coll_name + "/" + data_name
                condition = "DATA_RESC_HIER = '" + archiveResc + "' AND COLL_NAME like '" + coll_name + "' AND DATA_NAME = '" + data_name + "' AND DATA_REPL_STATUS = '1'"
                # loop over rows of objects in the archive resource
                for row in row_iterator(
                        ["DATA_NAME", "COLL_NAME", "DATA_REPL_NUM", "DATA_SIZE"],        # requested columns
                        condition,                 # condition for query
                        AS_LIST,                   # retrieve as list structure
                        callback):
                    callback.writeLine("serverLog","[{0}] with data size [{1}] and replica number [{2}] on [{3}] is to be purged".format(object, str(data_size), data_repl_num, cacheResc))
                    status = 0
                    # trim an object in cache
                    try:
                        ret_val = callback.msiDataObjTrim(object, 'null', data_repl_num, '1', 'ADMIN_KW', status)
                        callback.writeLine('serverLog',"Trimmed object [{0}] replica number [{1}] from [{2}]".format(object, data_repl_num, cacheResc))
                        usedSpace -= data_size
                        if usedSpace < MaxSpAlwd:
                            loop_must_break = True
                    except:
                        callback.writeLine('serverLog',"Could not trim object [{0}] replica number [{1}] from [{2}]".format(object, data_repl_num, cacheResc))
                        #return irods_errors.ACTION_FAILED_ERR

                    # find value and units of irods::storage_tiering::group
                    avu_found = False
                    avu_value = ''
                    avu_units = ''
                    try:
                        condition = "DATA_NAME = '" + row[0] + "' AND COLL_NAME = '" + row[1] + "'"
                        # query for row of objects in the cache resource
                        for meta_row in row_iterator(
                            ["META_DATA_ATTR_NAME", "META_DATA_ATTR_VALUE", "META_DATA_ATTR_UNITS"],        # requested columns
                            condition,                 # condition for query
                            AS_LIST,                   # retrieve as list structure
                            callback):
                            ##callback.writeLine("serverLog","Object [{0}] has avu [{1}] with value [{2}]".format(object, meta_row[0], meta_row[1]))
                            # if the avu irods::storage_tiering::group is found set the boolean to true
                            if meta_row[0] == "irods::storage_tiering::group":
                                avu_found = True
                                avu_value = meta_row[1]
                                avu_units = meta_row[2]
                        ##callback.writeLine("serverLog","Object [{0}] has avu [irods::storage_tiering::group] with value [{1}] and unit [{2}]".format(object, str(avu_value), str(avu_units)))
                    except:
                        callback.writeLine('serverLog',"Could not get avu [irods::storage_tiering::group] for object [{0}]".format(object))
                        #return irods_errors.ACTION_FAILED_ERR

                    # update value and units of irods::storage_tiering::group
                    if avu_found and int(data_repl_num) >= int(avu_units):
                        avu_new_units = int(avu_units) - 1
                        json_input = '{ "admin_mode": true, "entity_name": "' + object + '", "entity_type": "data_object", "operations": [ { "operation": "remove", "attribute": "irods::storage_tiering::group", "value": "' + avu_value + '", "units": "' + avu_units + '" }, { "operation": "add", "attribute": "irods::storage_tiering::group", "value": "' + avu_value + '", "units": "' + str(avu_new_units) + '" } ] }'
                        json_output = ''
                        try:
                            ret_val = callback.msi_atomic_apply_metadata_operations(json_input, json_output)
                            callback.writeLine('serverLog','Updated avu [irods::storage_tiering::group] on object [{0}]'.format(object))
                        except:
                            callback.writeLine('serverLog','Could not update avu [irods::storage_tiering::group] on object [{0}] with error [{1}]'.format(object, str(json_output)))
                            #return irods_errors.ACTION_FAILED_ERR

                    if loop_must_break: break

                if loop_must_break: break

            if loop_must_break: break

    callback.writeLine("serverLog","End    : The collection [{0}] in cache resource [{1}] has been trimmed to [{2}] bytes".format(zone, cacheResc, str(MaxSpAlwd) ))
trel commented 6 months ago

It only does it for files which are in both tiers.

What should happen if there are three or more tiers? Would your sentence read ... 'all tiers'?

trel commented 6 months ago

So this policy would remove the oldest duplicated files, in order, if they are larger than a certain size, and until a threshold value is met?

Policy for the ordering could be customizable - or just part of an enum... ['oldest', 'largest', 'smallest']

cookie33 commented 6 months ago

It only does it for files which are in both tiers.

What should happen if there are three or more tiers? Would your sentence read ... 'all tiers'?

I would say if it is in more than one tier. If it is in more than one tier it can do a trim of the resource if it is on a other resource. f.i. if you have three tiers: Tier0, Tier1, Tier2 and a file: file_A. file_A has 2 copies file_A ==> Tier0 file_A ==> Tier2 So it can trim it from Tier0 because there is a copy on Tier2 and a metadata AVU which says so on resource Tier0

cookie33 commented 6 months ago

So this policy would remove the oldest duplicated files, in order, if they are larger than a certain size, and until a threshold value is met?

Policy for the ordering could be customizable - or just part of an enum... ['oldest', 'largest', 'smallest']

For now my rule as implemented just does following:

trel commented 6 months ago

Got it. But a 'minimum filesize to trim' might also be a handy knob to have.