Open hlteoh37 opened 5 months ago
Adding a dump of the Flink code used to reproduce the issue:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Beam-on-Flink example job: compiles for Java 11 and packages a shaded jar during the package phase. -->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.flink.example</groupId>
<artifactId>beam</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>Apache Flink Beam Application</name>
<packaging>jar</packaging>
<properties>
<!-- Version applied to both org.apache.flink dependencies declared below. -->
<flink.version>1.15.2</flink.version>
<!-- NOTE(review): logback.version is not referenced anywhere else in this POM. -->
<logback.version>1.4.14</logback.version>
<!-- Fully qualified entry point; the shade plugin writes it into the jar manifest. -->
<main-class>org.apache.flink.example.BeamApplication</main-class>
</properties>
<!-- NOTE(review): the accompanying BeamApplication class imports lombok.Data and io.opentelemetry.instrumentation.annotations.WithSpan, but no dependency for either is declared here. Confirm they resolve transitively or add them. -->
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<!-- commons-math3 is removed from the transitive dependencies of flink-clients. -->
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Beam Flink runner artifact for the Flink 1.15 line. NOTE(review): keep this in step with flink.version above. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink-1.15</artifactId>
<version>2.56.0</version>
</dependency>
<!-- Beam IO module for AWS SDK v2, pinned to the same Beam version as the runner. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-amazon-web-services2</artifactId>
<version>2.56.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compile with Java 11 as both source and target level. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
<!-- Build a single shaded jar containing the job and its dependencies. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- These Flink artifacts are left out of the shaded jar; presumably the Flink runtime already provides them (confirm). -->
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-core</exclude>
<exclude>org.apache.flink:flink-annotations</exclude>
<exclude>org.apache.flink:flink-metrics-core</exclude>
<exclude>org.apache.flink:flink-shaded-*</exclude>
<exclude>org.apache.flink:flink-table-*</exclude>
</excludes>
</artifactSet>
<!-- Strip jar signature files from every bundled artifact; presumably because they would not match the repackaged jar (confirm). -->
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<!-- Sets the manifest main class to the main-class property defined above. -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>${main-class}</mainClass>
</transformer>
<!-- Merges META-INF/services files contributed by the bundled artifacts. -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Java class
package org.apache.flink.example;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import lombok.Data;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisPartitioner;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPropertyOrder;
import org.joda.time.Duration;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.common.InitialPositionInStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
/**
 * Reproduction job: a Beam pipeline on the Flink runner that copies records from one Kinesis
 * stream to another, echoing the string form of every record to standard output on the way.
 */
public class BeamApplication {

    /** Plain data holder with three fields; the pipeline built in {@code main} does not use it. */
    @Data
    @JsonPropertyOrder({"timestamp", "location", "quantity"})
    public static final class Event {
        private Instant timestamp;
        private String location;
        private long quantity;
    }

    /**
     * Assembles the Kinesis-to-Kinesis pipeline and hands it to the Flink runner with
     * attached mode switched off.
     *
     * @param args command-line arguments; this method does not read them
     * @throws Exception declared on the signature for callers; nothing is caught here
     */
    @WithSpan
    public static void main(final String... args) throws Exception {
        final FlinkPipelineOptions flinkOptions =
                PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);
        flinkOptions.setRunner(FlinkRunner.class);
        flinkOptions.setAttachedMode(false);

        final Pipeline job = Pipeline.create(flinkOptions);

        // The source starts reading thirty minutes before the moment the job is built.
        final Duration lookback = Duration.standardMinutes(30);

        // Kept as an anonymous class so the transform keeps the name Beam derives for it.
        final DoFn<KinesisRecord, String> echoAndForward = new DoFn<KinesisRecord, String>() {
            @ProcessElement
            public void processElement(@Element KinesisRecord record, OutputReceiver<String> out) {
                final String rendered = record.toString();
                System.out.println(rendered);
                out.output(rendered);
            }
        };

        // Each outgoing string is written to the sink as its UTF-8 bytes.
        final SerializableFunction<String, byte[]> utf8Bytes =
                input -> input.getBytes(StandardCharsets.UTF_8);

        job.apply(
                        "KDS Source",
                        KinesisIO.read()
                                .withStreamName("ExampleInputStream")
                                .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                                .withInitialTimestampInStream(
                                        org.joda.time.Instant.now().minus(lookback))
                                .withClientConfiguration(usEast1()))
                .apply(ParDo.of(echoAndForward))
                .apply(
                        "KDS Sink",
                        KinesisIO.<String>write()
                                .withStreamName("ExampleOutputStream")
                                .withClientConfiguration(usEast1())
                                .withSerializer(utf8Bytes)
                                .withPartitioner(KinesisPartitioner.explicitRandomPartitioner(1)));

