Open SamStentz opened 1 month ago
A related note on the Prism runner for testing (I don't see a tag for the Prism runner; I would have added it otherwise):
package poc

import (
	"testing"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

func TestDoNothingTransform(t *testing.T) {
	beam.Init()
	pipeline, scope, col := ptest.CreateList([]string{"foo"})
	// Key the input so the stateful DoFn can use per-key state and timers.
	col = beam.AddFixedKey(scope, col)
	col = DoNothingTransform(scope, col)
	// If I comment out this passert, not all retries occur.
	passert.EqualsList(scope, col, []string{"foo"})
	ptest.RunAndValidate(t, pipeline)
}

func TestMain(m *testing.M) {
	// Run the tests in this package on the Prism runner by default.
	ptest.MainWithDefault(m, "prism")
}
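I am seeing similar behavior: the logs show that not all retries execute if I leave out the passert clause. The first run below includes the passert, and OnTimer fires through count 11. The second run has the passert commented out, and OnTimer stops after count 1.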
Running tool: /usr/lib/google-golang/bin/go test -timeout 30s -run ^TestDoNothingTransform$ poc
=== RUN TestDoNothingTransform
2024/09/27 22:21:21 INFO Serving JobManagement endpoint=localhost:41941
2024/09/27 22:21:21 starting Loopback server at 127.0.0.1:41841
2024/09/27 22:21:21 components:{transforms:{key:"e1" value:{unique_name:"Impulse" spec:{urn:"beam:transform:impulse:v1"} outputs:{key:"i0" value:"n1"}}} transforms:{key:"e10" value:{unique_name:"passert.EqualsList/passert.failIfBadEntries" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xe2\x02\n\x19beam:go:transform:dofn:v1\x1a\xc4\x02ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEtQBCpgBCpUBCktnaXRodWIuY29tL2FwYWNoZS9iZWFtL3Nka3MvdjIvZ28vcGtnL2JlYW0vdGVzdGluZy9wYXNzZXJ0LmZhaWxJZkJhZEVudHJpZXMSRggWIgYIFBICCAgiEAgWIggIGBIECBlADyoCCAEiEAgWIggIGBIECBlADyoCCAEiEAgWIggIGBIECBlADyoCCAEqBAgZQAESDAgBEggKBggUEgIICBIKCAYSBgoECBlADxIKCAYSBgoECBlADxIKCAYSBgoECBlADyIFUGFyRG8=\x1aO\n\x02i1\x12I\n\x1d\n\x1bbeam:side_input:iterable:v1\x12\x05\n\x03foo\x1a!\n\x1fbeam:go:windowmapping:global:v1\x1aO\n\x02i2\x12I\n\x1d\n\x1bbeam:side_input:iterable:v1\x12\x05\n\x03foo\x1a!\n\x1fbeam:go:windowmapping:global:v1\x1aO\n\x02i3\x12I\n\x1d\n\x1bbeam:side_input:iterable:v1\x12\x05\n\x03foo\x1a!\n\x1fbeam:go:windowmapping:global:v1"} inputs:{key
:"i0" value:"n11"} inputs:{key:"i1" value:"n8"} inputs:{key:"i2" value:"n9"} inputs:{key:"i3" value:"n10"} environment_id:"go"}} transforms:{key:"e2" value:{unique_name:"beam.createFn" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xfa\x01\n\x19beam:go:transform:dofn:v1\x1a\xdc\x01ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEoUBCmQSOwgYEjcIGkozZ2l0aHViLmNvbS9hcGFjaGUvYmVhbS9zZGtzL3YyL2dvL3BrZy9iZWFtLmNyZWF0ZUZuGiV7InZhbHVlcyI6WyJBMlp2Ync9PSJdLCJ0eXBlIjoiQ0F3PSJ9EgwIARIICgYIFBICCAgaCAoGCgQIGUAPIgVQYXJEbw=="} inputs:{key:"i0" value:"n1"} outputs:{key:"i0" value:"n2"} environment_id:"go"}} transforms:{key:"e3" value:{unique_name:"beam.addFixedKeyFn" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xee\x01\n\x19beam:go:transform:dofn:v1\x1a\xd0\x01ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEn0KUApOCjhnaXRodWIuY29tL2FwYWNoZS9iZWFtL3Nka3MvdjIvZ28vcGtnL2JlYW0uYWRkRml4ZWRLZXlGbhISCBYiBAgZQA8qAggCKgQIGUAPEgoIARIGCgQIGUAPGhYKFAoECBlACxIECgIIAhIGCgQIGUAPIgVQYXJEbw=="} inputs:{key:"i0" value:"n2"} outputs:{key:"i0" value:"n3"} environm
ent_id:"go"}} transforms:{key:"e4" value:{unique_name:"DoNothing/poc.doNothingTransformFn" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xc2\x03\n\x19beam:go:transform:dofn:v1\x1a\xa4\x03ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEpsCCu8BElkIGBJVCBpKUWdpdGh1Yi5jb20vdmVyaWx5LXNyYy92ZXJpbHkxL2luZ2VzdGlvbi9waXBlbGluZS9zcmMvcGtnL3BvYy5kb05vdGhpbmdUcmFuc2Zvcm1GbhqRAXsiVGltZXJDb3VudCI6eyJLZXkiOiJ0aW1lckNvdW50In0sIlRpbWVyVGltZXJzdGFtcCI6eyJLZXkiOiJ0aW1lclRpbWVyc3RhbXAifSwiVmFsdWUiOnsiS2V5IjoidmFsdWUifSwiT3V0cHV0U3RhdGUiOnsiRmFtaWx5IjoicHJvY2Vzc2luZ1RpbWUifX0SGAgBEhQKBAgZQAsSBgoECBlAExIECgIIDBoGCgQKAggMIgVQYXJEbw==\".\n\ntimerCount\x12 :\x18\n\x16beam:user_state:bag:v1\n\x04\n\x02c6\"3\n\x0ftimerTimerstamp\x12 :\x18\n\x16beam:user_state:bag:v1\n\x04\n\x02c6\")\n\x05value\x12 :\x18\n\x16beam:user_state:bag:v1\n\x04\n\x02c2J\x18\n\x0eprocessingTime\x12\x06\x08\x02\x12\x02c7"} inputs:{key:"i0" value:"n3"} outputs:{key:"i0" value:"n4"} environment_id:"go"}} transforms:{key:"e5" value:{unique_name:"passert.EqualsList/Impulse"
spec:{urn:"beam:transform:impulse:v1"} outputs:{key:"i0" value:"n5"}}} transforms:{key:"e6" value:{unique_name:"passert.EqualsList/beam.createFn" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xfa\x01\n\x19beam:go:transform:dofn:v1\x1a\xdc\x01ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEoUBCmQSOwgYEjcIGkozZ2l0aHViLmNvbS9hcGFjaGUvYmVhbS9zZGtzL3YyL2dvL3BrZy9iZWFtLmNyZWF0ZUZuGiV7InZhbHVlcyI6WyJBMlp2Ync9PSJdLCJ0eXBlIjoiQ0F3PSJ9EgwIARIICgYIFBICCAgaCAoGCgQIGUAPIgVQYXJEbw=="} inputs:{key:"i0" value:"n5"} outputs:{key:"i0" value:"n6"} environment_id:"go"}} transforms:{key:"e7" value:{unique_name:"passert.EqualsList/Impulse'1" spec:{urn:"beam:transform:impulse:v1"} outputs:{key:"i0" value:"n7"}}} transforms:{key:"e8" value:{unique_name:"passert.EqualsList/passert.diffFn" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xaa\x02\n\x19beam:go:transform:dofn:v1\x1a\x8c\x02ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEqkBClwSSQgYEkUIGkpBZ2l0aHViLmNvbS9hcGFjaGUvYmVhbS9zZGtzL3YyL2dvL3BrZy9iZWFtL3Rlc3RpbmcvcGFzc2VydC5kaWZmRm4aD3sidHlwZSI6IkNB
dz0ifRIMCAESCAoGCBQSAggIEgoIBhIGCgQIGUAPEgoIBhIGCgQIGUAPGggKBgoECBlADxoICgYKBAgZQA8aCAoGCgQIGUAPIgVQYXJEbw==\x1aO\n\x02i1\x12I\n\x1d\n\x1bbeam:side_input:iterable:v1\x12\x05\n\x03foo\x1a!\n\x1fbeam:go:windowmapping:global:v1\x1aO\n\x02i2\x12I\n\x1d\n\x1bbeam:side_input:iterable:v1\x12\x05\n\x03foo\x1a!\n\x1fbeam:go:windowmapping:global:v1"} inputs:{key:"i0" value:"n7"} inputs:{key:"i1" value:"n4"} inputs:{key:"i2" value:"n6"} outputs:{key:"i0" value:"n8"} outputs:{key:"i1" value:"n9"} outputs:{key:"i2" value:"n10"} environment_id:"go"}} transforms:{key:"e9" value:{unique_name:"passert.EqualsList/Impulse'2" spec:{urn:"beam:transform:impulse:v1"} outputs:{key:"i0" value:"n11"}}} transforms:{key:"s1" value:{unique_name:"DoNothing" subtransforms:"e4" inputs:{key:"n3" value:"n3"} outputs:{key:"n4" value:"n4"} environment_id:"go"}} transforms:{key:"s2" value:{unique_name:"passert.EqualsList" subtransforms:"e9" subtransforms:"e7" subtransforms:"e5" subtransforms:"e6" subtransforms:"e8" subtransforms:"e10" inputs:{ke
y:"n4" value:"n4"} environment_id:"go"}} pcollections:{key:"n1" value:{unique_name:"n1" coder_id:"c0" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n10" value:{unique_name:"n10" coder_id:"c2" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n11" value:{unique_name:"n11" coder_id:"c0" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n2" value:{unique_name:"n2" coder_id:"c2" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n3" value:{unique_name:"n3" coder_id:"c5" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n4" value:{unique_name:"n4" coder_id:"c2" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n5" value:{unique_name:"n5" coder_id:"c0" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n6" value:{unique_name:"n6" coder_id:"c2" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n7" value:{unique_name:"n7" coder_id:"c0" is_bounded:BOUNDED windowing_strategy_id:"w0"}
} pcollections:{key:"n8" value:{unique_name:"n8" coder_id:"c2" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n9" value:{unique_name:"n9" coder_id:"c2" is_bounded:BOUNDED windowing_strategy_id:"w0"}} windowing_strategies:{key:"w0" value:{window_fn:{urn:"beam:window_fn:global_windows:v1"} merge_status:NON_MERGING window_coder_id:"c1" trigger:{default:{}} accumulation_mode:DISCARDING output_time:END_OF_WINDOW closing_behavior:EMIT_IF_NONEMPTY on_time_behavior:FIRE_IF_NONEMPTY environment_id:"go"}} coders:{key:"c0" value:{spec:{urn:"beam:coder:bytes:v1"}}} coders:{key:"c1" value:{spec:{urn:"beam:coder:global_window:v1"}}} coders:{key:"c2" value:{spec:{urn:"beam:coder:string_utf8:v1"}}} coders:{key:"c3" value:{spec:{urn:"beam:go:coder:custom:v1" payload:"Cgd2YXJpbnR6EgIIAhpdCklnaXRodWIuY29tL2FwYWNoZS9iZWFtL3Nka3MvdjIvZ28vcGtnL2JlYW0vY29yZS9ydW50aW1lL2NvZGVyeC5lbmNWYXJJbnRaEhAIFiIECBlADyoGCBQSAggIImkKSWdpdGh1Yi5jb20vYXBhY2hlL2JlYW0vc2Rrcy92Mi9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvY29kZXJ4LmRlY1Zhckl
udFoSHAgWIgQIGUADIgYIFBICCAgqBAgZQA8qBAgZQAE="}}} coders:{key:"c4" value:{spec:{urn:"beam:coder:length_prefix:v1"} component_coder_ids:"c3"}} coders:{key:"c5" value:{spec:{urn:"beam:coder:kv:v1"} component_coder_ids:"c4" component_coder_ids:"c2"}} coders:{key:"c6" value:{spec:{urn:"beam:coder:varint:v1"}}} coders:{key:"c7" value:{spec:{urn:"beam:coder:timer:v1"} component_coder_ids:"c4" component_coder_ids:"c1"}} environments:{key:"go" value:{urn:"beam:env:external:v1" payload:"\n\x11\n\x0flocalhost:41841" capabilities:"beam:protocol:progress_reporting:v1" capabilities:"beam:protocol:multi_core_bundle_processing:v1" capabilities:"beam:transform:sdf_truncate_sized_restrictions:v1" capabilities:"beam:protocol:worker_status:v1" capabilities:"beam:protocol:monitoring_info_short_ids:v1" capabilities:"beam:version:sdk_base:go:apache/beam_go_sdk:2.59.0" capabilities:"beam:transform:to_string:v1" capabilities:"beam:protocol:data_sampling:v1" capabilities:"beam:protocol:sdk_consuming_received_data:v1" capabilities:"be
am:coder:bytes:v1" capabilities:"beam:coder:bool:v1" capabilities:"beam:coder:varint:v1" capabilities:"beam:coder:double:v1" capabilities:"beam:coder:string_utf8:v1" capabilities:"beam:coder:length_prefix:v1" capabilities:"beam:coder:kv:v1" capabilities:"beam:coder:iterable:v1" capabilities:"beam:coder:state_backed_iterable:v1" capabilities:"beam:coder:windowed_value:v1" capabilities:"beam:coder:global_window:v1" capabilities:"beam:coder:interval_window:v1" capabilities:"beam:coder:row:v1" capabilities:"beam:coder:nullable:v1" capabilities:"beam:coder:timer:v1" dependencies:{type_urn:"beam:artifact:type:file:v1" role_urn:"beam:artifact:role:go_worker_binary:v1"}}}} root_transform_ids:"e1" root_transform_ids:"e2" root_transform_ids:"e3" root_transform_ids:"s1" root_transform_ids:"s2" requirements:"beam:requirement:pardo:stateful:v1"
2024/09/27 22:21:21 Prepared job with id: job-001 and staging token: job-001
2024/09/27 22:21:21 Staged binary artifact with token: job-001
2024/09/27 22:21:21 Submitted job: job-001
2024/09/27 22:21:21 (): starting job-001[go-job-1-1727475681963997620]
2024/09/27 22:21:21 (): running job-001[go-job-1-1727475681963997620]
2024/09/27 22:21:21 Job[job-001] state: RUNNING
2024/09/27 22:21:21 starting worker job-001[go-job-1-1727475681963997620]_go
2024/09/27 22:21:21 INFO ProcessElement <0, foo> source=.../poc/donothing.go:45 time=2024-09-27T22:21:21.979Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.983Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 0 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.985Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.985Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 1 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.986Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.986Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 2 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.987Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.987Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 3 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.988Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.988Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 4 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.989Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.989Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 5 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.990Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.990Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 6 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.991Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.991Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 7 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.992Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.992Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 8 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.993Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.993Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 9 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.994Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.994Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 10 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.995Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.995Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 11 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.996Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.996Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO (): pipeline completed job-001[go-job-1-1727475681963997620] source=.../go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/runners/universal/runnerlib/execute.go:109 time=2024-09-27T22:21:21.999Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO (): terminating job-001[go-job-1-1727475681963997620] source=.../go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/runners/universal/runnerlib/execute.go:109 time=2024-09-27T22:21:21.999Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO Job[job-001] state: DONE source=.../go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/runners/universal/runnerlib/job.go:125 time=2024-09-27T22:21:21.999Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO stopping worker job-001[go-job-1-1727475681963997620]_go source=.../go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/runners/universal/extworker/extworker.go:117 time=2024-09-27T22:21:21.999Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
--- PASS: TestDoNothingTransform (0.04s)
PASS
ok poc 0.204s
Running tool: /usr/lib/google-golang/bin/go test -timeout 30s -run ^TestDoNothingTransform$ .../poc
=== RUN TestDoNothingTransform
2024/09/27 22:22:36 INFO Serving JobManagement endpoint=localhost:40163
2024/09/27 22:22:36 starting Loopback server at 127.0.0.1:37333
2024/09/27 22:22:36 components:{transforms:{key:"e1" value:{unique_name:"Impulse" spec:{urn:"beam:transform:impulse:v1"} outputs:{key:"i0" value:"n1"}}} transforms:{key:"e2" value:{unique_name:"beam.createFn" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xfa\x01\n\x19beam:go:transform:dofn:v1\x1a\xdc\x01ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEoUBCmQSOwgYEjcIGkozZ2l0aHViLmNvbS9hcGFjaGUvYmVhbS9zZGtzL3YyL2dvL3BrZy9iZWFtLmNyZWF0ZUZuGiV7InZhbHVlcyI6WyJBMlp2Ync9PSJdLCJ0eXBlIjoiQ0F3PSJ9EgwIARIICgYIFBICCAgaCAoGCgQIGUAPIgVQYXJEbw=="} inputs:{key:"i0" value:"n1"} outputs:{key:"i0" value:"n2"} environment_id:"go"}} transforms:{key:"e3" value:{unique_name:"beam.addFixedKeyFn" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xee\x01\n\x19beam:go:transform:dofn:v1\x1a\xd0\x01ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEn0KUApOCjhnaXRodWIuY29tL2FwYWNoZS9iZWFtL3Nka3MvdjIvZ28vcGtnL2JlYW0uYWRkRml4ZWRLZXlGbhISCBYiBAgZQA8qAggCKgQIGUAPEgoIARIGCgQIGUAPGhYKFAoECBlACxIECgIIAhIGCgQIGUAPIgVQYXJEbw=="} inputs:{key:"i0" value:"n2"} outputs:{key:"
i0" value:"n3"} environment_id:"go"}} transforms:{key:"e4" value:{unique_name:"DoNothing/poc.doNothingTransformFn" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xc2\x03\n\x19beam:go:transform:dofn:v1\x1a\xa4\x03ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEpsCCu8BElkIGBJVCBpKUWdpdGh1Yi5jb20vdmVyaWx5LXNyYy92ZXJpbHkxL2luZ2VzdGlvbi9waXBlbGluZS9zcmMvcGtnL3BvYy5kb05vdGhpbmdUcmFuc2Zvcm1GbhqRAXsiVGltZXJDb3VudCI6eyJLZXkiOiJ0aW1lckNvdW50In0sIlRpbWVyVGltZXJzdGFtcCI6eyJLZXkiOiJ0aW1lclRpbWVyc3RhbXAifSwiVmFsdWUiOnsiS2V5IjoidmFsdWUifSwiT3V0cHV0U3RhdGUiOnsiRmFtaWx5IjoicHJvY2Vzc2luZ1RpbWUifX0SGAgBEhQKBAgZQAsSBgoECBlAExIECgIIDBoGCgQKAggMIgVQYXJEbw==\"3\n\x0ftimerTimerstamp\x12 :\x18\n\x16beam:user_state:bag:v1\n\x04\n\x02c6\")\n\x05value\x12 :\x18\n\x16beam:user_state:bag:v1\n\x04\n\x02c2\".\n\ntimerCount\x12 :\x18\n\x16beam:user_state:bag:v1\n\x04\n\x02c6J\x18\n\x0eprocessingTime\x12\x06\x08\x02\x12\x02c7"} inputs:{key:"i0" value:"n3"} outputs:{key:"i0" value:"n4"} environment_id:"go"}} transforms:{key:"s1" value:{unique_name:"DoNo
thing" subtransforms:"e4" inputs:{key:"n3" value:"n3"} outputs:{key:"n4" value:"n4"} environment_id:"go"}} pcollections:{key:"n1" value:{unique_name:"n1" coder_id:"c0" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n2" value:{unique_name:"n2" coder_id:"c2" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n3" value:{unique_name:"n3" coder_id:"c5" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n4" value:{unique_name:"n4" coder_id:"c2" is_bounded:BOUNDED windowing_strategy_id:"w0"}} windowing_strategies:{key:"w0" value:{window_fn:{urn:"beam:window_fn:global_windows:v1"} merge_status:NON_MERGING window_coder_id:"c1" trigger:{default:{}} accumulation_mode:DISCARDING output_time:END_OF_WINDOW closing_behavior:EMIT_IF_NONEMPTY on_time_behavior:FIRE_IF_NONEMPTY environment_id:"go"}} coders:{key:"c0" value:{spec:{urn:"beam:coder:bytes:v1"}}} coders:{key:"c1" value:{spec:{urn:"beam:coder:global_window:v1"}}} coders:{key:"c2" value:{spec:{urn:"beam:coder:string_u
tf8:v1"}}} coders:{key:"c3" value:{spec:{urn:"beam:go:coder:custom:v1" payload:"Cgd2YXJpbnR6EgIIAhpdCklnaXRodWIuY29tL2FwYWNoZS9iZWFtL3Nka3MvdjIvZ28vcGtnL2JlYW0vY29yZS9ydW50aW1lL2NvZGVyeC5lbmNWYXJJbnRaEhAIFiIECBlADyoGCBQSAggIImkKSWdpdGh1Yi5jb20vYXBhY2hlL2JlYW0vc2Rrcy92Mi9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvY29kZXJ4LmRlY1ZhckludFoSHAgWIgQIGUADIgYIFBICCAgqBAgZQA8qBAgZQAE="}}} coders:{key:"c4" value:{spec:{urn:"beam:coder:length_prefix:v1"} component_coder_ids:"c3"}} coders:{key:"c5" value:{spec:{urn:"beam:coder:kv:v1"} component_coder_ids:"c4" component_coder_ids:"c2"}} coders:{key:"c6" value:{spec:{urn:"beam:coder:varint:v1"}}} coders:{key:"c7" value:{spec:{urn:"beam:coder:timer:v1"} component_coder_ids:"c4" component_coder_ids:"c1"}} environments:{key:"go" value:{urn:"beam:env:external:v1" payload:"\n\x11\n\x0flocalhost:37333" capabilities:"beam:protocol:progress_reporting:v1" capabilities:"beam:protocol:multi_core_bundle_processing:v1" capabilities:"beam:transform:sdf_truncate_sized_restrictions:v1" capabilities
:"beam:protocol:worker_status:v1" capabilities:"beam:protocol:monitoring_info_short_ids:v1" capabilities:"beam:version:sdk_base:go:apache/beam_go_sdk:2.59.0" capabilities:"beam:transform:to_string:v1" capabilities:"beam:protocol:data_sampling:v1" capabilities:"beam:protocol:sdk_consuming_received_data:v1" capabilities:"beam:coder:bytes:v1" capabilities:"beam:coder:bool:v1" capabilities:"beam:coder:varint:v1" capabilities:"beam:coder:double:v1" capabilities:"beam:coder:string_utf8:v1" capabilities:"beam:coder:length_prefix:v1" capabilities:"beam:coder:kv:v1" capabilities:"beam:coder:iterable:v1" capabilities:"beam:coder:state_backed_iterable:v1" capabilities:"beam:coder:windowed_value:v1" capabilities:"beam:coder:global_window:v1" capabilities:"beam:coder:interval_window:v1" capabilities:"beam:coder:row:v1" capabilities:"beam:coder:nullable:v1" capabilities:"beam:coder:timer:v1" dependencies:{type_urn:"beam:artifact:type:file:v1" role_urn:"beam:artifact:role:go_worker_binary:v1"}}}} root_transform_ids:"e1" roo
t_transform_ids:"e2" root_transform_ids:"e3" root_transform_ids:"s1" requirements:"beam:requirement:pardo:stateful:v1"
2024/09/27 22:22:36 Prepared job with id: job-001 and staging token: job-001
2024/09/27 22:22:36 Staged binary artifact with token: job-001
2024/09/27 22:22:36 Submitted job: job-001
2024/09/27 22:22:36 (): starting job-001[go-job-1-1727475756568453106]
2024/09/27 22:22:36 Job[job-001] state: STARTING
2024/09/27 22:22:36 (): running job-001[go-job-1-1727475756568453106]
2024/09/27 22:22:36 Job[job-001] state: RUNNING
2024/09/27 22:22:36 starting worker job-001[go-job-1-1727475756568453106]_go
2024/09/27 22:22:36 INFO ProcessElement <0, foo> source=.../poc/donothing.go:45 time=2024-09-27T22:22:36.580Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
2024/09/27 22:22:36 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:22:36.584Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
2024/09/27 22:22:36 INFO OnTimer <0, foo>: count 0 source=.../poc/donothing.go:88 time=2024-09-27T22:22:36.585Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
2024/09/27 22:22:36 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:22:36.586Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
2024/09/27 22:22:36 INFO OnTimer <0, foo>: count 1 source=.../poc/donothing.go:88 time=2024-09-27T22:22:36.586Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
2024/09/27 22:22:36 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:22:36.587Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
2024/09/27 22:22:36 INFO data.Recv timers for unknown bundle response="timers:{instruction_id:\"inst004\" transform_id:\"e4\" timer_family_id:\"processingTime\" is_last:true}"
2024/09/27 22:22:36 stopping worker job-001[go-job-1-1727475756568453106]_go
2024/09/27 22:22:36 INFO (): pipeline completed job-001[go-job-1-1727475756568453106] source=.../go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/runners/universal/runnerlib/execute.go:109 time=2024-09-27T22:22:36.587Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
2024/09/27 22:22:36 INFO (): terminating job-001[go-job-1-1727475756568453106] source=.../go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/runners/universal/runnerlib/execute.go:109 time=2024-09-27T22:22:36.587Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
2024/09/27 22:22:36 INFO Job[job-001] state: DONE source=.../go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/runners/universal/runnerlib/job.go:125 time=2024-09-27T22:22:36.587Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
--- PASS: TestDoNothingTransform (0.02s)
PASS
ok .../poc 0.150s
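To compare the two runs without reading the full output, the OnTimer counts can be pulled out of the log text. The following is a minimal sketch that assumes the log lines keep the "OnTimer <key, value>: count N" shape shown above; the names onTimerRe and timerCounts are hypothetical helpers, not part of Beam.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// onTimerRe matches the "OnTimer <key, value>: count N" log lines shown above
// and captures N. The pattern is an assumption based on those lines only.
var onTimerRe = regexp.MustCompile(`OnTimer <[^>]*>: count (\d+)`)

// timerCounts returns the count value of every OnTimer line in log, in order.
func timerCounts(log string) []int {
	var counts []int
	for _, m := range onTimerRe.FindAllStringSubmatch(log, -1) {
		n, err := strconv.Atoi(m[1])
		if err != nil {
			continue // value does not fit in an int; skip it
		}
		counts = append(counts, n)
	}
	return counts
}

func main() {
	// Two OnTimer lines in the shape of the run without the passert.
	sample := "INFO OnTimer <0, foo>: count 0 source=.../poc/donothing.go:88\n" +
		"INFO FinishBundle source=.../poc/donothing.go:104\n" +
		"INFO OnTimer <0, foo>: count 1 source=.../poc/donothing.go:88\n"
	fmt.Println(timerCounts(sample)) // prints [0 1]
}
```

Applied to the two runs above, this would report counts 0 through 11 for the run with the passert and only 0 and 1 for the run without it.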
If I understand correctly the issue is:
That does sound like an issue with Dataflow.
As for Prism, this does seem like an edge case, whether or not real-time execution is involved. But technically it is not related to whether windowing is applied. I'd recommend filing it as a separate issue, even though it uses more or less the same pipeline.
Some notes:
Technically, this seems to be about looping timers, combined with ProcessingTime weirdness.
What happened?
OnTimer for processing time on the streaming Dataflow runner has its state wiped after an indeterminate number of retries when windowing is applied to the PCollection. I believe the compatibility matrix for streaming Dataflow says this should work.
I am adding an unbounded source to the pipeline, separate from this ParDo, to get Dataflow to launch the pipeline as streaming.
Using github.com/apache/beam/sdks/v2 v2.59.0
donothing.go
main.go
Dataflow Logs
ps: newest log at top
With windowing
Without windowing
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components