m3dev / gokart

Gokart solves reproducibility, task dependencies, constraints of good code, and ease of use for machine learning pipelines.
https://gokart.readthedocs.io/en/latest/
MIT License

PoC: visualize processing time with opentelemetry / gcp cloud trace #280

Closed: yokomotod closed this 2 years ago

yokomotod commented 2 years ago

what's this

This PR makes gokart send task processing times to GCP Cloud Trace so that they can be visualized there.

discussion

It uses the OpenTelemetry ecosystem, so it is not limited to GCP users: other platforms can be supported by switching to one of the exporters listed in the registry:
https://opentelemetry.io/registry/?component=exporter
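Platform independence comes from OpenTelemetry separating instrumentation (creating spans) from export (sending them to a backend). In the SDK, `CloudTraceSpanExporter` could be swapped for another exporter, such as `ConsoleSpanExporter` from `opentelemetry.sdk.trace.export`, without changing how spans are created. The toy model below illustrates that split in plain Python. It is not the OpenTelemetry API: `ToySpan`, `ToyExporter`, `InMemoryExporter`, `PrintExporter` and `ToyTracer` are all hypothetical names.

```python
from dataclasses import dataclass, field
from typing import List, Protocol


@dataclass
class ToySpan:
    """A finished span: a name plus start/end timestamps in seconds."""
    name: str
    start: float
    end: float

    @property
    def duration(self) -> float:
        return self.end - self.start


class ToyExporter(Protocol):
    """The backend-specific piece; only this changes when switching platforms."""
    def export(self, spans: List[ToySpan]) -> None: ...


@dataclass
class InMemoryExporter:
    """Keeps spans in a list (handy in tests)."""
    spans: List[ToySpan] = field(default_factory=list)

    def export(self, spans: List[ToySpan]) -> None:
        self.spans.extend(spans)


class PrintExporter:
    """Writes one line per span to stdout, standing in for a real backend."""
    def export(self, spans: List[ToySpan]) -> None:
        for span in spans:
            print(f"{span.name}: {span.duration:.3f}s")


class ToyTracer:
    """Instrumentation side: unchanged whichever exporter is plugged in."""
    def __init__(self, exporter: ToyExporter) -> None:
        self.exporter = exporter

    def record(self, name: str, start: float, end: float) -> None:
        self.exporter.export([ToySpan(name, start, end)])


# Same instrumentation call, two different backends.
memory = InMemoryExporter()
ToyTracer(memory).record("Example", 0.0, 1.5)
ToyTracer(PrintExporter()).record("Example", 0.0, 1.5)  # prints "Example: 1.500s"
```

Supporting another platform means writing, or picking from the registry, a new exporter. The tracer and the task code stay as they are.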

However, I'm not sure whether this should be implemented inside gokart or just written in user-side code.

As you can see, it is simple enough to write this as user-side code, and that gives full configurability.
(In that case, I can submit a PR adding an example recipe to the documentation.)
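User-side integration is possible because luigi lets any code register class-level event callbacks with `luigi.Task.event_handler`. Luigi then calls them with the event's arguments: START and SUCCESS pass the task, and FAILURE passes the task and the raised exception. Exceptions raised inside a callback are logged rather than propagated. The toy model below imitates that dispatch in plain Python to show why handler signatures matter. `ToyTask` and the event strings are hypothetical stand-ins, not luigi's implementation.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class ToyTask:
    """Stand-in for luigi.Task; only the event-callback machinery is modeled."""
    _callbacks: DefaultDict[str, List[Callable]] = defaultdict(list)

    @classmethod
    def event_handler(cls, event: str) -> Callable[[Callable], Callable]:
        """Decorator: register `fn` to run whenever `event` fires."""
        def register(fn: Callable) -> Callable:
            cls._callbacks[event].append(fn)
            return fn
        return register

    def trigger_event(self, event: str, *args) -> List[str]:
        """Call every handler with the event's arguments; collect errors instead of raising."""
        errors = []
        for fn in self._callbacks[event]:
            try:
                fn(*args)
            except Exception as exc:  # luigi logs callback errors instead of failing the task
                errors.append(f"{fn.__name__}: {type(exc).__name__}")
        return errors


log = []


@ToyTask.event_handler("start")
def on_start(task):
    log.append(("start", task))


@ToyTask.event_handler("failure")
def on_failure(task, exception):  # failure-style events carry (task, exception)
    log.append(("failure", task, str(exception)))


@ToyTask.event_handler("failure")
def on_failure_one_arg(task):  # wrong arity: raises TypeError, which is swallowed
    log.append(("unreachable", task))


task = ToyTask()
task.trigger_event("start", "Foo")
errors = task.trigger_event("failure", "Foo", ValueError("boom"))
print(log)     # [('start', 'Foo'), ('failure', 'Foo', 'boom')]
print(errors)  # ['on_failure_one_arg: TypeError']
```

Because callback errors are swallowed, a handler with the wrong signature fails quietly, and the task itself still runs normally.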

What do you think?
Personally, I'm +1 on starting outside gokart, maybe...?

todo

if we choose to implement this inside gokart:

sample

With built-in gokart support, user-side code needs no changes,
but the proper GCP permission is required: roles/cloudtrace.agent

import gokart

class Foo(gokart.TaskOnKart):
    def run(self):
        self.dump('foo')

class Example(gokart.TaskOnKart):

    def requires(self):
        return Foo()

    def run(self):
        self.dump('Hello, world!')

task = Example()
gokart.build(task)
mski-iksm commented 2 years ago

Thank you for adding a great feature! I think adding it to gokart is a good idea, since tracking processing time is important.

I agree with you, and I think a switch to turn tracing on and off must be added in this PR.
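One possible shape for such a switch is sketched below. It keeps tracing off unless explicitly enabled, so existing pipelines and environments without `roles/cloudtrace.agent` are unaffected. This assumes an environment variable as the control: `GOKART_TRACING_ENABLED`, `tracing_enabled` and `maybe_setup_tracing` are hypothetical names, and neither gokart nor luigi defines such a setting.

```python
import os
from typing import Callable, Mapping, Optional

# Hypothetical setting name; neither gokart nor luigi defines it.
ENV_VAR = "GOKART_TRACING_ENABLED"
_TRUTHY = {"1", "true", "yes", "on"}


def tracing_enabled(env: Optional[Mapping[str, str]] = None) -> bool:
    """True only when the switch is explicitly on; unset means off."""
    source = os.environ if env is None else env
    return source.get(ENV_VAR, "").strip().lower() in _TRUTHY


def maybe_setup_tracing(setup: Callable[[], None],
                        env: Optional[Mapping[str, str]] = None) -> bool:
    """Run `setup` (e.g. exporter wiring plus span handlers) only if enabled."""
    if tracing_enabled(env):
        setup()
        return True
    return False


calls = []
maybe_setup_tracing(lambda: calls.append("setup"), {ENV_VAR: "true"})
maybe_setup_tracing(lambda: calls.append("setup"), {})
print(calls)  # ['setup']
```

A luigi or gokart config parameter would work equally well as the source of the flag. The important design choice is the off-by-default behavior.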

yokomotod commented 2 years ago

It seems simple enough to integrate on the user side.

I'm sharing the sample code and closing this PR. Thanks for the comments!

import luigi
import gokart

from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.resources import get_aggregated_resources
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.resourcedetector.gcp_resource_detector import GoogleCloudResourceDetector

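# Luigi calls these class-level event handlers for every task it runs.
# `tracer` is the module-level tracer created in the __main__ block below.
@luigi.Task.event_handler(luigi.Event.START)
def _start(task):
    # Not made current: its parent is whichever span is current ("gokart.run").
    task.trace_span = tracer.start_span(str(task))

@luigi.Task.event_handler(luigi.Event.SUCCESS)
def _success(task):
    task.trace_span.end()

# Luigi passes the raised exception as a second argument to FAILURE handlers.
@luigi.Task.event_handler(luigi.Event.FAILURE)
def _failure(task, exception):
    task.trace_span.record_exception(exception)
    task.trace_span.end()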

if __name__ == '__main__':
    resources = get_aggregated_resources(
        [GoogleCloudResourceDetector(raise_on_error=True)]
    )
    tracer_provider = TracerProvider(resource=resources)
    cloud_trace_exporter = CloudTraceSpanExporter()
    tracer_provider.add_span_processor(
        BatchSpanProcessor(cloud_trace_exporter)
    )
    trace.set_tracer_provider(tracer_provider)
    tracer = trace.get_tracer(__name__)

    # Here you can configure the root span's name and attributes as you like.
    with tracer.start_as_current_span("gokart.run"):
        gokart.run()

If you run the task on GKE, configure the Downward API in your manifest as described in https://google-cloud-opentelemetry.readthedocs.io/en/latest/examples/cloud_resource_detector/README.html#gke-container-resources-are-not-being-detected-or-exported