mjakubowski84 / parquet4s

Read and write Parquet in Scala. Use Scala classes as schema. No need to start a cluster.
https://mjakubowski84.github.io/parquet4s/
MIT License

feat(akkaPekko): Retry mechanism for ParquetPartitioningFlow #351

Open utkuaydn opened 5 months ago

utkuaydn commented 5 months ago

Hi,

In the akkaPekko module, I noticed that there is no error-handling mechanism in ParquetPartitioningFlow around the call to the Hadoop ParquetWriter's write, which may throw an IOException. I thought it would also be neat to have a retry mechanism in place for failed records, so I created #350. I think this is something that would be nice to have; let me know if you agree.

mjakubowski84 commented 5 months ago

Hi! Sorry for the late response. Could you please explain how this mechanism would be different from using Akka's RetryFlow as a wrapper around ParquetPartitioningFlow?

utkuaydn commented 5 months ago

Hi! Correct me if I'm wrong, but the problem with using RetryFlow as a wrapper is that it requires a decideRetry function of type (In, Out) => Option[In], and ParquetPartitioningFlow, when built via ParquetStreams.viaParquet.of[T]....write(...), has In: T and Out: T. At a glance you might think that, after an exception is thrown from the Hadoop write call, the written counter would not be incremented and modifiedPartitions would not be updated, and that is indeed the case. But if you look at onPush you'll see that it emits the message exactly as it was received (with the writing steps in between). In other words, the element passes through the flow without any modification, so there is no way to decide after the flow whether it was actually written. With the change I'm proposing, it would be possible to catch these exceptions and actually retry the write based on the configuration.
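
To make this concrete, here is a rough sketch of what the wrapping would look like (a hypothetical Data type and output path, builder options omitted; exact builder calls may differ by parquet4s version). The point is that decideRetry has nothing to inspect, because the output element is just the input element passed through:

```scala
import scala.concurrent.duration._
import akka.NotUsed
import akka.stream.scaladsl.{Flow, RetryFlow}
import com.github.mjakubowski84.parquet4s.{ParquetStreams, Path}

case class Data(id: Int, partition: String)

// The partitioning flow emits every element it receives, whether or not
// the underlying Hadoop write succeeded.
val writeFlow: Flow[Data, Data, NotUsed] =
  ParquetStreams.viaParquet
    .of[Data]
    .write(Path("/tmp/data")) // hypothetical output path

val retried: Flow[Data, Data, NotUsed] =
  RetryFlow.withBackoff(
    minBackoff   = 100.millis,
    maxBackoff   = 5.seconds,
    randomFactor = 0.2,
    maxRetries   = 3,
    flow         = writeFlow
  ) { (in, out) =>
    // `out` equals `in`; there is no signal here telling us whether the
    // record was actually written, so there is nothing to base a retry on.
    None
  }
```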

mjakubowski84 commented 4 months ago

I am sorry, I meant RestartFlow, which restarts the flow on error (or RestartSource/RestartSink).

Then, on a restart, the flow is rebuilt and all connections to Hadoop are reopened. I have had Parquet streams running for a couple of years and, honestly, I have not experienced IO issues. Therefore, I can't tell whether reattempting to write a record on an IO error, without explicitly reestablishing the connection, is a sensible approach. Assuming that the underlying Hadoop driver can handle automatic reconnection, this might be okay. However, IMHO, a good practice is to recreate the flow and reconnect to fix the IO problem (or fail miserably if reconnection is no longer possible).
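
For reference, the wrapping I have in mind is roughly the following (a sketch only, assuming a recent Akka version with RestartSettings, a hypothetical Data type and output path; the backoff values are just examples):

```scala
import scala.concurrent.duration._
import akka.NotUsed
import akka.stream.RestartSettings
import akka.stream.scaladsl.{Flow, RestartFlow}
import com.github.mjakubowski84.parquet4s.{ParquetStreams, Path}

case class Data(id: Int, partition: String)

// The factory is called again on every restart, so the Hadoop writer and
// its connections are recreated from scratch.
def makeWriteFlow(): Flow[Data, Data, NotUsed] =
  ParquetStreams.viaParquet
    .of[Data]
    .write(Path("/tmp/data")) // hypothetical output path

val settings =
  RestartSettings(minBackoff = 1.second, maxBackoff = 30.seconds, randomFactor = 0.2)

// On failure (e.g. an IOException) the whole flow is rebuilt and reconnected;
// elements in flight at the moment of failure are lost.
val resilientWriteFlow: Flow[Data, Data, NotUsed] =
  RestartFlow.onFailuresWithBackoff(settings)(() => makeWriteFlow())
```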

You might have more experience with such IO errors. What do you say?

utkuaydn commented 4 months ago

Firstly, I think that if something can throw an exception, it's always better to assume it definitely will 😄. For example, we can never be sure that the bucket we are writing to won't just decide to shut itself down, only to come back online a minute or two later.

So, I had to do some digging on RestartFlow, and found:

The restart process is inherently lossy, since there is no coordination between cancelling and the sending of messages. A termination signal from either end of the wrapped Flow will cause the other end to be terminated, and any in transit messages will be lost. During backoff, this Flow will backpressure.

Using RestartFlow can cause elements to be discarded, which defeats the purpose of adding retries in the first place. I believe RestartFlow is useful when, for example, an exception that cannot be handled is thrown inside the Flow, so the Flow has to be restarted even though that means compromising completeness.

Thus, I think it's still best if failed writes and exceptions are handled within ParquetPartitioningFlow. What you said about re-opening the writer does make sense, so I'll refactor the PR to accommodate that and think a bit more about how to handle this case. Let me know what you think :)
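
As a first rough idea for the refactor, I'm thinking of something along these lines; the RecordWriter trait and openWriter factory below are placeholders standing in for whatever the flow holds internally, not the actual parquet4s types:

```scala
import java.io.IOException

// Placeholder for the internally held Hadoop writer.
trait RecordWriter[T] {
  def write(record: T): Unit
  def close(): Unit
}

// Writes a record; on IOException it closes the (possibly broken) writer,
// opens a fresh one and retries, up to maxRetries times. Returns the writer
// that should be used for subsequent records.
def writeWithRetry[T](
    record: T,
    writer: RecordWriter[T],
    openWriter: () => RecordWriter[T],
    maxRetries: Int
): RecordWriter[T] = {
  def attempt(current: RecordWriter[T], retriesLeft: Int): RecordWriter[T] =
    try {
      current.write(record)
      current
    } catch {
      case _: IOException if retriesLeft > 0 =>
        // Best-effort close; the writer may already be broken.
        try current.close()
        catch { case _: IOException => () }
        // Re-open so that the retry also re-establishes the connection.
        attempt(openWriter(), retriesLeft - 1)
    }
  attempt(writer, maxRetries)
}
```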