line / decaton

High throughput asynchronous task processing on Apache Kafka
Apache License 2.0

Provide a way of specifying a partition in producing a task #159

Closed ta7uw closed 1 year ago

ta7uw commented 2 years ago

We can inject a custom org.apache.kafka.clients.producer.Partitioner by implementing it and passing it through DecatonClientBuilder#producerConfig.
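For reference, a minimal sketch of the configuration side of this. It only builds the java.util.Properties object that would be handed to DecatonClientBuilder#producerConfig. "partitioner.class" is the standard Kafka producer config key; the class name com.example.CustomPartitioner and the helper PartitionerConfigSketch are hypothetical names used for illustration.

```java
import java.util.Properties;

// Hypothetical helper, not part of Decaton or Kafka.
final class PartitionerConfigSketch {
    // Standard Kafka producer config key (ProducerConfig.PARTITIONER_CLASS_CONFIG).
    static final String PARTITIONER_CLASS_KEY = "partitioner.class";

    // Builds producer properties that point the Kafka producer at a custom partitioner.
    static Properties producerConfigWith(String partitionerClassName) {
        Properties props = new Properties();
        props.setProperty(PARTITIONER_CLASS_KEY, partitionerClassName);
        return props;
    }
}
```

The returned Properties (for example from producerConfigWith("com.example.CustomPartitioner")) would then be passed through DecatonClientBuilder#producerConfig when building the client.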

However, a Decaton client wraps the task set by our application in a DecatonTaskRequest. So, to determine a partition based on the task inside a Partitioner, we have to unwrap the DecatonTaskRequest ourselves (deserialize our own task type first, then run the partitioning step), as in the following:

public class CustomPartitioner extends DefaultPartitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (value instanceof DecatonTaskRequest) {
            DecatonTaskRequest request = (DecatonTaskRequest) value;
            MyTask task = new ProtocolBuffersDeserializer<>(MyTask.parser())
                                           .deserialize(request.getSerializedTask().toByteArray());
            // determine partition
            ....
            return result;
        }
        return super.partition(topic, key, keyBytes, value, valueBytes, cluster);
    }
}

The above seems redundant and inefficient, because the Partitioner has to deserialize a task that the client has just serialized.

Could we provide a way of specifying a partition in producing a task like the following API?

public class DecatonClientImpl<T> implements DecatonClient<T> {
    ....
    public CompletableFuture<PutTaskResult> put(String key, T task, int partition) {
        ....
    }
}

The partition passed as the argument would be set in TaskMetadataProto and used by an internal Partitioner that determines the partition based on it.
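A rough sketch of the selection logic such an internal Partitioner could apply, assuming the explicit partition is optional and that tasks without one fall back to key-based partitioning. This is not Decaton's implementation: ExplicitPartitionSketch and choosePartition are hypothetical names, the OptionalInt stands in for a value read from TaskMetadataProto, and Arrays.hashCode stands in for the murmur2 hash that Kafka's default partitioner applies to key bytes.

```java
import java.util.Arrays;
import java.util.OptionalInt;

// Hypothetical stand-in for the partition choice of an internal Partitioner.
final class ExplicitPartitionSketch {
    static int choosePartition(OptionalInt explicitPartition, byte[] keyBytes, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (explicitPartition.isPresent()) {
            int partition = explicitPartition.getAsInt();
            // Reject partitions that do not exist on the topic.
            if (partition < 0 || partition >= numPartitions) {
                throw new IllegalArgumentException("partition out of range: " + partition);
            }
            // An explicitly requested partition wins; the task is never deserialized.
            return partition;
        }
        // Fallback: key-based choice. Arrays.hashCode is an assumption for this sketch;
        // Kafka's default partitioner hashes key bytes with murmur2 instead.
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }
}
```

With this shape, put(key, task, partition) would only need to record the partition alongside the task, and the redundant deserialization in a custom Partitioner goes away.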

What do you think about this feature?

ocadaruma commented 2 years ago

Yeah, if we want a custom partitioner to choose the partition based on the task value, it needs that kind of redundant work.

Providing an overload for specifying the partition explicitly makes sense.

ta7uw commented 2 years ago

Thanks for your reply. I'll work on this feature.