geofmureithi / apalis

Simple, extensible multithreaded background job and message processing library for Rust
https://crates.io/crates/apalis
MIT License

BoxDynError in worker fn #470

Open boingrach opened 2 days ago

boingrach commented 2 days ago

Hi, I'm using a boxed error in a worker function.

example
```rust
use apalis::prelude::*;
use apalis_sql::sqlite::SqliteStorage;
use sqlx::SqlitePool;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    std::env::set_var("RUST_LOG", "debug,sqlx::query=info");
    tracing_subscriber::fmt::init();
    let pool = SqlitePool::connect("sqlite::memory:").await?;
    // Do migrations: Mainly for "sqlite::memory:"
    SqliteStorage::setup(&pool)
        .await
        .expect("unable to run migrations for sqlite");
    let storage: SqliteStorage<String> = SqliteStorage::new(pool.clone());
    Monitor::new()
        .register({
            WorkerBuilder::new("tasty-mango")
                .backend(storage)
                .build_fn(do_job)
        })
        .run()
        .await?;
    Ok(())
}

async fn do_job(job: String) -> Result<(), apalis::prelude::BoxDynError> {
    tracing::info!("job: {job}");
    Ok(())
}

#[derive(Debug)]
enum CustomError {}

impl std::error::Error for CustomError {}

impl std::fmt::Display for CustomError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        todo!()
    }
}
```

But I'm getting errors:

errors
```
error[E0277]: the trait bound `ServiceFn impl Future>> {try_notify}, std::string::String, SqlContext, _>: tower_service::Service>` is not satisfied
  --> examples/sqlite/src/main.rs:25:18
   |
25 |             .build_fn(try_notify)
   |              ^^^^^^^^ unsatisfied trait bound
   |
   = help: the trait `tower_service::Service>` is not implemented for `ServiceFn ... {try_notify}, ..., ..., ...>`
   = help: the following other types implement trait `tower_service::Service`:
             ServiceFn
             ServiceFn
             ServiceFn
             ServiceFn
             ServiceFn
             ServiceFn
             ServiceFn
             ServiceFn
           and 9 others
   = note: required for `WorkerBuilder, Identity, ...>` to implement `apalis::prelude::WorkerFactory impl Future>> {try_notify}, std::string::String, SqlContext, _>>`
   = note: the full name for the type has been written to '/tmp/apalis/target/debug/deps/sqlite_example-de103ae0c3a1493e.long-type-8084250147342458497.txt'
   = note: consider using `--verbose` to print the full type name to the console
```

Replacing the boxed error with a concrete enum fixes the errors:

async fn do_job(job: String) -> Result<(), CustomError> {
    tracing::info!("job: {job}");
    Ok(())
}

#[derive(Debug)]
enum CustomError {}
impl std::error::Error for CustomError {}
impl std::fmt::Display for CustomError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        todo!()
    }
}

apalis v0.6.0

geofmureithi commented 2 days ago

Is there a reason for using BoxDynError?

I recommend using the concrete Error type provided by apalis, as in the email_service example, or defining your own error type.

I will investigate a little more.
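
For readers following along, here is a minimal sketch of what "define your error type" could look like, using only the standard library. The names `JobError`, `InvalidPayload`, `Upstream`, and `handle` are hypothetical and do not come from apalis or from this thread. Unlike the `CustomError` in the report, this one implements `Display` instead of leaving it as `todo!()`.

```rust
use std::fmt;

// Hypothetical concrete error type; the variants are illustrative only.
#[derive(Debug)]
enum JobError {
    InvalidPayload(String),
    Upstream(String),
}

impl fmt::Display for JobError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            JobError::InvalidPayload(reason) => write!(f, "invalid payload: {reason}"),
            JobError::Upstream(reason) => write!(f, "upstream failure: {reason}"),
        }
    }
}

// The default `source()` is enough here, so the impl body stays empty.
impl std::error::Error for JobError {}

// Synchronous stand-in for a worker fn body; a real apalis worker fn would be `async`.
fn handle(job: &str) -> Result<(), JobError> {
    if job.is_empty() {
        return Err(JobError::InvalidPayload("empty job".to_string()));
    }
    Ok(())
}
```

Because `JobError` is a sized type that implements `std::error::Error`, it can be used wherever a generic bound asks for `E: std::error::Error`, which appears to be what the concrete-enum variant of the report relies on.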

boingrach commented 2 days ago

A lot of my services have tower layers applied. Some layers convert the error into Box<dyn Error+...>

example
```rust
use apalis::prelude::*;
use apalis_sql::{context::SqlContext, sqlite::SqliteStorage};
use futures::future::BoxFuture;
use sqlx::SqlitePool;
use tower::{Layer, Service};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt::init();
    let pool = SqlitePool::connect("sqlite::memory:").await?;
    // Do migrations: Mainly for "sqlite::memory:"
    SqliteStorage::setup(&pool)
        .await
        .expect("unable to run migrations for sqlite");
    let storage: SqliteStorage<String> = SqliteStorage::new(pool.clone());
    Monitor::new()
        .register({
            WorkerBuilder::new("tasty-mango")
                .layer(DynLayer)
                .backend(storage)
                .build_fn(do_job)
        })
        .run()
        .await?;
    Ok(())
}

async fn do_job(job: String) -> Result<(), CustomError> {
    tracing::info!("job: {job}");
    Ok(())
}

#[derive(Debug)]
enum CustomError {}

impl std::error::Error for CustomError {}

impl std::fmt::Display for CustomError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        todo!()
    }
}

#[derive(Debug)]
struct DynLayerService<S> {
    inner: S,
}

impl<S, R> Service<R> for DynLayerService<S>
where
    S: Service<R>,
{
    type Response = S::Response;
    type Error = Box<dyn std::error::Error + Send + Sync>;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        todo!()
    }

    fn call(&mut self, req: R) -> Self::Future {
        todo!()
    }
}

struct DynLayer;

impl<S> Layer<S> for DynLayer {
    type Service = DynLayerService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        DynLayerService { inner }
    }
}
```

This gives a possible hint as to why the original build_fn is not allowed:

error
```rust
error[E0277]: the size for values of type `(dyn std::error::Error + std::marker::Send + Sync + 'static)` cannot be known at compilation time
  --> src/main.rs:21:10
   |
21 |         .register({
   |          ^^^^^^^^ doesn't have a size known at compile-time
   |
   = help: the trait `Sized` is not implemented for `(dyn std::error::Error + std::marker::Send + Sync + 'static)`
   = help: the trait `tower::Layer` is implemented for `apalis_core::layers::AckLayer`
   = note: required for `Box<(dyn std::error::Error + std::marker::Send + Sync + 'static)>` to implement `std::error::Error`
   = note: required for `apalis_core::layers::AckLayer, std::string::String, SqlContext, ()>` to implement `tower::Layer impl futures::Future> {do_job}, std::string::String, SqlContext, ()>>>`
```
boingrach commented 2 days ago

Some services do not need to pass the real error; basic info is enough, so a boxed error suits them well.
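
To illustrate the "basic info is enough" use case, here is a small standard-library sketch of a function that returns a boxed error. The alias `BoxError` and the function `basic_failure` are hypothetical names; apalis exposes its own alias, `BoxDynError`, which is assumed here to have a similar shape.

```rust
use std::error::Error;

// Local alias for illustration; assumed to be similar in shape to apalis's `BoxDynError`.
type BoxError = Box<dyn Error + Send + Sync + 'static>;

// Returns only basic information on failure, without a dedicated error enum.
fn basic_failure(job: &str) -> Result<(), BoxError> {
    if job.is_empty() {
        // std implements `From<&str>` for `Box<dyn Error + Send + Sync>`.
        return Err("job payload was empty".into());
    }
    // Any `Error + Send + Sync` value converts into the boxed form through `?`.
    let _count: u32 = job.parse()?;
    Ok(())
}
```

The caller still gets a readable message through `to_string()`, and can recover the original type with `downcast_ref` when it matters, which is why a boxed error is convenient for services that do not need a rich error type.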

geofmureithi commented 2 days ago

Oh, I see what you are looking for: it's the ErrorHandlingLayer. This layer should be the topmost layer. Could you check if that fixes your issue?

boingrach commented 1 day ago

Kind of: ErrorHandlingLayer works only for a manual Service implementation.

example
```rust
use apalis::{layers::ErrorHandlingLayer, prelude::*};
use apalis_sql::{context::SqlContext, sqlite::SqliteStorage};
use futures::future::BoxFuture;
use sqlx::SqlitePool;
use tower::Service;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt().init();
    let pool = SqlitePool::connect("sqlite::memory:").await?;
    SqliteStorage::setup(&pool)
        .await
        .expect("unable to run migrations for sqlite");
    let storage: SqliteStorage<String> = SqliteStorage::new(pool.clone());

    let service_worker = WorkerBuilder::new("do_job")
        .layer(ErrorHandlingLayer::new())
        .backend(storage.clone())
        // fixed with layer
        .build(ServiceWithBoxedError);
    Monitor::new().register(service_worker).run().await?;

    let fn_worker = WorkerBuilder::new("do_job")
        .layer(ErrorHandlingLayer::new())
        .backend(storage.clone())
        // errors
        .build_fn(do_job);
    Monitor::new().register(fn_worker).run().await?;
    Ok(())
}

async fn do_job(job: String) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    tracing::info!("job: {job}");
    Ok(())
}

struct ServiceWithBoxedError;

impl Service<Request<String, SqlContext>> for ServiceWithBoxedError {
    type Response = ();
    type Error = Box<dyn std::error::Error + Send + Sync>;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        todo!()
    }

    fn call(&mut self, req: Request<String, SqlContext>) -> Self::Future {
        todo!()
    }
}
```

The fn case fails with errors:

errors
```
error[E0277]: the trait bound `apalis::prelude::ServiceFn impl futures::Future>> {do_job}, std::string::String, SqlContext, _>: Service>` is not satisfied
```
geofmureithi commented 1 day ago

I have replicated your issue and can confirm that it is caused by the constraints that ServiceFn has. Currently, there are several workarounds. I will spend some time on this within the coming days.
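
The thread does not spell out the workarounds, so the following is only one possible approach, sketched with the standard library alone. The compiler output above suggests a bound of the form `E: std::error::Error`, and `Box<dyn Error + Send + Sync>` does not satisfy it because std implements `Error` for `Box<T>` only when `T` is sized. A newtype around the box can implement `Error` itself. The names `Boxed`, `requires_sized_error`, `from_layer`, and `worker` are hypothetical, and `requires_sized_error` is just a stand-in for whatever bound apalis actually applies.

```rust
use std::error::Error;
use std::fmt;

type BoxError = Box<dyn Error + Send + Sync + 'static>;

// Hypothetical newtype (not part of apalis) that makes a boxed error a sized `Error`.
#[derive(Debug)]
struct Boxed(BoxError);

impl fmt::Display for Boxed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to the inner error's message.
        fmt::Display::fmt(&self.0, f)
    }
}

impl Error for Boxed {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        // Expose the inner error's own source, if it has one.
        self.0.source()
    }
}

impl From<BoxError> for Boxed {
    fn from(inner: BoxError) -> Self {
        Boxed(inner)
    }
}

// Stand-in for a generic bound that rejects `Box<dyn Error + ...>` directly.
fn requires_sized_error<E: Error + Send + Sync + 'static>(err: E) -> String {
    err.to_string()
}

// Simulates a layer or helper that only reports a boxed error.
fn from_layer(fail: bool) -> Result<(), BoxError> {
    if fail {
        Err("upstream layer failed".into())
    } else {
        Ok(())
    }
}

// Synchronous stand-in for a worker fn; `?` converts through `From<BoxError> for Boxed`.
fn worker(fail: bool) -> Result<(), Boxed> {
    from_layer(fail)?;
    Ok(())
}
```

Passing a `BoxError` straight to `requires_sized_error` would not compile, while a `Boxed` value is accepted. Whether this is enough to satisfy `build_fn` in apalis v0.6.0 is not confirmed in this thread.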

geofmureithi commented 1 day ago

> A lot of my services have tower layers applied. Some layers convert error into Box<dyn Error+...>

For this, the ErrorHandlingLayer should work.