ovis-hpc / ldms

OVIS/LDMS High Performance Computing monitoring, analysis, and visualization project.
https://github.com/ovis-hpc/ovis-wiki/wiki

hello_streams example - extracting the data? #239

Closed oceandlr closed 4 years ago

oceandlr commented 4 years ago

I have used the hello_sampler and hello_publisher. I see the stream info being written to the log. How else can I use this? Should this look like a metric set? Can it be extracted by a store?

@tom95858 @valleydlr

tom95858 commented 4 years ago

Hi @oceandlr, once you have created the JSON string that represents your data, you can send it to ldmsd in a stream like so:

/* xprt is an ldms_t transport already connected to the target ldmsd */
rc = ldmsd_stream_publish(
                xprt,                              /* connected transport */
                "my stream name",                  /* stream to publish on */
                LDMSD_STREAM_JSON,                 /* payload type */
                my_json_formatted_buffer,          /* the JSON text */
                strlen(my_json_formatted_buffer)); /* its length in bytes */

When it arrives at ldmsd, it is parsed by the ldmsd_streams interface and handed, as a json_entity_t, to every plugin that has subscribed to that stream. From there the plugin can use the json API to find attributes, iterate through lists, and so on.
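On the receiving side, a plugin registers a callback with ldmsd_stream_subscribe() and is handed the parsed JSON. Below is a minimal sketch of such a subscriber; the callback signature follows ldmsd_stream.h and the attribute accessors (json_attr_find, json_attr_value, json_value_str) come from the ovis json library, but the stream name and the "job-name" attribute are placeholders made up for illustration -- check the headers in your tree before relying on the exact names:

#include <stdio.h>
#include "ldmsd.h"
#include "ldmsd_stream.h"
#include "ovis_json/ovis_json.h"

/* Called by ldmsd each time a message arrives on the subscribed stream.
 * For LDMSD_STREAM_JSON messages, 'entity' is the already-parsed JSON. */
static int my_stream_cb(ldmsd_stream_client_t c, void *ctxt,
                        ldmsd_stream_type_t stream_type,
                        const char *msg, size_t msg_len,
                        json_entity_t entity)
{
        if (stream_type != LDMSD_STREAM_JSON || !entity)
                return 0;       /* this sketch ignores non-JSON messages */

        /* look up a hypothetical "job-name" attribute in the top-level dict */
        json_entity_t attr = json_attr_find(entity, "job-name");
        if (attr) {
                json_entity_t val = json_attr_value(attr);
                if (json_entity_type(val) == JSON_STRING_VALUE)
                        printf("job-name = %s\n", json_value_str(val)->str);
        }
        return 0;
}

/* typically done once, e.g. in the plugin's config() handler */
static ldmsd_stream_client_t client;
static void subscribe_example(void)
{
        client = ldmsd_stream_subscribe("my stream name", my_stream_cb, NULL);
}

Note that the raw message text (msg, msg_len) is also passed to the callback, so a plugin that wants to archive or re-publish the JSON verbatim can do so without re-serializing the entity.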

tom95858 commented 4 years ago

Hi @oceandlr,

Here is a program that will translate an Empire timing log file to JSON. The resulting output can then be fed to the ldmsd_stream_publish command to send it to ldmsd.

Here's an example:


translate_empire --file empire_timing.log | ldmsd_stream_publish \
       -x sock -h localhost -p 411 -a munge \
       --type json -s empire

Note that I had to guess at the meaning of some of the data in the log file.

Here is the program...

#!/usr/bin/env python3
import json
import argparse
import re

class Parser(object):
    def __init__(self, fmt='json'):
        self.stack = {}
        self.fmt = fmt
        self.expr = r'(.*: )(\S+)(\s*-\s*[^%]+%)?\s*(\[(\d+)\]\s*{min=[^,]+,\s*max=([^,]+),\s*std dev=([^}]+)}\s*<(\d+),\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+)>)?\s*'
        self.regex = re.compile(self.expr)

    # debugging helper: join the current call stack into a printable path
    def print_stack(self, depth):
        s = self.stack[0]
        for f in range(1, depth+1):
            s += "->" + self.stack[f]
        # print(s)

    def parse(self, line, sep):
        m = self.regex.match(line)
        if m is None:
            print("Syntax Error! ", end='')
            print(line)
            return
        g = m.groups()

        fname = g[0]
        bars = fname.split('|')
        bars = [ e.strip() for e in bars ]
        for depth in range(0, len(bars)):
            if len(bars[depth]) != 0:
                break
        fname = bars[depth]
        self.stack[depth] = fname
        record = {
            "stack" : [ self.stack[f] for f in range(0, depth) ],
            "name" : fname,
            "time" : float(g[1]),
        }
        if g[2]:
            # g[2] looks like " - 9.66231%"; take the number after the dash
            pct = g[2].split('-', 1)[1].strip().rstrip('%')
            record["pct_of_total"] = float(pct)
        if g[3]:
            record["call_count"] = int(g[4])
            # note: the min= value in the braces is not captured by the
            # pattern above; g[5] is the max and g[6] the std dev
            record["max_time"] = float(g[5])
            record["std_dev"] = float(g[6])
            record["histogram"] = [ int(g[i]) for i in range(7, len(g)) ]
        s = json.dumps(record)
        print(sep, end='')
        print(s)

def translate_empire(args):
    p = Parser(fmt=args.format)
    with open(args.file) as f:
        print('[')
        sep = ''
        for l in f:
            # skip banner/separator lines
            if l.startswith('*'):
                continue
            p.parse(l, sep)
            sep = ','
        print(']')

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Translate an Empire log file to another format.")
    parser.add_argument('--file', required=True,
                        help = "Input file name")
    parser.add_argument('--format', required=False, default='json',
                        help = "Specify the output file format")
    args = parser.parse_args()

    translate_empire(args)

tom95858 commented 4 years ago

Here is some sample output ...

[
{"stack": [], "name": "Teuchos::StackedTimer:", "time": 1409.61, "call_count": 1, "min_time": 1409.78, "max_time": 0.0669965, "std_dev": 8.0, "histogram": [18, 26, 32, 17, 7, 3, 2, 8, 7]}
,{"stack": ["Teuchos::StackedTimer:"], "name": "PreInitialize:", "time": 0.0320637, "pct_of_total": 0.00227466, "call_count": 1, "min_time": 0.0322081, "max_time": 6.1766e-05, "std_dev": 8.0, "histogram": [10, 15, 12, 24, 22, 17, 9, 7, 4]}
,{"stack": ["Teuchos::StackedTimer:"], "name": "Initialize:", "time": 136.201, "pct_of_total": 9.66231, "call_count": 1, "min_time": 136.527, "max_time": 0.19158, "std_dev": 3.0, "histogram": [3, 8, 8, 22, 22, 12, 18, 18, 14]}
oceandlr commented 4 years ago

Hi @tom95858

Thanks for the example. I have packed the data into JSON and published it (I used Perl's JSON module). Is the intended extraction method only ldmsd_stream_subscribe (as in the hello_stream_sampler)? Should a store implement that? This doesn't look like a metric set, nor is it something that is handed off in the store call, correct?

oceandlr commented 4 years ago

Hi @tom95858

A few questions on streams:

1. Trying to use ldmsd_streams with multiple levels of aggregation. I think this is what I am seeing -- is this correct:
   a) Using ldmsd_stream_publish I seem to only be able to publish to an ldmsd as long as both are on nids, or on localhost. I cannot publish from the login node to one of the nodes in a job launch. Is this correct?
   b) Should I be able to use an aggregator with hello_sampler (or similar)? That is, does the stream propagate up, or am I supposed to extract it and turn it into a metric set at the point of entry?
2. If it does propagate up, see above re store -- is the ldmsd which will be doing the store supposed to get the data via ldmsd_stream_subscribe (not store)?
3. Can you provide some info re Kokkos under store? The kokkos_sampler looks like it makes a metric set, not publish to an ldmsd_stream. The kokkos_store looks like it subscribes to a stream, not store a metric set. How are these supposed to work together?

tom95858 commented 4 years ago

> Hi @tom95858
>
> A few questions on streams:
>
> 1. Trying to use ldmsd_streams with multiple levels of aggregation. I think this is what I am seeing -- is this correct:
>    a) Using ldmsd_stream_publish I seem to only be able to publish to an ldmsd as long as both are on nids, or on localhost. I cannot publish from the login node to one of the nodes in a job launch. Is this correct?

You can publish to any node with which you have network connectivity.

>    b) Should I be able to use an aggregator with hello_sampler (or similar)? That is, does the stream propagate up, or am I supposed to extract it and turn it into a metric set at the point of entry?

Both models can be used. The slurm sampler, for example, turns the published data into a metric set.

A first-level aggregator can subscribe to data published to the ldmsd on the compute node using the prdcr_subscribe config command. Similarly, the second level can prdcr_subscribe to the first, the third to the second, and so on; a sketch follows below.
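For concreteness, here is what that might look like in a first-level aggregator's configuration file. This is only a sketch: the producer name and host (node01) and the stream name (empire) are made-up values for illustration, and the reconnect interval is in microseconds. prdcr_subscribe takes a regular expression matching producer names and the name of the stream to forward:

# connect to the sampler ldmsd running on the compute node
prdcr_add name=node01 host=node01 port=411 xprt=sock type=active interval=20000000
# forward messages published on the 'empire' stream by matching producers
prdcr_subscribe regex=node01 stream=empire
prdcr_start name=node01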

> 2. If it does propagate up, see above re store -- is the ldmsd which will be doing the store supposed to get the data via ldmsd_stream_subscribe (not store)?

If it's turned into a metric set, you can use the store; if it's not, then your store would subscribe to the stream and store the data received on the stream (this is what the kokkos_store does).

> 3. Can you provide some info re Kokkos under store? The kokkos_sampler looks like it makes a metric set, not publish to an ldmsd_stream. The kokkos_store looks like it subscribes to a stream, not store a metric set. How are these supposed to work together?

I'm not sure what you're looking at, but it is probably dead code.

oceandlr commented 4 years ago

Thanks @tom95858

1a) Probably there is something between the login node and the allocation preventing this. I won't worry about it.
1b) OK -- I missed prdcr_subscribe -- I will check it out.
2) OK.
3) OK.

oceandlr commented 4 years ago

Got it! Thanks!