rangadi closed this pull request 7 years ago
+R: @davorbonaci, @dhalperi, @tgroh
Updated based on Thomas's comment on chat. DirectPipelineRunner does not call getInitialSplits(). Rather than forcing a single split through a special config, force it when it is invoked from within KafkaIO itself.
I was not sure exactly what you meant, so I minimized the diff by reusing the old code that handles the generic case where the number of partitions and splits might not match. PTAL.
Updated after a clarification from Thomas. It makes sense: there is no special case for a single split in generateInitialSplits(). createReader() creates a single reader if there aren't any partitions assigned (as happens with the direct runner).
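The createReader() fallback described above can be sketched like this. This is a minimal, self-contained illustration with hypothetical names (partitionsForReader is not the actual KafkaIO method), showing only the assumed decision: when generateInitialSplits() was never called (as with DirectPipelineRunner), the split carries no assigned partitions, so the single reader takes all partitions of the topic.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ReaderSketch {

    // Sketch of the fallback: if no partitions were assigned to this split
    // (direct runner never calls generateInitialSplits()), the one reader
    // consumes every partition; otherwise it consumes only its assignment.
    static List<Integer> partitionsForReader(
            List<Integer> assigned, List<Integer> allPartitions) {
        return assigned.isEmpty() ? allPartitions : assigned;
    }

    public static void main(String[] args) {
        List<Integer> all = Arrays.asList(0, 1, 2, 3);
        // Direct runner: no assignment, single reader gets all 4 partitions.
        System.out.println(partitionsForReader(Collections.<Integer>emptyList(), all));
        // Distributed runner: each reader gets only its split's partitions.
        System.out.println(partitionsForReader(Arrays.asList(2), all));
    }
}
```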
Updated a couple of javadoc comments as well.
[ERROR] src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java:[29,8] (imports) UnusedImports: Unused import: com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.
Otherwise LGTM
Thanks. Just pushed the fix for the unused import. I will ping once travis-ci is happy.
@tgroh all the checks passed. Thanks for the review.
KafkaIO should return one split for each partition.
The partition is the actual unit of parallelism for a Kafka topic. The desiredNumSplits that Dataflow passes to a custom source is very low when maxNumWorkers is set: it asks for just one split per worker. This limits the use of CPU cores on the workers, essentially making autoscaling use more resources without improving performance.
This includes a hack to force a single split in many unit tests, since DirectPipelineRunner and InProcessPipelineRunner don't seem to read from more than one split.
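The parallelism problem described above can be sketched as follows. This is a minimal illustration, not the actual KafkaIO code (assign is a hypothetical helper): the generic round-robin case packs multiple partitions into each split when desiredNumSplits is smaller than the partition count, so only desiredNumSplits readers run in parallel, while one split per partition keeps every partition independently readable.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {

    // Round-robin assignment of partitions to splits: the generic case
    // where numSplits may be smaller than the number of partitions.
    static List<List<Integer>> assign(int numPartitions, int numSplits) {
        List<List<Integer>> splits = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            splits.add(new ArrayList<Integer>());
        }
        for (int p = 0; p < numPartitions; p++) {
            splits.get(p % numSplits).add(p);
        }
        return splits;
    }

    public static void main(String[] args) {
        // With desiredNumSplits == number of workers (here 2) and 8
        // partitions, each split carries 4 partitions, so at most 2 readers
        // run in parallel regardless of available CPU cores.
        System.out.println(assign(8, 2));
        // One split per partition (the behavior this PR argues for) lets
        // each partition be read independently.
        System.out.println(assign(8, 8));
    }
}
```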