Closed: wu-sheng closed this issue 1 year ago
I have done one issue about the agent and I'm interested in oap. I think I can do this task, could you please assign it to me?
Assigned. Good luck.
Ok, I will do my best.
@mufiye You could take one step at a time. Make metrics available for airflow first. Then move forward on logs.
Ok, I will do the metrics part first. And I think I can refer to other similar issues about how to add the metrics dashboard, such as issue #9677.
Hello, @wu-sheng. I find that all the data the opentelemetry collector receives uses tags to compose the "metrics name" but has no "tag attributes", for example "airflow.ti.finish.tutorial.templated.up_for_reschedule" and "airflow.ti.finish.tutorial.print_date.shutdown". I have no idea how to write the MAL rules to process these data.
What tag do you need? Tag is not required. For describing airflow server, that could be set through otel collector, like we did for mysql metrics.
I mean that all info is contained in the "metrics name" such as
Does the original statsd have these metadata?
These metadata compose the name, such as "local_task_job.task_exit. . . .".
@potiuk Do you have time to help? We want to monitor airflow with metrics having job/dag/task IDs to group metrics, rather than just metrics for the whole airflow server.
These metadata compose the name, such as "local_task_job.task_exit. . . .".
OK, if it has, we could write a small MAL script (Groovy based) to split this metadata. But meanwhile, I think how to match these metrics is a little challenging. @kezhenxu94 @hanahmily @wankai123 What do you suggest? Do we have to write a regex-based analysis?
@mufiye Did you check the OTEL side configurations? Is there a way to change their style? Otherwise, we may need to build a statsd receiver.
I think maybe the processor of the otel collector can do this and I need to check this part.
What tag do you need? Tag is not required. For describing airflow server, that could be set through otel collector, like we did for mysql metrics.
I mean that all info is contained in the "metrics name", such as , , , , , and so on. But I have no way to filter and process them. Or should I just not consider these metrics? Because of the statsD data format, the otel data collected will not have "key value pair" tag attributes.
I doubt that you mixed up the concepts of "airflow job" and "opentelemetry job"? We use the OpenTelemetry job name to distinguish data sources.
As for metric names like "local_task_job.task_exit.<job_id>.<dag_id>.<task_id>.<return_code>", I think you should split them in the OpenTelemetry processor and move the metadata into tags, then send them to OAP. Anyway, I'll take a look at Airflow's docs to see what's the case.
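Splitting the name in the collector could look roughly like the sketch below. This is an untested assumption: the transform processor's OTTL functions used here (set, replace_pattern, IsMatch) exist, but the regexes, attribute names, and statement ordering are my guesses for the Airflow name layout.

```yaml
processors:
  transform:
    metric_statements:
      - context: datapoint
        statements:
          # copy the raw name into a tag first, so the metadata survives the rename
          - set(attributes["dag_id"], metric.name) where IsMatch(metric.name, "^local_task_job\\.task_exit\\..*")
          # keep only the <dag_id> segment of "local_task_job.task_exit.<job_id>.<dag_id>..."
          - replace_pattern(attributes["dag_id"], "^local_task_job\\.task_exit\\.[^.]+\\.([^.]+)\\..*$", "$$1")
      - context: metric
        statements:
          # collapse the per-task names into one stable metric name
          - replace_pattern(name, "^local_task_job\\.task_exit\\..*$", "local_task_job.task_exit")
```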
I think I said something wrong. StatsD could add "key value pair" tags to the message, but actually airflow only uses the name to carry this metadata. I think using the OpenTelemetry processor to process the data may be a feasible method. About the "job", it is just part of the original airflow metrics name.
Not much time to help (catching up with some stuff), but for what it is worth: Airflow's statsd is not the "best" for Skywalking to consume. Unfortunately you'd indeed need to parse the metric name, and while I am not sure how the OTEL processor might work, a regexp approach might be a good idea.
However just to give you perspective - Airflow's metrics are evolving.
Quite recently (coming in the next version of Airflow, 2.6 most likely) @hussein-awala improved Statsd metrics with DataDog metadata tags - https://github.com/apache/airflow/pull/28961 - and maybe, rather than focusing on pure statsd metrics, you could integrate those.
Also - a bit more long term - in Airflow we already approved OpenTelemetry support, https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-49+OpenTelemetry+Support+for+Apache+Airflow, and we even have a chance to progress with the implementation - @feruzzi is looking into the integration and is even adding better support for Airflow's statsd metrics testing in Breeze (the Airflow development environment) with Grafana and Prometheus - https://github.com/apache/airflow/pull/29449
So maybe it could be a nice teamwork.
I have done the research and started a new discussion in the opentelemetry collector contrib repo. @wu-sheng @kezhenxu94
I have tried to use the metrics transform processor, transform processor and attributes processor.
The metrics transform processor's combine function can process counter and gauge data, but cannot process summary and histogram data: when processing histogram data it reports errors in the terminal, and for summary data the datapoint part of the result is null.
For the transform processor, I could use functions like replace_pattern to make the metrics name concise, such as "ti.finish.
Could you use set(target, value) and replace_match to achieve this? Only at this time replace_match is using regex to split out the value of the specific key, such as dag_id, for example only matching the text after the 2nd dot.
I tried it but it doesn't work, because the third argument of replace_match is a string and it replaces the key's value; we can't get the dag_id and use it as the third argument of replace_match. I have also tried the Split function before, but it returns a string array and the transform processor does not provide an index operation on that array.
Are you considering this too complex? In the transform processor, you should be able to hardcode most of them, right?
it will replace the key
Tag keys are static and hardcoded, such as task_id as a key.
replace_match would change everything, so it may not be a good one. Split should be good. You should have an expression to get the array and then use the index.
And I found the docs about list indexing: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/README.md#lists
Are you considering this too complex? In the transfer process, you should be able to hardcode most of them, right?
it will replace the key
I think I just said something wrong. I meant to say that it will change the value of the relative key.
replace_match would change everything, it may not be a good one.
Split
should be good. You should have an expression to get the array and then use the index.And I can find the index relative docs, https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/README.md#lists
I think we cannot get a single string out of the array. The doc says that "the grammar does not provide an accessor to individual list entries".
We could use this (replacing ConvertCase with another function) to set the metric name without the parameter.
metrics:
set(metric.name, ConvertCase(metric.name, "snake"))
Meanwhile, we could set(attributes["job_id"], replace(metric.name, xxx)). Of course, the set(attributes...) must run first, otherwise the metadata is lost.
Could you check what I am missing?
Yes, you are right. And I have tried this before with the config below. The most important thing, I think, is how to process the attributes in set(attributes["job_id"], replace(metric.name, xxx)).
processors:
  transform:
    metric_statements:
      - context: resource
        statements:
      - context: datapoint
        statements:
          - set(attributes["job_id"], metric.name)
      - context: metric
        statements:
          - replace_match(name, "system.*.cpu", "system.cpu")
From the OTEL perspective, I think the only thing you have to do is split local_task_job.task_exit.<job_id>.<dag_id>.<task_id>.<return_code> into a metric name with all the other parts as parameter tags.
Because on the OAP side, the MAL engine is powered by Groovy, which is much more powerful and flexible than the OTEL processor. You could finish the remaining processing there.
Is this possible? I am on Slack for DM; if you have any trouble, ping me there.
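A MAL rule file for Airflow might end up shaped like the existing ones (e.g. the MySQL rules). Everything below is hypothetical: the file name, metric names, tags, and Layer.AIRFLOW do not exist yet; this only sketches the general shape of an otel-rules file.

```yaml
# Hypothetical otel-rules/airflow.yaml, modeled on the existing MySQL rules.
expSuffix: tag({tags -> tags.host_name = 'airflow::' + tags.host_name}).service(['host_name'], Layer.AIRFLOW)
metricPrefix: meter_airflow
metricsRules:
  - name: job_start
    exp: airflow_job_start.sum(['host_name'])
  - name: task_exit
    exp: local_task_job_task_exit.sum(['host_name', 'dag_id', 'task_id'])
```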
@mufiye Any update about this?
Sorry, little update these days. I successfully sent data from airflow to the skywalking oap. I want to ask: for the otel-collector, which exporter option should we choose, otlp or opencensus?
otlp is a better choice. oc is merged into otlp.
Have you resolved the issue we discussed on slack?
I reviewed the otel functions and found that the replace_pattern function can clear the redundant string in the tag value. Also, I think Groovy can do it too, but I have not read that part completely.
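Clearing the redundant part of a tag value with replace_pattern might look like this sketch (the attribute name, the prior state of its value, and the regex are all assumptions):

```yaml
processors:
  transform:
    metric_statements:
      - context: datapoint
        statements:
          # assumes attributes["dag_id"] currently holds e.g. "ti.finish.tutorial.print_date.success"
          # and keeps only the <dag_id> segment
          - replace_pattern(attributes["dag_id"], "^ti\\.finish\\.([^.]+)\\..*$", "$$1")
```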
Good to see there is no block. Take your time. The planned release for 9.4.0 should be 2-3 weeks away. If your plan is adding this to 9.4.0, let us know. I will check with you before cutting the release.
I'm not sure whether I can finish this in 2-3 weeks. The next step for me is to comb through the airflow metrics and write the rules. And I still encounter a problem: data collected by the otel-collector cannot be seen in the skywalking storage. This problem happens when I use the otlp exporter option instead of the opencensus option. I think I have lots of things to learn. By the way, it's time for me to go back to school, so I may allocate relatively less time to this. But no matter what, I will do my best.
Is there any way to show the received original metrics in the log?
OpenTelemetryMetricRequestProcessor#processMetricsRequest. There are several debug logs in there.
By reading the skywalking log and code, I have found out why the skywalking storage doesn't show some metrics. The "AggregationTemporality" is delta for counter metrics, so skywalking discards them in OpenTelemetryMetricRequestProcessor#adaptMetrics. From the otel collector log, I think the "AggregationTemporality" of all counter metrics from airflow is delta. How should I handle this situation? Could you give me some suggestions?
By the way, the timer metrics from airflow can only be transformed to the "ExponentialHistogram" or "Summary" type. I think skywalking doesn't support ExponentialHistogram in the otel receiver, so the "Summary" type is the only option.
Do you mean delta is the increase from the last report period? Could you explain a little more?
"ExponentialHistogram" or "Summary" type. I think skywalking doesn't support ExponentialHistogram for otel receiver. So "Summary" type is the only option.
Is this different from histogram?
Is this different from histogram?
A summary shows the count, sum, and quantiles of the data, but a histogram shows the count, sum, and buckets. I think we can use summary data.
Do you mean delta is the increase from the last report period? Could you explain a little more?
Yes, delta means an increase from the last report period. About how SkyWalking processes counter (sum) data:
if (metric.hasSum()) {
    final Sum sum = metric.getSum();
    if (sum.getAggregationTemporality() != AGGREGATION_TEMPORALITY_CUMULATIVE) {
        return Stream.empty();
    }
Counter Data from airflow, the AggregationTemporality is Delta:
Descriptor:
-> Name: airflow_job_start
-> Description:
-> Unit:
-> DataType: Sum
-> IsMonotonic: false
-> AggregationTemporality: Delta
Do you mean delta is the increase from the last report period? Could you explain a little more?
I have researched the statsd receiver, processors, and otlp exporter of the otel collector to transform Delta metrics to Cumulative metrics, but found no component that can do this.
Is this different from histogram?
The summary shows count, sum and quantile of data. But histogram shows count, sum and bucket of data. I think we can use summary data.
AFAIK, MAL supports histogram, which can access the count and buckets to get avg or percentile. But summary is not supported, and it has less precision. If histogram works, we should never choose summary.
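To illustrate why buckets are enough for percentiles, here is a small sketch with made-up numbers (not SkyWalking code): it estimates a percentile from bucket counts the way bucket-based engines do, by reporting the upper bound of the bucket the target rank falls into.

```python
def percentile_from_buckets(bounds, counts, q):
    """Estimate the q-th percentile from histogram buckets.

    bounds: upper bound of each bucket, ascending.
    counts: number of observations per bucket (not cumulative).
    Returns the upper bound of the bucket that contains the q-th
    percentile, i.e. the usual coarse bucket-based estimate.
    """
    total = sum(counts)
    running = 0
    for bound, count in zip(bounds, counts):
        running += count
        if running >= q * total:
            return bound
    return float("inf")

# 100 hypothetical task durations: 50 in (0, 0.1]s, 30 in (0.1, 0.5]s, 20 in (0.5, 1]s
print(percentile_from_buckets([0.1, 0.5, 1.0], [50, 30, 20], 0.95))  # -> 1.0
```

A summary, by contrast, already fixed its quantiles at the client, so nothing new can be derived from it later.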
Do you mean delta is the increase from the last report period? Could you explain a little more?
Yes, delta means an increase from the last report period.
What is the issue? If the source (from otel) is delta, then using SUM to build data per min/hour/day is correct. The recent AWS S3 monitoring uses this approach.
Do you mean delta is the increase from the last report period? Could you explain a little more?
I have researched the statsd receiver, processors, and otlp exporter of the otel collector to transform Delta metrics to Cumulative metrics, but found no component that can do this.
Check my last comment. Delta is good; it means the increase within the period. Summing them gives an accurate count per dimension.
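The summing idea can be sketched in a few lines (timestamps and values are made up, not real Airflow output):

```python
# Hypothetical delta reports: each value is the increase since the
# previous report, paired with its timestamp.
delta_reports = [
    ("12:00:10", 3),
    ("12:00:40", 2),
    ("12:01:05", 4),
]

# Summing the deltas inside each minute reconstructs the exact
# per-minute count, which is what SUM-based downsampling needs.
per_minute = {}
for ts, delta in delta_reports:
    minute = ts[:5]  # "HH:MM"
    per_minute[minute] = per_minute.get(minute, 0) + delta

print(per_minute)  # -> {'12:00': 5, '12:01': 4}
```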
What is the issue? If the source (from otel) is delta, then using SUM to build data per min/hour/day is correct. The recent AWS S3 monitoring uses this approach.
My issue is that the skywalking oap cannot receive the counter data of airflow. I think the reason is that delta counter data will not be accepted; the relevant code is shown below, in OpenTelemetryMetricRequestProcessor#adaptMetrics. Did I misunderstand this part of the code?
if (metric.hasSum()) {
    final Sum sum = metric.getSum();
    if (sum.getAggregationTemporality() != AGGREGATION_TEMPORALITY_CUMULATIVE) {
        return Stream.empty();
    }
I think Delta should be converted to a gauge, no matter whether it is monotonic or not. @kezhenxu94 What do you think?
@mufiye Would you like to try this first as a separate PR?
I mean you need a pull request to update OpenTelemetryMetricRequestProcessor following the above logic. That change should be done first, before the airflow features.
I think Delta should be converted to a gauge, no matter whether it is monotonic or not. @kezhenxu94 What do you think? @mufiye Would you like to try this first as a separate PR?
What does the separate PR mean? What should I do?
I mean you need a pull request to update OpenTelemetryMetricRequestProcessor following the above logic. That change should be done first, before the airflow features.
I would like to try it.
I think it should not be hard. Try to follow the AGGREGATION_TEMPORALITY_CUMULATIVE non-Monotonic case.
If this metric is going to report at least once per minute, we should be good.
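The actual change would live in the Java OpenTelemetryMetricRequestProcessor; below is only a toy Python model of the proposed rule, to make the branch explicit (the class and field names are invented for illustration):

```python
from dataclasses import dataclass

CUMULATIVE = "cumulative"
DELTA = "delta"

@dataclass
class SumMetric:
    name: str
    temporality: str
    monotonic: bool
    value: float

def adapt(metric: SumMetric):
    """Toy model of the proposed rule: cumulative monotonic sums stay
    counters; everything else, including the delta sums that were
    previously dropped, is emitted as a gauge."""
    if metric.temporality == CUMULATIVE and metric.monotonic:
        return ("counter", metric.name, metric.value)
    return ("gauge", metric.name, metric.value)

print(adapt(SumMetric("airflow_job_start", DELTA, False, 2.0)))
# -> ('gauge', 'airflow_job_start', 2.0)
```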
Search before asking
Description
This is an open issue for new contributors. Apache Airflow is a widely used workflow scheduler. We are encouraging someone new to the community to add a new layer catalog (workflow) for Airflow.
Metrics
Airflow exposes metrics through StatsD, https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/metrics.html. We could use StatsD + the OpenTelemetry StatsD receiver (https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/statsdreceiver/README.md) + the OpenTelemetry OTEL exporter to ship the metrics to the SkyWalking OTEL receiver, then use MAL to build metrics as well as a dashboard for those metrics. Notice, a new layer and a new UI menu should be added.
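The pipeline described above might be wired in the collector roughly like this (the endpoints are placeholders, not a tested deployment):

```yaml
receivers:
  statsd:
    endpoint: "0.0.0.0:8125"   # point Airflow's statsd_host/statsd_port here
exporters:
  otlp:
    endpoint: "oap:11800"      # SkyWalking OAP OTLP receiver
    tls:
      insecure: true
service:
  pipelines:
    metrics:
      receivers: [statsd]
      exporters: [otlp]
```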
Logging
Airflow supports FluentD to ship logs, https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/logging-architecture.html. SkyWalking already has FluentD setup support, so we should be able to receive and catalog the logs.
Additionally, Task Logs seem an interesting thing. We could use LAL (Log Analysis Language) to group the logs by task name (or ID), treating tasks as endpoints (a SkyWalking concept).
Use case
Add more observability for Airflow server.
Related issues
No response
Are you willing to submit a PR?
Code of Conduct