infinyon / fluvio

Lean and mean distributed stream processing system written in Rust and WebAssembly. Alternative to Kafka + Flink in one.
https://www.fluvio.io/
Apache License 2.0

[Bug]: fluvio produce is not batching when using interactive mode and `--delivery-semantic at-least-once` #3143

Open morenol opened 1 year ago

TanNgocDo commented 1 year ago

Hi @morenol, I'm looking into this with the fluvio CLI. I can see that a batch is created with at-least-once, followed by multiple backoffs:

TRACE send{topic=greetings}: fluvio::producer::accumulator: Batch is full. Creating a new batch for partition partition_id=0
2023-05-16T05:09:09.716210Z DEBUG run: fluvio::producer::partition_producer: new batch event
2023-05-16T05:09:09.732963Z TRACE run:send_receive_with_retry{retries=Take { iter: ExponentialBackoff(ExponentialBackoff { base_millis: 20, current_millis: 20, max_delay: Some(200s) }), n: 4 }}:send_and_receive{self=MultiplexerSocket 10}:send_request{self=fd(10) req=RequestMessage { header: RequestHeader { api_key: 0, api_version: 8, correlation_id: 1, client_id: "FLUVIO_CLI" }, request: ProduceRequest { transactional_id: None, isolation: ReadUncommitted, timeout: 1.5s, topics: [TopicProduceData { name: "greetings", partitions: [PartitionProduceData { partition_index: 0, records: RecordSet { batches: [Batch { base_offset: 0, batch_len: 14, header: BatchHeader { partition_leader_epoch: -1, magic: 2, crc: 0, attributes: 0, last_offset_delta: 0, first_timestamp: 1684213749715, max_time_stamp: 1684213749716, producer_id: -1, producer_epoch: -1, first_sequence: -1 }, records: RawRecords(b"\0\0\0\x01\x12\0\x02\0\0\x06Tan\0") }] } }], data: PhantomData<fluvio_protocol::record::data::RecordSet<fluvio_protocol::record::batch::RawRecords>> }], smartmodules: [], data: PhantomData<fluvio_protocol::record::data::RecordSet<fluvio_protocol::record::batch::RawRecords>> } }}: fluvio_protocol::codec: size="encoding data with write size"

I'm not sure I understood the issue correctly. Could you give a more detailed description? Thanks

morenol commented 1 year ago

@TanNgocDo from what I remember, when we use AtLeastOnce in the CLI, we wait for the record's send output after every producer push. That means we wait for each record to be successfully produced before adding more records to the batch.

https://github.com/infinyon/fluvio/blob/0377bae0ae3e1b32bb3e4b823bdfbb1f2e88a5a5/crates/fluvio-cli/src/client/produce/mod.rs#L380

So I think this is mostly an issue in the CLI implementation.
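The effect described above can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyAccumulator` and `batch_sizes` are hypothetical names, not fluvio types, and waiting on a record's output is modelled as forcing the open batch out immediately, since the wait cannot return until that batch has been sent.

```rust
/// Toy stand-in for the producer's accumulator (hypothetical, not the fluvio type).
struct ToyAccumulator {
    capacity: usize,
    open_batch: Vec<String>,
    sent_batches: Vec<Vec<String>>,
}

impl ToyAccumulator {
    fn new(capacity: usize) -> Self {
        ToyAccumulator { capacity, open_batch: Vec::new(), sent_batches: Vec::new() }
    }

    /// Adds a record to the open batch; a full batch is sent right away.
    fn push(&mut self, record: &str) {
        self.open_batch.push(record.to_string());
        if self.open_batch.len() >= self.capacity {
            self.flush();
        }
    }

    /// Sends the open batch, if it holds anything.
    fn flush(&mut self) {
        if !self.open_batch.is_empty() {
            self.sent_batches.push(std::mem::take(&mut self.open_batch));
        }
    }
}

/// Returns the size of every batch sent for `records`.
fn batch_sizes(records: &[&str], wait_after_each: bool, capacity: usize) -> Vec<usize> {
    let mut acc = ToyAccumulator::new(capacity);
    for record in records {
        acc.push(record);
        if wait_after_each {
            // Waiting here only returns once the batch is sent,
            // so the batch goes out holding a single record.
            acc.flush();
        }
    }
    acc.flush(); // send whatever is left at the end of input
    acc.sent_batches.iter().map(Vec::len).collect()
}
```

In this model, `batch_sizes(&["a", "b", "c"], true, 10)` yields three one-record batches, while passing `false` yields a single batch of three.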

TanNgocDo commented 1 year ago

Thanks @morenol, this seems to be related to https://github.com/infinyon/fluvio/issues/2808 ? The throughput of at-least-once is not good compared with at-most-once. Of course, I will need to test more.

morenol commented 1 year ago

For sure, it should be related to that

TanNgocDo commented 1 year ago

Looking into that 👍

TanNgocDo commented 1 year ago

@morenol, from my debugging, the wait() method goes to: https://github.com/infinyon/fluvio/blob/0377bae0ae3e1b32bb3e4b823bdfbb1f2e88a5a5/crates/fluvio/src/producer/record.rs#L135

There it blocks until it is notified by: https://github.com/infinyon/fluvio/blob/0377bae0ae3e1b32bb3e4b823bdfbb1f2e88a5a5/crates/fluvio/src/producer/partition_producer.rs#L247

This means wait() can only be notified by flush(): https://github.com/infinyon/fluvio/blob/0377bae0ae3e1b32bb3e4b823bdfbb1f2e88a5a5/crates/fluvio/src/producer/partition_producer.rs#L181

And to trigger flush(), linger_sleep must elapse: https://github.com/infinyon/fluvio/blob/0377bae0ae3e1b32bb3e4b823bdfbb1f2e88a5a5/crates/fluvio/src/producer/partition_producer.rs#L143

So whatever value of linger_time we set, we have to wait until the interval ends, which means linger_time is not useful here.

To sum up, with the current implementation: wait() -> block -> linger_time ends -> flush the batch with only one record -> get response from socket -> end of wait().

I'm not sure this is the right behavior. With at-least-once, should we really send only one record per batch (that is, no batching)? At the very least, linger_time is not useful with the current implementation of at-least-once: the bigger the linger, the longer we block before sending out a batch with one record.

So do we need to update the documentation here: https://www.fluvio.io/docs/concepts/delivery-semantics/ ? If we fix it by batching multiple records in at-least-once, should we change "at-least-once delivery means that for each record handed to the producer potentially multiple attempts are made at delivering it, such that at least one succeeds" to "at-least-once delivery means that for each batch ..."? Or should we keep the current implementation and disable linger_time for at-least-once?
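A rough back-of-the-envelope model of the cost described above, as a sketch only. It assumes every batch is flushed only when the linger interval expires, that records arrive back to back, and that in the batched case all records fit in one batch. The function names and the round-trip parameter are hypothetical.

```rust
use std::time::Duration;

/// Total time when each record's output is awaited before the next push:
/// every record pays one full linger interval plus one round trip.
fn per_record_wait_total(records: u32, linger: Duration, round_trip: Duration) -> Duration {
    (linger + round_trip) * records
}

/// Total time when all records share one batch:
/// a single linger interval plus a single round trip.
fn single_batch_total(linger: Duration, round_trip: Duration) -> Duration {
    linger + round_trip
}
```

With 100 records, a 100 ms linger and a 5 ms round trip, the per-record path comes to 10.5 s in this model, against 105 ms for one shared batch. Raising the linger makes the per-record case proportionally worse.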

morenol commented 1 year ago

@TanNgocDo I think the ideal solution is to fix it by batching. Regarding the documentation update, the proposed wording seems accurate to me, though the fact that every record is retried may already imply it. Whether the retry is done per batch or per record seems to me an implementation detail that the general documentation does not need to cover.

TanNgocDo commented 1 year ago

Thanks @morenol. wait() is the blocking point that prevents other records from being added to a batch. Should we do one of these options?

1) Remove wait(): this will prevent us from getting the future metadata...
2) Store a list of outputs and wait() for all of them (similar to reading from a file with -f)
3) Some other solution...?

morenol commented 1 year ago

I think that option 2 should be ok
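Option 2 could look roughly like the following. This is a minimal sketch, not the fluvio API: `PendingOutput` and `ToyProducer` are hypothetical stand-ins built on std channels, and `flush()` is called explicitly here, whereas in the real producer it would be triggered by the linger timer or a full batch.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for the per-record output handle the CLI waits on.
struct PendingOutput {
    offset_rx: Receiver<u64>,
}

impl PendingOutput {
    /// Blocks until the batch holding this record has been sent.
    fn wait(self) -> u64 {
        self.offset_rx.recv().expect("producer dropped before flushing")
    }
}

/// Toy producer: buffers records and assigns offsets when the batch is flushed.
struct ToyProducer {
    open_batch: Vec<Sender<u64>>,
    next_offset: u64,
    batches_sent: usize,
}

impl ToyProducer {
    fn new() -> Self {
        ToyProducer { open_batch: Vec::new(), next_offset: 0, batches_sent: 0 }
    }

    /// Queues a record and returns its handle without blocking.
    fn send(&mut self, _record: &str) -> PendingOutput {
        let (offset_tx, offset_rx) = channel();
        self.open_batch.push(offset_tx);
        PendingOutput { offset_rx }
    }

    /// Sends the open batch as one unit and resolves every handle in it.
    fn flush(&mut self) {
        if self.open_batch.is_empty() {
            return;
        }
        for offset_tx in self.open_batch.drain(..) {
            let _ = offset_tx.send(self.next_offset); // ignore handles already dropped
            self.next_offset += 1;
        }
        self.batches_sent += 1;
    }
}

/// Option 2: push everything first, keep the handles, and wait only at the end.
fn produce_then_wait_all(records: &[&str]) -> (Vec<u64>, usize) {
    let mut producer = ToyProducer::new();
    let outputs: Vec<PendingOutput> = records.iter().map(|r| producer.send(r)).collect();
    producer.flush();
    let offsets = outputs.into_iter().map(PendingOutput::wait).collect();
    (offsets, producer.batches_sent)
}
```

Because no handle is awaited until every record has been pushed, all records land in the same batch, and the metadata for each record is still available afterwards.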

TanNgocDo commented 1 year ago

I have just added a small draft change for stdin, to make sure I'm heading in the right direction (it still needs refactoring to reduce redundant code). In interactive mode, though, it seems we don't know when the input ends. Maybe use a special key? Or keep a fixed number of records in the list (adding a flag to specify that number)? Or use the batch size to bound the list of records kept for a batch?
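One of the alternatives floated above, bounding the list of pending outputs by a record count, might be sketched as follows. This is an illustration under assumptions, not the draft change itself: `wait_group_sizes` and `max_pending` are hypothetical names, empty lines are skipped, and `max_pending` is assumed to be at least 1. The function only reports how many outputs would be waited on together at each drain point.

```rust
use std::io::BufRead;

/// Reads records line by line and returns the size of each group of outputs
/// that would be waited on together: a group is drained once `max_pending`
/// outputs have accumulated, and once more when the input ends.
fn wait_group_sizes<R: BufRead>(input: R, max_pending: usize) -> Vec<usize> {
    let mut pending = 0usize;
    let mut groups = Vec::new();
    for line in input.lines() {
        let line = line.expect("failed to read input line");
        if line.is_empty() {
            continue; // nothing to produce for an empty line
        }
        pending += 1;
        if pending == max_pending {
            groups.push(pending); // drain: wait for every stored output here
            pending = 0;
        }
    }
    if pending > 0 {
        groups.push(pending); // end of input: wait for the remainder
    }
    groups
}
```

Called with `std::io::stdin().lock()` as the reader, this drains on a fixed count and again at end of input (Ctrl-D in a terminal), so no special key is needed to mark the end.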