fede1024 / rust-rdkafka

A fully asynchronous, futures-based Kafka client library for Rust based on librdkafka
MIT License
1.64k stars 281 forks source link

Async producer commit message inside Tokio Spawn #585

Open azhard4int opened 1 year ago

azhard4int commented 1 year ago

I am trying to commit a message using Stream Consumer. But, getting the problem that consumer does not live long enough. Tried using Arc Mutex as well, but it ends up with the same error.

My code is below:

let config: ClientConfig = create_consumer_kafka_config(brokers);
    let consumer: StreamConsumer = config.create().expect("Consumer creation failed");
    consumer
        .subscribe(&[&topic])
        .expect("Can't subscribe to specified topics");
    let consumer = Arc::new(consumer);

    // Create the outer pipeline on the message stream.
    let stream_processor = consumer.stream().take_until(shutdown_tripwire.map(|_| ())) // Stop consuming when the shutdown signal is received
    .try_for_each(|borrowed_message| {
        let bot_resolver = bot_resolver.clone();
        let geo_resolver = geo_resolver.clone();
        let handler = handler.clone();
        let sink = sink.clone();
        let processor_sender = processor_sender.clone();
        let consumer = consumer.clone();

        async move {

            // Process each message
            record_borrowed_message_receipt(&borrowed_message).await;
            // Borrowed messages can't outlive the consumer they are received from, so they need to
            // be owned in order to be sent to a separate thread.
            let owned_message = borrowed_message.detach();
            record_owned_message_receipt(&owned_message).await;
            let processing_task = tokio::spawn(async move{
                // let start_time = Instant::now(); // Start measuring time
                let payload_value = get_payload(owned_message);
                // let elapsed_time = start_time.elapsed(); // Calculate elapsed time
                // println!("Time taken: {:?}", elapsed_time); // Print the elapsed time
                match handler
                    .process_payload(&payload_value, &geo_resolver, &bot_resolver)
                    .await
                {
                    Ok(transformed_event) => {
                        // println!("Transformed event: {:?}", transformed_event);
                        // let elapsed_time = start_time.elapsed(); // Calculate elapsed time
                        // println!("Time taken: {:?}", elapsed_time); // Print the elapsed time
                        let event = sinks::EventTypes::Transformed(transformed_event.clone());
                        let sent = sink.send(event).await;
                        if let Err(e) = sent {
                            tracing::error!("Failed to send event to sink: {:?}", e);
                            // return Err(Box::new(EventSinkError("Failed to send event to sink")));
                        }
                        consumer.commit_message(&borrowed_message, CommitMode::Async).unwrap();
                    }
                    Err(e) => {
                        tracing::error!("Failed to process payload: {}", e);
                        // Send the payload with error to the failed topic.
                    }

                }
            });
            processor_sender.send(processing_task).await.expect("Failed to send processing task");
            Ok(())
        }
    });

The error I am seeing is this:

error[E0597]: `consumer` does not live long enough
   --> src/consumer.rs:181:28
    |
181 |     let stream_processor = consumer.stream().take_until(shutdown_tripwire.map(|_| ())) // Stop consuming when the shutdown signal is rece...
    |                            ^^^^^^^^^^^^^^^^^
    |                            |
    |                            borrowed value does not live long enough
    |                            argument requires that `consumer` is borrowed for `'static`

241 | }
    | - `consumer` dropped here while still borrowed

Any help would be appreciated.

mindreader commented 11 months ago

I'm sure you moved on from this by now, but the problem is that you spawn an 'async move' closure which tries to move to any variable it uses (one of which is consumer) into itself. What you need to do is move a copy of closure into the block.

You actually did that earlier, but you need to do it again, because there is a third context in which consumer needs to continue existing in until some indeterminate time in the future (processing thread).

It usually ends up looking like this:

tokio::spawn({
  let consumer = consumer.clone();
  async move {
    ...
   }
});
shenshouer commented 6 months ago

let arc_consumer = Arc::new(consumer); let consumer = arc_consumer.clone();