infinyon / fluvio-client-python

The Fluvio Python Client!
https://infinyon.github.io/fluvio-client-python/fluvio.html
Apache License 2.0

Expose `RecordMetadata` after `send` #389

Closed by dariogoetz 6 months ago

dariogoetz commented 6 months ago

I have a use-case in which a fluvio producer needs to keep track of the offsets (and partitions) that it sent its messages to. To that end, I would like to expose the RecordMetadata struct to the python client.

If I understand correctly, the TopicProducer.send method (in rust) returns a ProduceOutput struct, whose wait method eventually yields the desired RecordMetadata.

I see two general ways forward to expose such functionality in the python client:

  1. Introduce both a ProduceOutput and a RecordMetadata to the python world, effectively mirroring the rust variants. This has a caveat, though: the ProduceOutput.wait method consumes self, which prevents any further use of the ProduceOutput object. If a ProduceOutput also lives in the python world, such a "consumption" cannot be expressed (if I understand PyO3 correctly). This could be worked around by wrapping the "inner" rust version of the ProduceOutput in a Cell:

    #[pyclass]
    pub struct ProduceOutput {
        inner: Cell<NativeProduceOutput>,
    }

    and then using take in the corresponding wait method:

    #[pymethods]
    impl ProduceOutput {
        fn wait(&self) -> Result<RecordMetadata, FluvioError> {
            // Cell::take moves the value out and leaves Default::default() behind,
            // so this requires NativeProduceOutput: Default.
            let inner = self.inner.take();
            Ok(run_block_on(inner.wait()).map_err(error_to_py_err)?)
        }
    }

  2. Another option would be to not expose ProduceOutput and directly provide some method send_and_wait_for_metadata (a proper name would need to be found) that calls wait on the result itself and then returns a RecordMetadata object.
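
To make the two options concrete, here is a minimal sketch in plain Rust, without PyO3 or fluvio. All types in it (RecordMetadata with partition and offset fields, NativeProduceOutput, TopicProducer) are hypothetical stand-ins, not the real fluvio definitions, and errors are plain Strings for brevity. For option 1 it uses Cell<Option<...>> rather than a bare Cell, so that take works without a Default impl and a second wait call can be reported as an error. A real #[pyclass] may need a thread-safe container such as a Mutex instead of Cell, depending on the PyO3 version.

```rust
use std::cell::Cell;

/// Stand-in for fluvio's RecordMetadata; the fields are assumptions for illustration.
#[derive(Debug, Clone, PartialEq)]
pub struct RecordMetadata {
    pub partition: u32,
    pub offset: i64,
}

/// Stand-in for the native ProduceOutput. As described above, `wait` consumes `self`.
pub struct NativeProduceOutput {
    metadata: RecordMetadata,
}

impl NativeProduceOutput {
    pub fn new(partition: u32, offset: i64) -> Self {
        Self { metadata: RecordMetadata { partition, offset } }
    }

    /// Consumes the output and yields the metadata (synchronous here for simplicity).
    pub fn wait(self) -> Result<RecordMetadata, String> {
        Ok(self.metadata)
    }
}

/// Option 1: a wrapper whose `wait` only needs `&self`.
pub struct ProduceOutput {
    inner: Cell<Option<NativeProduceOutput>>,
}

impl ProduceOutput {
    pub fn new(inner: NativeProduceOutput) -> Self {
        Self { inner: Cell::new(Some(inner)) }
    }

    pub fn wait(&self) -> Result<RecordMetadata, String> {
        // `take` moves the value out and leaves `None` behind,
        // so a second call finds nothing and returns an error.
        let inner = self
            .inner
            .take()
            .ok_or_else(|| String::from("ProduceOutput was already waited on"))?;
        inner.wait()
    }
}

/// Option 2: a hypothetical producer that never hands out a ProduceOutput.
pub struct TopicProducer {
    next_offset: Cell<i64>,
}

impl TopicProducer {
    pub fn new() -> Self {
        Self { next_offset: Cell::new(0) }
    }

    /// Fake `send`: pretends every record lands in partition 0 at the next offset.
    fn send(&self, _key: &[u8], _value: &[u8]) -> NativeProduceOutput {
        let offset = self.next_offset.get();
        self.next_offset.set(offset + 1);
        NativeProduceOutput::new(0, offset)
    }

    /// Sends a record, waits on the result internally, and returns only the metadata.
    pub fn send_and_wait_for_metadata(
        &self,
        key: &[u8],
        value: &[u8],
    ) -> Result<RecordMetadata, String> {
        self.send(key, value).wait()
    }
}
```

With option 1 the caller can hold on to the ProduceOutput and decide when to wait, at the cost of a runtime error on a second wait. Option 2 keeps the exposed surface smaller, but every call blocks until the metadata is available.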

Do you have a good idea for how to address this issue?

I would be willing to add a PR for either of the approaches (or another one, if you have an idea).

digikata commented 6 months ago

I'm a little late with this reply given the WIP PR, but I was studying some PyO3 myself :). I'm not 100% sure, but I wonder whether implementing IntoPy<RecordMeta> for ProduceOutput would work out. I'm not sure whether that works with the async ProduceOutput wait(), though.

These sections discussing IntoPy seem relevant to this case:

https://pyo3.rs/main/doc/pyo3/conversion/trait.intopy#conversion-to-a-python-object

https://pyo3.rs/main/doc/pyo3/conversion/trait.intopy#dynamic-conversion-into-python-objects