apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Side effect during the for loop execution in beam Pipeline #31014

Open giovanni-prestipino opened 5 months ago

giovanni-prestipino commented 5 months ago

What happened?

When a for loop is used to apply PTransforms, every iteration seems to use the last value of the loop variable.

import apache_beam as beam

def main():
    with beam.Pipeline() as p:
        p_f = p | beam.Create([1, 2, 3, 4, 5])

        for i in range(0, 3):
            p_f = p_f | f"{i}" >> beam.Map(lambda x: x + i)

        p_f | beam.Map(print)

if __name__ == "__main__":
    main()

I expected the code to print:

4
5
6
7
8

But I got:

7
8
9
10
11

I also found a workaround: moving the transform into a separate helper function that takes the loop value as a parameter:

import apache_beam as beam

def _sum(pcoll, i):
    return pcoll | f"{i}" >> beam.Map(lambda x: x + i)

def main():
    with beam.Pipeline() as p:
        p_f = p | beam.Create([1, 2, 3, 4, 5])

        for i in range(0, 3):
            p_f = _sum(p_f, i)

        p_f | beam.Map(print)

if __name__ == "__main__":
    main()
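
The helper probably works because each call to `_sum` gets its own local `i`, so the lambda created inside it closes over that call's value rather than over the shared loop variable. A minimal plain-Python sketch of the same idea, without Beam (`make_adder` and `apply_all` are hypothetical names used only for illustration):

```python
def make_adder(i):
    # Each call creates a new scope; the lambda closes over this call's `i`.
    return lambda x: x + i


def apply_all(fns, values):
    # Apply every function in order to each value, like chained Map steps.
    results = []
    for v in values:
        for fn in fns:
            v = fn(v)
        results.append(v)
    return results


adders = [make_adder(i) for i in range(0, 3)]
print(apply_all(adders, [1, 2, 3, 4, 5]))  # [4, 5, 6, 7, 8]
```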

I am using the following versions:

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

giovanni-prestipino commented 5 months ago

Maybe the problem is that the loop parameter should be passed to the Map as a side input:


import apache_beam as beam

def main():
    with beam.Pipeline() as p:
        p_f = p | beam.Create([1, 2, 3, 4, 5])

        for i in range(0, 3):
            # k=i passes the current value of i when the transform is built.
            p_f = p_f | f"{i}" >> beam.Map(lambda x, k: x + k, k=i)

        p_f | beam.Map(print)

if __name__ == "__main__":
    main()

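
Passing `k=i` evaluates `i` at the moment the transform is constructed, so each step keeps its own value. The same early binding can be sketched in plain Python with a default argument or with `functools.partial`. This is an illustration without Beam, and the names `steps_default`, `steps_partial`, and `run` are hypothetical:

```python
from functools import partial
import operator

# Default argument: `k=i` is evaluated when each lambda is created.
steps_default = [lambda x, k=i: x + k for i in range(0, 3)]

# functools.partial: the current value of `i` is stored in the partial object.
steps_partial = [partial(operator.add, i) for i in range(0, 3)]


def run(steps, value):
    # Apply the steps in order, like chained Map transforms.
    for step in steps:
        value = step(value)
    return value


print([run(steps_default, v) for v in [1, 2, 3, 4, 5]])  # [4, 5, 6, 7, 8]
print([run(steps_partial, v) for v in [1, 2, 3, 4, 5]])  # [4, 5, 6, 7, 8]
```
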
liferoad commented 5 months ago

For your first example, I think this is expected. The pipeline is not executed until the entire pipeline has been constructed. By then, the variable i holds its final value, and that is the value every lambda sees at execution time.
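
The same late-binding behavior can be reproduced in plain Python, without Beam. In this sketch the `fns` list and the `run` helper are hypothetical stand-ins for the deferred pipeline: the lambdas are only called after the loop has finished, so all of them read `i == 2`.

```python
fns = []
for i in range(0, 3):
    # The lambda refers to the variable `i` itself, not to its current value.
    fns.append(lambda x: x + i)


def run(fns, value):
    # Called after the loop, when `i` already holds its final value (2).
    for fn in fns:
        value = fn(value)
    return value


print([run(fns, v) for v in [1, 2, 3, 4, 5]])  # [7, 8, 9, 10, 11]
```

Each of the three steps adds 2, which matches the output reported in the issue.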