darshan-hpc / darshan

Darshan I/O characterization tool

PyDarshan: Easily extract OST related information from DXT logs #763

Open jeanbez opened 2 years ago

jeanbez commented 2 years ago

I was looking at the documentation/code, and I do not seem to be able to get the OST information for each access collected by the DXT module using PyDarshan, similarly to what is presented when using the command line parser.

After talking with Share, I understood that this information could be computed from the stripe width and stripe size, but it would be good to have an easier way to extract it, so that end users do not have to compute the OST IDs manually.
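For reference, the manual computation mentioned above can be sketched roughly as follows. This is only an illustration that assumes plain round-robin (RAID-0) striping starting at the first OST in the file's OST list; the function names are hypothetical and not part of PyDarshan.

```python
def ost_index_for_offset(offset, stripe_size, stripe_count):
    """Index into the file's OST list for the stripe holding `offset`.

    Assumes round-robin striping; hypothetical helper, not a PyDarshan API.
    """
    return (offset // stripe_size) % stripe_count


def ost_indices_for_access(offset, length, stripe_size, stripe_count):
    """Distinct OST-list indices touched by bytes [offset, offset + length)."""
    if length <= 0:
        return []
    first_stripe = offset // stripe_size
    last_stripe = (offset + length - 1) // stripe_size
    # An access can touch at most stripe_count distinct OSTs.
    n = min(last_stripe - first_stripe + 1, stripe_count)
    return [(first_stripe + k) % stripe_count for k in range(n)]


# Example: 1 MiB stripes over 4 OSTs
MiB = 1 << 20
print(ost_index_for_offset(5 * MiB, MiB, 4))             # stripe 5 -> index 1
print(ost_indices_for_access(3 * MiB, 2 * MiB, MiB, 4))  # stripes 3 and 4 -> [3, 0]
```

The returned values are positions in the per-file OST list, so they still need to be mapped to actual OST IDs using the Lustre record for that file.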

jakobluettgau commented 2 years ago

Hi Jean Luca, the reason this is not in PyDarshan right now is to keep loading fast and memory usage under control in the default case. I do have some snippets from another project that you could borrow; they attach the OST information to DXT records. We are also looking to curate a collection of utilities of a similar nature. I would be curious to have a chat about what would make sense for an API, especially with interactive tools in mind ;)

Example:

import darshan

# 'logs' is assumed to be a list of paths to Darshan log files
r = darshan.DarshanReport(logs[0], read_all=True)
lustre_records_by_id = get_id_to_record_mapping(r, "LUSTRE")

# e.g., on demand:
rec = dxt_record_attach_osts_inplace(r, r.records["DXT_MPIIO"][0], lustre_records_by_id)
print(rec)

# or, to apply it to all records, wrap the call in try/except in case the log contains files that do not live in a Lustre namespace
def graceful_wrapper(r, rec, lustre_records_by_id):
    try:
        dxt_record_attach_osts_inplace(r, rec, lustre_records_by_id)
    except Exception:
        # no matching Lustre record for this file: leave the DXT record unchanged
        pass

# apply the wrapper to every DXT record of both modules
for mod in ('DXT_POSIX', 'DXT_MPIIO'):
    for rec in r.records[mod]:
        graceful_wrapper(r, rec, lustre_records_by_id)
print(r.records["DXT_MPIIO"][1])

Helpers needed for the above snippet to work:

def get_id_to_record_mapping(report, mod):
    """ 
    Get mapping of id to records for a given module.

    Arguments:
        report (DarshanReport): the report the records belong to
        mod (String): name of module

    Returns:
        Dictionary with lists of records by their id
    """    
    if mod not in report.records:
        raise Exception(f"No records loaded for module: {mod}")

    recs_by_id = {}
    for rec in report.records[mod]:

        # add entry for record
        if rec['id'] not in recs_by_id:
            recs_by_id[rec['id']] = []

        if mod in ["LUSTRE"]:
            # for Lustre, per-rank records are usually not meaningful, so only a single entry
            # is kept per file (stripe settings only change if the file is deleted and recreated)
            recs_by_id[rec['id']] = [rec]
        else:
            recs_by_id[rec['id']].append(rec)

    return recs_by_id

def dxt_record_attach_osts_inplace(report, record, lustre_records_by_id):
    """ 
    For a given DXT record, attach targeted Lustre OSTs.

    Arguments:
        report: DarshanReport the record belongs to
        record: DarshanRecord to update
        lustre_records_by_id: mapping to use (recreating the mapping is potentially expensive for larger logs)

    Returns:
        reference to updated record
    """

    rec = record

    if rec['id'] not in lustre_records_by_id:
        raise Exception("No matching Lustre records found. (This is not necessarily an error, just that the file may not reside on a Lustre system.)")

    lrec = lustre_records_by_id[rec['id']][0]
    lcounters = dict(zip(report.counters['LUSTRE']['counters'], lrec['counters']))

    osts = list(lrec['ost_ids'])

    stripe_size = lcounters["LUSTRE_STRIPE_SIZE"]
    stripe_count = lcounters["LUSTRE_STRIPE_WIDTH"]

    for op in ['read_segments', 'write_segments']:
        segs = rec[op]

        for access in segs:
            # resetting 'osts' ensures that the operation is idempotent 
            # (even if applied multiple times to a record the integrity of the osts field remains)
            access['osts'] = []

            offset = access['offset']
            length = access['length']

            # first ost
            cur_offset = offset
            # integer division avoids float rounding errors for very large offsets
            ost_idx = (offset // stripe_size) % stripe_count

            add_count = 0
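            # an access covers bytes [offset, offset + length), so the end bound is exclusive
            while cur_offset < (offset + length):
                # record the OST holding the current stripe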
                ost_id = osts[ost_idx]
                access['osts'].append(ost_id)

                # advance to the start of the next stripe
                cur_offset = (cur_offset // stripe_size + 1) * stripe_size

                if ost_idx == (stripe_count - 1):
                    ost_idx = 0
                else:
                    ost_idx += 1

                add_count += 1
                if add_count >= stripe_count:
                    break

    return rec
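Once the 'osts' field is attached, a per-OST tally is straightforward. The following is a minimal sketch, not a PyDarshan API: `ost_access_counts` is a hypothetical helper, and it assumes the segments are lists of dicts as in the snippet above (a hand-built dict with made-up values stands in for a real DXT record here).

```python
from collections import Counter


def ost_access_counts(record):
    """Count how many read/write segments touch each OST id.

    Expects each segment dict to carry an 'osts' list, as added by
    dxt_record_attach_osts_inplace; segments without it are skipped.
    """
    counts = Counter()
    for op in ("read_segments", "write_segments"):
        for access in record.get(op, []):
            counts.update(access.get("osts", []))
    return counts


# Hand-built stand-in for an annotated DXT record (values are made up).
mock_record = {
    "read_segments": [{"offset": 0, "length": 1048576, "osts": [12]}],
    "write_segments": [
        {"offset": 0, "length": 2097152, "osts": [12, 7]},
        {"offset": 2097152, "length": 1048576, "osts": [3]},
    ],
}
print(ost_access_counts(mock_record))
```

A tally like this counts segments rather than bytes, so it only gives a rough picture of how accesses are spread across OSTs.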