hynek / prometheus-async

Async Python helpers for the official prometheus-client.
https://prometheus-async.readthedocs.io/
Apache License 2.0

Support for async custom collector #13

Open hanikesn opened 6 years ago

hanikesn commented 6 years ago

The upstream client supports adding custom collectors. Adding support for async collectors would make it easy to gather metrics over HTTP, for example.

hynek commented 6 years ago

What API do you have in mind? An async collect()? I don’t think that should be a slow operation?

hanikesn commented 6 years ago

The problem is that I need to query external services via HTTP to aggregate the metrics. We could also run the collection in a separate task, but I'd prefer doing it the async way.

hynek commented 6 years ago

My understanding is that the whole point of custom collectors is to collect data somewhere else (like you said in a separate thread) and then expose them using a custom collector.
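A minimal sketch of that pattern, assuming a background asyncio task does the slow work and the collector only serves the last snapshot. `CachedCollector`, `refresh_forever`, and the `fetch` coroutine are hypothetical names, not part of prometheus_client or prometheus-async:

```python
import asyncio
import threading


class CachedCollector:
    """Sync collector that serves whatever the last refresh stored."""

    def __init__(self):
        self._lock = threading.Lock()
        self._families = []

    def update(self, families):
        # Swap in a complete snapshot so a scrape never sees a partial refresh.
        with self._lock:
            self._families = list(families)

    def collect(self):
        # Called synchronously by the registry; no I/O happens here.
        with self._lock:
            snapshot = list(self._families)
        yield from snapshot


async def refresh_forever(collector, fetch, interval=15.0):
    """Await `fetch()` periodically and store its result in the collector."""
    while True:
        collector.update(await fetch())
        await asyncio.sleep(interval)
```

In a real exporter, `fetch` would presumably return a list of metric families (for example `GaugeMetricFamily` objects) and the collector would be registered with a registry as usual. The trade-off is that a scrape returns data as old as the last refresh.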

prometheus_client calls it synchronously, so I have no idea how you’d want to get that async? You’d have to run loop.run_until_complete on each collection? Bonus points for collect being a generator.

I’m not even arguing with you, I just don’t see a good way to achieve what you’d like to do?
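To illustrate the difficulty with the default asyncio event loop: a synchronous collect() that is called from code already running inside the loop cannot simply drive a coroutine to completion on that same loop. The sketch below uses made-up names (`fetch_value`, `sync_collect`, `handler`) and no Prometheus code at all:

```python
import asyncio


async def fetch_value():
    # Stand-in for an async HTTP request.
    await asyncio.sleep(0)
    return 42


def sync_collect():
    # A sync collect() invoked from code that already runs inside the loop.
    loop = asyncio.get_running_loop()
    coro = fetch_value()
    try:
        return loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # the coroutine was never started; avoid a warning
        return str(exc)


async def handler():
    # Plays the role of the async /metrics handler.
    return sync_collect()


result = asyncio.run(handler())
print(result)
```

The call fails with a RuntimeError because the loop is already running, so the sync-to-async bridge has to live somewhere else (another thread, or an async-aware registry).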

hanikesn commented 6 years ago

So here's basically what I want to do:

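from prometheus_client.core import CounterMetricFamily

class Collector(object):
    async def collect(self):
        # `session` (an aiohttp.ClientSession) and `url` are assumed to exist.
        async with session.get(url) as response:
            async for line in response.content:
                # Each line is assumed to hold one number; labels and value cannot both be passed.
                yield CounterMetricFamily('my_counter_total', 'Help text', value=float(line))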

I'm not sure why a separate loop.run_until_complete would be needed here. The server already runs in an event loop, and I think it should be possible to collect with an async for loop.

Edit: Also, according to the official documentation, metrics should be acquired just in time when possible: https://prometheus.io/docs/instrumenting/writing_exporters/#scheduling

ofen commented 3 years ago

One possible solution is to override the generate_latest function and several CollectorRegistry methods, changing the sync loops to async ones:

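import copy

from prometheus_client import CollectorRegistry
from prometheus_client.utils import floatToGoString

class CustomRegistry(CollectorRegistry):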
    async def collect(self):
        """Yields metrics from the collectors in the registry."""
        collectors = None
        ti = None
        with self._lock:
            collectors = copy.copy(self._collector_to_names)
            if self._target_info:
                ti = self._target_info_metric()
        if ti:
            yield ti
        for collector in collectors:
            async for metric in collector.collect():
                yield metric

    async def register(self, collector):
        """Add a collector to the registry."""
        # Await outside the lock: holding a threading lock across an await
        # can block the event loop if another coroutine needs the lock.
        names = await self._get_names(collector)
        with self._lock:
            duplicates = set(self._names_to_collectors).intersection(names)
            if duplicates:
                raise ValueError(
                    'Duplicated timeseries in CollectorRegistry: {0}'.format(
                        duplicates))
            for name in names:
                self._names_to_collectors[name] = collector
            self._collector_to_names[collector] = names

    async def _get_names(self, collector):
        """Get names of timeseries the collector produces."""
        desc_func = None
        # If there's a describe function, use it.
        try:
            desc_func = collector.describe
        except AttributeError:
            pass
        # Otherwise, if auto describe is enabled use the collect function.
        if not desc_func and self._auto_describe:
            desc_func = collector.collect

        if not desc_func:
            return []

        result = []
        type_suffixes = {
            'counter': ['_total', '_created'],
            'summary': ['', '_sum', '_count', '_created'],
            'histogram': ['_bucket', '_sum', '_count', '_created'],
            'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
            'info': ['_info'],
        }
        async for metric in desc_func():
            for suffix in type_suffixes.get(metric.type, ['']):
                result.append(metric.name + suffix)
        return result

def sample_line(line):
    if line.labels:
        labelstr = '{{{0}}}'.format(','.join(
            ['{0}="{1}"'.format(
                k, v.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"'))
                for k, v in sorted(line.labels.items())]))
    else:
        labelstr = ''
    timestamp = ''
    if line.timestamp is not None:
        # Convert to milliseconds.
        timestamp = ' {0:d}'.format(int(float(line.timestamp) * 1000))
    return '{0}{1} {2}{3}\n'.format(
        line.name, labelstr, floatToGoString(line.value), timestamp)

async def generate_latest(registry):
    """Returns the metrics from the registry in latest text format as a string."""
    output = []
    async for metric in registry.collect():
        try:
            mname = metric.name
            mtype = metric.type
            # Munging from OpenMetrics into Prometheus format.
            if mtype == 'counter':
                mname = mname + '_total'
            elif mtype == 'info':
                mname = mname + '_info'
                mtype = 'gauge'
            elif mtype == 'stateset':
                mtype = 'gauge'
            elif mtype == 'gaugehistogram':
                # A gauge histogram is really a gauge,
                # but this captures the structure better.
                mtype = 'histogram'
            elif mtype == 'unknown':
                mtype = 'untyped'

            output.append('# HELP {0} {1}\n'.format(
                mname, metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
            output.append('# TYPE {0} {1}\n'.format(mname, mtype))

            om_samples = {}
            for s in metric.samples:
                for suffix in ['_created', '_gsum', '_gcount']:
                    if s.name == metric.name + suffix:
                        # OpenMetrics specific sample, put in a gauge at the end.
                        om_samples.setdefault(suffix, []).append(sample_line(s))
                        break
                else:
                    output.append(sample_line(s))
        except Exception as exception:
            exception.args = (exception.args or ('',)) + (metric,)
            raise

        for suffix, lines in sorted(om_samples.items()):
            output.append('# HELP {0}{1} {2}\n'.format(metric.name, suffix, metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
            output.append('# TYPE {0}{1} gauge\n'.format(metric.name, suffix))
            output.extend(lines)
    return ''.join(output).encode('utf-8')

The custom collector class:

import re

from prometheus_client.core import (
    CounterMetricFamily, GaugeMetricFamily, SummaryMetricFamily)

class CustomCollector(object):
    def __init__(self, app):
        self.session = app['session']
        self.metrics_url = app['metrics_url']

    async def collect(self):
        # Release the connection once the payload has been read.
        async with self.session.get(self.metrics_url) as resp:
            payload = await resp.json()
        for name, v in payload['metrics'].items():
            # Convert dotted, dashed, or camelCase names to snake_case.
            name = name.replace('.', '_')
            name = name.replace('-', '_')
            name = re.sub(r'(?!^)([A-Z]+)', r'_\1', name).lower()
            kind = v['kind']
            if kind == 'Gauge':
                m = GaugeMetricFamily(name, name, labels=[])
            elif kind == 'Timer':
                m = SummaryMetricFamily(name, name, labels=[])
            elif kind == 'Counter':
                m = CounterMetricFamily(name, name, labels=[])
            else:
                # Skip unknown kinds instead of reusing a stale `m`.
                continue
            for i in v['values']:
                tags = i['tags']
                labels = {tag['key']: tag['value'] for tag in tags}
                value = i['values'][0]['v']
                timestamp = i['values'][0]['t']
                m.add_sample(name, labels, value, timestamp=timestamp)
            yield m

# Run inside a coroutine (for example, an aiohttp request handler):
collector = CustomCollector(app)
registry = CustomRegistry(auto_describe=True)
await registry.register(collector)
await generate_latest(registry)

Yamakaky commented 1 year ago

Hello, is there a plan to have something similar to what @ofen suggested built in? I have the following setup:

I'm not sure where to go from here without keeping a copy of @ofen's collector and registry and reimplementing server_stats.

Maybe the async collector could have both collect and async_collect methods, so that the async registry stays compatible with sync collectors.

hynek commented 11 months ago

What API would y'all like?

Yamakaky commented 6 months ago

What are you missing from what @hanikesn and @ofen are suggesting?

hynek commented 6 months ago

Well, I asked for an API y'all would like in August and got crickets, so I assumed the interest had waned?

Yamakaky commented 5 months ago

I think the main API would be to add an async generator collect_async() to the collector API, and maybe run both versions in sequence. If that's too complicated to implement, maybe a separate AsyncCollector type. Do you want a PR with a first draft of the implementation?
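A rough sketch of what "run both versions in sequence" could mean. Everything here is hypothetical: `iter_metrics`, the optional `collect_async()` method, and the placeholder string metrics are illustrations only, and neither prometheus_client nor prometheus-async defines such an interface today:

```python
import asyncio


async def iter_metrics(collectors):
    """Yield metrics from each collector: sync part first, then async part."""
    for collector in collectors:
        collect = getattr(collector, "collect", None)
        if collect is not None:
            for metric in collect():
                yield metric
        collect_async = getattr(collector, "collect_async", None)
        if collect_async is not None:
            async for metric in collect_async():
                yield metric


class SyncOnly:
    def collect(self):
        yield "sync_metric"


class Both:
    def collect(self):
        yield "cheap_metric"

    async def collect_async(self):
        await asyncio.sleep(0)  # stand-in for an HTTP request
        yield "remote_metric"


async def main():
    return [m async for m in iter_metrics([SyncOnly(), Both()])]


metrics = asyncio.run(main())
```

Existing sync collectors keep working unchanged under this scheme, since `collect_async()` is looked up optionally.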

hynek commented 5 months ago

Unless I'm missing something, we don't run any collect whatsoever. We just run prometheus_client's generate_latest.

So, unless we wanted to reimplement the official client, I guess the only way would be to somehow run generate_latest in a thread pool?
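A minimal sketch of the thread pool idea, assuming the default executor is acceptable. `render_in_thread` is a hypothetical helper, and `fake_generate_latest` merely stands in for prometheus_client's generate_latest so the snippet runs without the library:

```python
import asyncio
import threading


async def render_in_thread(render, registry):
    """Run a blocking `render(registry)` call in the default thread pool."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, render, registry)


def fake_generate_latest(registry):
    # Stand-in for prometheus_client.generate_latest; records its thread.
    registry["thread"] = threading.current_thread()
    return b"# no metrics\n"


async def main():
    registry = {}
    body = await render_in_thread(fake_generate_latest, registry)
    return body, registry["thread"]


body, worker = asyncio.run(main())
```

With the real library one would pass `prometheus_client.generate_latest` and a `CollectorRegistry` instead. Because collect() then runs in a worker thread, a sync collector could presumably hand async work back to the loop with `asyncio.run_coroutine_threadsafe(coro, loop).result()`, though that is one option rather than a settled design.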