apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Worker logs on Prism Stand-Alone Binary Not Appearing #30703

Open frankljin opened 7 months ago

frankljin commented 7 months ago

What happened?

To replicate this issue:

  1. Clone the beam-starter-go pipeline here.
  2. Run it with the following command. Worker logs should appear as expected ("Hello", "World!", "Greetings", one per line):
    go run main.go --input-text="Greetings"
  3. Now run it against the stand-alone prism binary here, using this command:
    go run main.go --input-text="Greetings" --runner=universal --endpoint=localhost:8073 --environment_type=LOOPBACK
  4. Worker logs do not show up. Pipeline setup logs do show up (pipeline graph creation, etc.).

From my understanding, both of these are run on the Prism runner. Comparing the excerpts:

(stand-alone binary)

2024/03/21 18:13:23 Submitted job: job-013
2024/03/21 18:13:23  (): starting job-013[go-job-1-1711044803815930815]
2024/03/21 18:13:23  (): running job-013[go-job-1-1711044803815930815]
2024/03/21 18:13:23 Job[job-013] state: RUNNING
2024/03/21 18:13:23 starting worker job-013[go-job-1-1711044803815930815]_go
2024/03/21 18:13:23  (): pipeline completed job-013[go-job-1-1711044803815930815]
2024/03/21 18:13:23 stopping worker job-013[go-job-1-1711044803815930815]_go
2024/03/21 18:13:23  (): terminating job-013[go-job-1-1711044803815930815]

(command in the quickstart)

2024/03/21 18:14:03 Submitted job: job-001
2024/03/21 18:14:03  (): starting job-001[go-job-1-1711044843837533764]
2024/03/21 18:14:03  (): running job-001[go-job-1-1711044843837533764]
2024/03/21 18:14:03 Job[job-001] state: RUNNING
2024/03/21 18:14:03 starting worker job-001[go-job-1-1711044843837533764]_go
2024/03/21 18:14:03 INFO Hello source=/redacted/beam-starter-go/main.go:36 time=2024-03-21T18:14:03.849Z worker.ID=job-001[go-job-1-1711044843837533764]_go worker.endpoint=localhost:36291
2024/03/21 18:14:03 INFO World! source=/redacted/beam-starter-go/main.go:36 time=2024-03-21T18:14:03.849Z worker.ID=job-001[go-job-1-1711044843837533764]_go worker.endpoint=localhost:36291
2024/03/21 18:14:03 INFO Greetings source=/redacted/beam-starter-go/main.go:36 time=2024-03-21T18:14:03.849Z worker.ID=job-001[go-job-1-1711044843837533764]_go worker.endpoint=localhost:36291
2024/03/21 18:14:03 stopping worker job-001[go-job-1-1711044843837533764]_go
2024/03/21 18:14:03 control response channel closed

Issue Priority

Priority: 3 (minor)

Issue Components

lostluck commented 4 months ago

Agreed that the logs should be accessible. I'm uncertain that they should be in the prism process terminal output though. They should ideally be routed to the launching process/via API access, or visible via the prism UI.

lostluck commented 3 months ago

I had made a mistake earlier when I first looked at this. Here's how it's all going down:

  1. Prism does serve the job log messages API from Job Management, allowing Job Messages to be sent back to the launching process if handled synchronously.
  2. Prism hosts and receives Worker Logs through the Beam FnAPI Logging service, which is what is being printed out there. But currently this just logs the message on the prism side.

https://github.com/apache/beam/blob/f3e6c66c0a5d3a8638fd94978adf503be5081274/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go#L212

  3. When launching a single Go SDK pipeline binary (as in your "stand alone binary" case), the prism process lives entirely in the same process, making those logs visible in the launching command line.

  4. When running the pipeline against a stand-alone prism binary, the logs instead appear wherever the prism runner was launched.

  5. There's no current Beam API for accessing these worker logs, whose destination is ultimately determined by the runner (e.g. sent to Cloud Logging when running on Dataflow).

Running against a stand-alone prism runner is technically the "common case" we're moving towards, since that is what enables support for the Java and Python SDKs (and future SDKs).
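Point 2 above can be pictured with a minimal stdlib-only sketch (the type and function names here are hypothetical stand-ins, not Beam's actual FnAPI protos or prism internals): the Logging service receives worker entries and simply re-logs them in the runner's own process, so they never reach the launching process.

```go
package main

import "fmt"

// logEntry is a hypothetical stand-in for the Beam FnAPI LogEntry proto.
type logEntry struct {
	Severity string
	Message  string
	WorkerID string
}

// formatLocal renders an entry the way a runner-side logger might.
func formatLocal(e logEntry) string {
	return fmt.Sprintf("%s %s worker.ID=%s", e.Severity, e.Message, e.WorkerID)
}

// relogLocally mimics prism's current behavior: each worker log entry
// received over the Logging service is printed in the prism process and
// then dropped, rather than being forwarded to the launching process.
func relogLocally(entries []logEntry) {
	for _, e := range entries {
		fmt.Println(formatLocal(e))
	}
}

func main() {
	relogLocally([]logEntry{
		{Severity: "INFO", Message: "Hello", WorkerID: "job-001_go"},
	})
}
```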

Ultimately, I'd like Prism to actually write such logs to files in some default directory, and to surface them in the UI to some degree, e.g. select a ParDo and get the related log information displayed in the browser. This doesn't necessarily help the command line case, short of also printing information to the prism terminal like "job data and logs are found at ". That requires moving forward on not keeping everything in memory, though.

I can see two options, which could both be implemented since they cover different things.

Flags

I do need to see what's possible for the current Java and Python SDKs, but for the current Go SDK at least, we could add a job configuration option to route worker logs to the Job Log messages. This could be set as a stand-alone prism flag for the process's default, and overridden at the per-job level via Pipeline Options.

That would then allow SDKs that download and start prism by default to set that configuration, since it would cover most test uses, and let the user set it selectively on either the pipeline launch or the prism launch.

This also helps auto-solve debugging when running in Docker containers.
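The proposed routing toggle could be sketched roughly as follows (the flag and type names are hypothetical illustrations, not an existing Beam API): when the option is set, worker logs are appended to the job's message stream, which the launching process already consumes; otherwise they stay local to the prism process.

```go
package main

import "fmt"

// jobMessage is a hypothetical stand-in for a Job Management API message.
type jobMessage struct{ Text string }

// routeWorkerLog sketches the flag behavior: routeToJobMessages would be
// driven by a hypothetical prism process flag or per-job Pipeline Option.
// It returns the updated job-message stream and local log slice.
func routeWorkerLog(msg string, routeToJobMessages bool, jobMsgs []jobMessage, local []string) ([]jobMessage, []string) {
	if routeToJobMessages {
		// Forwarded to the launching process via Job Messages.
		return append(jobMsgs, jobMessage{Text: msg}), local
	}
	// Current behavior: the log stays in the prism process.
	return jobMsgs, append(local, msg)
}

func main() {
	var jobMsgs []jobMessage
	var local []string
	jobMsgs, local = routeWorkerLog("Hello", true, jobMsgs, local)
	jobMsgs, local = routeWorkerLog("World!", false, jobMsgs, local)
	fmt.Println(len(jobMsgs), len(local)) // 1 1
}
```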

Disable Logging in Loopback, have SDKs default to StdOut in that case.

The other option is to simply not send a logging address to the SDK when prism is executing a pipeline in LOOPBACK mode. I believe the SDKs do fall back to StdOut and StdErr output for logs when there's no logging connection, but currently only on connection failure. This would avoid round trips to the prism process, and rely on the SDK fallback behavior.

https://github.com/apache/beam/blob/f3e6c66c0a5d3a8638fd94978adf503be5081274/sdks/go/pkg/beam/core/runtime/harness/logging.go#L133

Basically we'd just check whether there's actually a logging endpoint here, and if it's empty, quietly write logs to StdOut instead. We'd likely need to do something similar for Java and Python, though.

Edit: On reflection, I don't love the endpoint idea just above, since it would require more changes to the Python and Java SDKs. It also doesn't solve anything for Xlang pipelines, as it encourages loss of those logs.

lostluck commented 3 months ago

The fastest, ham-fisted Go SDK solution could also live in the universal runner code: if the pipeline is executing in Loopback mode, disable the remote logging hook:

https://github.com/apache/beam/blob/f3e6c66c0a5d3a8638fd94978adf503be5081274/sdks/go/pkg/beam/core/runtime/harness/logging.go#L97

hooks.DisableHook(harness.DefaultRemoteLoggingHook)

That Std.Logger does exactly what we want already, and this reduces the blast radius to just Go Loopback execution (which would then apply to all universal pipeline runner invocations, including in-process prism). https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/log/log.go#L57
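The enable/disable semantics being relied on here can be modeled with a toy stdlib-only hook registry (names are hypothetical, not the real beam `hooks` package API): disabling the remote logging hook before harness startup simply means it never runs, so the default stdlib-style logger stays in effect.

```go
package main

import "fmt"

// registry is a toy model of a named-hook registry: hooks can be
// registered as enabled, then disabled before worker startup.
type registry struct{ enabled map[string]func() string }

func newRegistry() *registry {
	return &registry{enabled: map[string]func() string{}}
}

func (r *registry) EnableHook(name string, h func() string) { r.enabled[name] = h }
func (r *registry) DisableHook(name string)                 { delete(r.enabled, name) }

// runAll runs every still-enabled hook, as a harness would at startup,
// and returns what each one reported.
func (r *registry) runAll() []string {
	var out []string
	for _, h := range r.enabled {
		out = append(out, h())
	}
	return out
}

func main() {
	r := newRegistry()
	r.EnableHook("default_remote_logging", func() string { return "remote logging installed" })
	// In LOOPBACK mode the universal runner would disable the hook, so
	// the harness never redirects logs away from StdOut/StdErr.
	r.DisableHook("default_remote_logging")
	fmt.Println(len(r.runAll())) // 0
}
```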

Edit: This would lose the local Go logs from Prism, but not the Xlang logs from other processes/containers. In combination with the flags suggested above, everything could be routed locally. But I think the best option is the flags-routing-to-JobLogs solution I mentioned, since that feels closer to the desired behavior of "local debugging of the whole pipeline", while also being compatible with debugging cross-language transforms.