delta-io / kafka-delta-ingest

A highly efficient daemon for streaming data from Kafka into Delta Lake
Apache License 2.0
337 stars 71 forks source link

error: future cannot be sent between threads safely -- dead_letters.rs and writer.rs #167

Closed clairewood closed 5 months ago

clairewood commented 5 months ago

I cloned the project and am trying to run it locally, but I'm getting this error when I try to compile:

error: future cannot be sent between threads safely
   --> src\dead_letters.rs:283:43
    |
283 |       ) -> Result<(), DeadLetterQueueError> {
    |  ___________________________________________^
284 | |         let values: Result<Vec<Value>, _> = dead_letters
285 | |             .iter()
286 | |             .map(|dl| {
...   |
313 | |         Ok(())
314 | |     }
    | |_____^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Sync` is not implemented for `(dyn PageWriter + 'static)`
note: future is not `Send` as this value is used across an await
   --> src\writer.rs:401:26
    |
395 |         for (key, values) in self.divide_by_partition_values(values)? {
    |                              ---- has type `&writer::DataWriter` which is not `Send`
...
401 |                         .await,
    |                          ^^^^^ await occurs here, with `self` maybe used later
...
417 |         }
    |         - `self` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow
   --> src\writer.rs:395:30
    |
395 |         for (key, values) in self.divide_by_partition_values(values)? {
    |                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    = note: required for the cast from `Pin<Box<[async block@src\dead_letters.rs:283:43: 314:6]>>` to `Pin<Box<(dyn futures::Future<Output = Result<(), dead_letters::DeadLetterQueueError>> + std::marker::Send + 'async_trait)>>`

I get that error about 10 times, with the only variation being what other types std::marker::Sync is not implemented for.

The other types it's not implemented for:

I've been trying to fix it myself but I'm not making any progress. If anyone else knows how to fix this, I would really appreciate the help!

To reproduce this error, clone kafka-delta-ingest and run cargo build --release

I'm running KDI locally on Windows. To get it to run on Windows I replace the rdkafka dependency with the dependencies below, but otherwise did not make any changes to my clone of KDI.

openssl = { version = "0.10.45", features = ["vendored"] }
rdkafka = { version = "0.28", features = ["cmake-build"] }
clairewood commented 5 months ago

Apparently my version of Rust was too old to get along with one of the updated crates. I updated it and now it compiles again. Sorry for the false alarm!