GoogleCloudPlatform / DataflowJavaSDK

Google Cloud Dataflow provides a simple, powerful model for building both batch and streaming parallel data processing pipelines.
http://cloud.google.com/dataflow

TextIO.writeCustomType cannot be used with DynamicDestinations.getDestination #619

Closed janhicken closed 6 years ago

janhicken commented 6 years ago

Hi,

I'm using the Dataflow Java SDK 2.2.0 in order to write data to GCS. I need to write data into different files grouped by some key. To do so, I implemented my own FileBasedSink.DynamicDestinations class, which determines the destination file with getDestination(String).

However, by that point I have already stringified my POJO, which makes it quite difficult to retrieve the key to group by; I'd rather just call myPojo.getGroup() or similar. Then I came across TextIO.writeCustomType, which makes my POJO type available in the DynamicDestinations implementation, so I can move the stringification of the record into formatRecord(UserT).
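
For illustration, here is a minimal, self-contained sketch of that routing idea in plain Java, assuming no Beam dependency. Event, SimpleDestinations, EventDestinations and route are hypothetical names. SimpleDestinations only mimics the formatRecord/getDestination pair of FileBasedSink.DynamicDestinations and is not the Beam class. What it shows: when the sink receives the typed POJO, the grouping key is a plain getter call, and stringification happens last.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DynamicRoutingSketch {

    // Hypothetical POJO standing in for the user's record type; getGroup() is the grouping key.
    static final class Event {
        private final String group;
        private final String payload;

        Event(String group, String payload) {
            this.group = group;
            this.payload = payload;
        }

        String getGroup() { return group; }

        String getPayload() { return payload; }
    }

    // Hypothetical, simplified stand-in for FileBasedSink.DynamicDestinations<UserT, DestinationT, OutputT>.
    // Only the two methods discussed in the issue are modelled; this is not the Beam class.
    abstract static class SimpleDestinations<UserT, DestinationT, OutputT> {
        abstract OutputT formatRecord(UserT record);

        abstract DestinationT getDestination(UserT element);
    }

    // Routes each Event by its group key and stringifies it as a CSV-style line.
    static final class EventDestinations extends SimpleDestinations<Event, String, String> {
        @Override
        String formatRecord(Event record) {
            // Stringification lives here instead of before the write.
            return record.getGroup() + "," + record.getPayload();
        }

        @Override
        String getDestination(Event element) {
            // The typed POJO is still available, so the key is a plain getter call.
            return element.getGroup();
        }
    }

    // Toy in-memory "sink": collects the formatted lines for each destination, in arrival order.
    static <UserT, DestinationT> Map<DestinationT, List<String>> route(
            List<UserT> input, SimpleDestinations<UserT, DestinationT, String> destinations) {
        Map<DestinationT, List<String>> files = new LinkedHashMap<>();
        for (UserT element : input) {
            DestinationT key = destinations.getDestination(element);
            files.computeIfAbsent(key, k -> new ArrayList<>()).add(destinations.formatRecord(element));
        }
        return files;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(new Event("a", "1"), new Event("b", "2"), new Event("a", "3"));
        System.out.println(route(events, new EventDestinations())); // {a=[a,1, a,3], b=[b,2]}
    }
}
```

In a real pipeline each map entry would correspond to a separate output file chosen by the sink; the in-memory map is only a stand-in for that.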

However, when setting a DynamicDestinations object on the TypedWrite, the corresponding method TypedWrite.to<NewDestinationT>(DynamicDestinations) expects the DynamicDestinations object to still use Void as its destination type, which is the default when a TypedWrite is created via TextIO.writeCustomType. I think the method signature should be

public <NewDestinationT> TypedWrite<UserT, NewDestinationT> to(
        DynamicDestinations<UserT, NewDestinationT, String> dynamicDestinations)

instead of

public <NewDestinationT> TypedWrite<UserT, NewDestinationT> to(
        DynamicDestinations<UserT, DestinationT, String> dynamicDestinations)

Kind regards, Jan
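
The typing problem can be reproduced outside Beam with a toy model. In the sketch below (plain Java; Destinations, TypedWrite and writeCustomType are hypothetical, simplified stand-ins, not Beam's classes), to() uses the signature proposed above, where the parameter carries NewDestinationT. With the reported signature, the parameter would instead be tied to the current DestinationT, which is Void after writeCustomType(), so a String-keyed destinations object would not type-check. The sketch mirrors only the generic types, not Beam's implementation.

```java
public class TypedWriteSketch {

    // Hypothetical, simplified stand-in for DynamicDestinations<UserT, DestinationT, OutputT>.
    interface Destinations<UserT, DestinationT, OutputT> {
        DestinationT getDestination(UserT element);

        OutputT formatRecord(UserT record);
    }

    // Hypothetical stand-in for TextIO.TypedWrite<UserT, DestinationT>.
    static final class TypedWrite<UserT, DestinationT> {
        // null until a Destinations object has been supplied via to(...).
        final Destinations<UserT, DestinationT, String> destinations;

        TypedWrite(Destinations<UserT, DestinationT, String> destinations) {
            this.destinations = destinations;
        }

        // Reported signature (rejects String-keyed destinations while DestinationT is Void):
        //   <NewDestinationT> TypedWrite<UserT, NewDestinationT> to(
        //       Destinations<UserT, DestinationT, String> d)
        //
        // Proposed signature: the parameter carries the new destination type.
        <NewDestinationT> TypedWrite<UserT, NewDestinationT> to(
                Destinations<UserT, NewDestinationT, String> d) {
            return new TypedWrite<>(d);
        }
    }

    // Mirrors writeCustomType(): the destination type starts out as Void.
    static <UserT> TypedWrite<UserT, Void> writeCustomType() {
        return new TypedWrite<>(null);
    }

    public static void main(String[] args) {
        // Routes strings by their first character; the destination type is String, not Void.
        Destinations<String, String, String> byFirstChar = new Destinations<>() {
            @Override
            public String getDestination(String element) {
                return element.substring(0, 1);
            }

            @Override
            public String formatRecord(String record) {
                return record.toUpperCase();
            }
        };

        // Compiles because to() infers NewDestinationT = String from the argument.
        TypedWrite<String, String> write = TypedWriteSketch.<String>writeCustomType().to(byFirstChar);
        System.out.println(write.destinations.getDestination("apple")); // a
        System.out.println(write.destinations.formatRecord("apple"));   // APPLE
    }
}
```

Swapping the parameter type of to() back to Destinations<UserT, DestinationT, String> makes the call in main fail to compile, which is the symptom described in this issue.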

MartinSahlen commented 6 years ago

I am also struggling with the exact same issue. It seems like something is wrong. When using Avro dynamic destinations (for instance), this is not a problem.

MartinSahlen commented 6 years ago

Have a look here: https://github.com/apache/beam/pull/4319

MartinSahlen commented 6 years ago

It is definitely an error: https://stackoverflow.com/a/47946587/3715815

janhicken commented 6 years ago

Great, that PR will fix the issue, I'm closing this one then.