        job.run();
    }

    /** Builds a fresh client configuration pinned to the US_EAST_1 region. */
    private static ClientConfiguration usEast1() {
        return ClientConfiguration.builder()
                .region(Region.US_EAST_1)
                .build();
    }
}
@je-ik is this the same issue as https://github.com/apache/beam/issues/30903?
I noticed you fixed it and the problem statement seems to be similar, but please let me know if this is something different, as I am getting duplicated data on 2.56 when restoring from a Flink savepoint.
I suppose this is a (similar, but) different issue, probably caused by the same underlying bug. #30903 fixed Impulse only. Does using --experiments=beam_fn_api fix the issue?
@je-ik Hi, yes that did fix the issue. thank you! For my understanding, what does this option do exactly? And should I expect any performance degradation?
I am actually noticing a lot of back pressure using this approach, despite downstream operators having low CPU usage. Is the fix for the root cause relatively straightforward, in which case I could implement it in a forked version of the repo, or is it more involved?
I don't know the root cause, it seems that Flink does not send the snapshot state after restore from savepoint. I observed this on the Impulse (I suspected that it affects only bounded sources running in unbounded mode, but it seems it is not the case). It might be a Beam bug or a Flink bug.
Hi, yes that did fix the issue. thank you! For my understanding, what does this option do exactly? And should I expect any performance degradation?
The flag turns on different expansion for Read transform - it uses splittable DoFn (SDF), which uses Impulse which was fixed earlier. Performance should be similar to classical Read.
I'm surprised this is a bug, considering that restoring from a Flink savepoint is a pretty common use case. Is it possible there is some configuration missing somewhere? I haven't been able to find anyone else online experiencing this same issue, but I was able to replicate it using both Kinesis and Kafka. Given how common a use case it is, I'm not 100% sure this is in fact a bug; it is most likely some user error on my part.
I can make do without savepoints by utilizing Kafka offset commits and consumer groups to ensure no data is lost, but I can't figure out a way to avoid losing data that is windowed but not yet triggered when the Flink application is stopped. Maybe you know of a solution to that problem?
It seems like a lot of the subtasks aren't being utilized when stripping IDs with beam_fn_api, despite the number of shards being 20 and the parallelism being 24 (in theory there should only be 4 idle subtasks).
I'm surprised this is a bug, considering that restoring from a Flink savepoint is a pretty common use case. Is it possible there is some configuration missing somewhere? I haven't been able to find anyone else online experiencing this same issue, but I was able to replicate it using both Kinesis and Kafka. Given how common a use case it is, I'm not 100% sure this is in fact a bug; it is most likely some user error on my part.
Can you please provide a minimal example and setup to reproduce the behavior?
I can make do without savepoints by utilizing Kafka offset commits and consumer groups to ensure no data is lost, but I can't figure out a way to avoid losing data that is windowed but not yet triggered when the Flink application is stopped. Maybe you know of a solution to that problem?
You can drain the Pipeline, see https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#terminating-a-job
It seems like a lot of the subtasks aren't being utilized when stripping IDs with beam_fn_api, despite the number of shards being 20 and the parallelism being 24 (in theory there should only be 4 idle subtasks).
This is related to how Flink computes target splits. It is affected by the maximum parallelism (which is computed automatically if not specified). You can try increasing it via --maxParallelism=32768 (32768 is the maximum value); this could make the assignment more balanced.
Thanks for the suggestions, will give them a try. I believe the first comment of the ticket provides a simple pipeline that exhibits this behavior on the flink runner but if that doesn’t work, happy to provide another. The example also submits the job in detached mode which may be related, although have seen similar behavior without it. Appreciate your help looking into this, if there’s anything I can assist with, please let me know
Just to mimic the local setup I used:
I ran flink/start-cluster.sh
used the flink run command with the -d flag
and then stopped the job with a savepoint ./flink/bin/flink stop -p flink/savepoints cf78a44e6b10ab7062d3c02bb7d4e052
and then restarted using run with the savepoint path.
When doing this, I looked inside the task manager logs, searched for "Starting getIterator request", and saw 6 logs for the same timestamp at which my app restarted: 3 at sequence number and 3 at latest. I am not sure why the latest ones are showing up, and I didn't see anything in the source code that would cause this.
I also switched to Kafka and noticed the same behavior, so it seems to be related to the runner. I was unable to fix the performance issues with beam_fn_api, and noticed the backpressure was causing my data to come in waves. Looking at a CPU chart, it was very cyclic, with peaks of 99% CPU and troughs of 8% CPU, leading me to believe that this pipeline option was causing some sort of build-up followed by a rush of data that made the CPU spike.
I can make do with Kafka offset commits for now, but if there are any pointers on how to fix this in the Beam source code, I'd be happy to take a look and even submit a PR to be included in version 2.57. I am still hoping, though, that the issue is something on my end that can be fixed fairly easily.
Hi @akashk99, just to be sure, do you observe the same behavior when not using flink run, but running the job as a "standard" Java app (java -cp <jar> class) and passing the Flink configuration using Beam command-line args (--runner=flink --flinkMaster=... --savepointPath=...)?
Hi @je-ik, I was just able to reproduce the issue by manually running the jar file.
I started the job by running java -cp <jar> <class> --runner=flink --flinkMaster=... and then used flink stop with a savepoint path to take a savepoint. Afterwards, I reran java -cp <jar> <class> --runner=flink --flinkMaster=... --savepointPath=... with the savepoint I had just taken. After observing the task manager logs, I see:
Terminal Log:
Jun 06, 2024 12:55:08 PM org.apache.flink.client.program.rest.RestClusterClient lambda$null$6
INFO: Successfully submitted job <jobName> (6651a9570e4c9d1df81539b07e6e91ce) to 'http://localhost:8081'.
Task Manager Logs:
2024-06-06 12:55:11,979 INFO org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient [] - Starting getIterator request GetShardIteratorRequest(StreamName=<streamName>, ShardId=shardId-000000000012, ShardIteratorType=AT_SEQUENCE_NUMBER,
2024-06-06 12:55:13,279 INFO org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient [] - Starting getIterator request GetShardIteratorRequest(StreamName=<streamName>, ShardId=shardId-000000000012, ShardIteratorType=LATEST)
This was a few seconds after the job was submitted. I trimmed the output, but these two log lines were present for all of my shards.
Hi, we are seeing the same behavior on our pipeline.
Logs from a Task Manager
2024-07-08 14:29:13,867 WARN org.apache.beam.sdk.coders.SerializableCoder [] - Can't verify serialized elements of type KinesisReaderCheckpoint have well defined equals method. This may produce incorrect results on some PipelineRunner implementations
2024-07-08 14:29:13,867 INFO org.apache.beam.sdk.io.kinesis.KinesisSource [] - Creating new reader using [Checkpoint AFTER_SEQUENCE_NUMBER for stream <streamName>, shard shardId-000000002111: 49653574367537750625153914864609254626539224692753990642]
2024-07-08 14:29:13,945 INFO org.apache.beam.sdk.io.kinesis.KinesisReader [] - Starting reader using [Checkpoint AFTER_SEQUENCE_NUMBER for stream <streamName>, shard shardId-000000002111: 49653574367537750625153914864609254626539224692753990642]
2024-07-08 14:29:14,122 INFO org.apache.beam.sdk.io.kinesis.ShardReadersPool [] - Starting to read <streamName> stream from [shardId-000000002111] shards
2024-07-08 14:29:14,772 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase [] - Adding splits [FlinkSourceSplit{splitIndex=2, beamSource=org.apache.beam.sdk.io.kinesis.KinesisSource@a09dd60, splitState.isNull=true, checkpointMark=null}]
2024-07-08 14:29:14,773 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase [] - Received NoMoreSplits signal from enumerator.
2024-07-08 14:29:14,775 INFO org.apache.beam.sdk.io.kinesis.KinesisSource [] - Creating new reader using [Checkpoint LATEST for stream <streamName>, shard shardId-000000002111: null]
2024-07-08 14:29:14,778 INFO org.apache.beam.sdk.io.kinesis.KinesisReader [] - Starting reader using [Checkpoint LATEST for stream <streamName>, shard shardId-000000002111: null]
2024-07-08 14:29:14,830 INFO org.apache.beam.sdk.io.kinesis.ShardReadersPool [] - Starting to read <streamName> stream from [shardId-000000002111] shards
In our case we are using:
beam-sdks-java-io-kinesis
What happened?
Bug description
Setup details:
beam-sdks-java-io-amazon-web-services2
Bug details:
org.apache.beam.sdk.io.aws2.kinesis.KinesisReader is assigned the same splits twice, once with snapshot state and once without. This leads to duplicate data being processed.
Replication steps:
Logs:
shardId-000000000000 to shardId-000000000003 are first initialized with checkpoint state AFTER_SEQUENCE_NUMBER (correct).
The same shards are then initialized again with AT_TIMESTAMP (not correct).
Issue Priority
Priority: 3 (minor)
Issue Components