spring-rs / spring-rs

🍃spring-rs is a application framework written in rust inspired by java's spring-boot
https://spring-rs.github.io/
MIT License
498 stars 28 forks source link

spring stream redis消费不了 #72

Closed huangjie0515 closed 3 weeks ago

huangjie0515 commented 4 weeks ago

我使用spring-stream redis进行测试,我通过接口能够成功发送消息到redis中,且类型是stream,但没有消费,也没异常日志。 以下是我的代码和配置:

[dependencies]
spring = "0.2.2"
spring-web = "0.2.2"
spring-sqlx = { version= "0.2.1", features = ["mysql"] }
spring-redis = "0.2.1"
spring-job = "0.2.1"
tokio = "1"
spring-stream = { version = "0.2.1", features = ["redis", "json"] }
anyhow = "1.0.86"
serde = { version = "1.0.208", features = ["derive"] }
serde_json = "1.0.125"
async-trait = "0.1"
tower = { version = "0.5.0", features = ["util", "timeout"] }
tower-http = "0.6.0"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
http = "1.1.0"
#[tokio::main]
async fn main() {
    App::new()
        .add_plugin(SqlxPlugin) // 添加插件
        .add_plugin(WebPlugin)
        .add_plugin(RedisPlugin)
        .add_plugin(StreamPlugin)
        .add_router(router())
        .add_plugin(JobPlugin)
        .add_consumer(consumers::redis_stream_consumer::consumers())
        .add_jobs(jobs::all_jobs::jobs())
        .run()
        .await
}
use serde::{Deserialize, Serialize};
use spring_stream::consumer::Consumers;
use spring_stream::extractor::{Json, StreamKey};
use spring_stream::handler::TypedConsumer;
use spring_stream::redis::AutoStreamReset;
use spring_stream::stream_listener;
use spring_stream::{redis::RedisConsumerOptions, StreamConfigurator};

#[derive(Debug, Serialize, Deserialize)]
pub struct Payload {
    success: bool,
    msg: String,
}

pub fn consumers() -> Consumers {
    Consumers::new().typed_consumer(listen_topic_do_something)
}

#[stream_listener(
    "topic",
    redis_consumer_options = fill_redis_consumer_options
)]
async fn listen_topic_do_something(topic: StreamKey, Json(payload): Json<Payload>) {
    println!("==============received msg from topic#{:?}: {:#?}", topic, payload);
}

fn fill_redis_consumer_options(opts: &mut RedisConsumerOptions) {
    opts.set_auto_stream_reset(AutoStreamReset::Earliest);
}
#[get("/send_msg")]
async fn send_msg(Component(producer): Component<Producer>) -> error::Result<impl IntoResponse> {
    let now = SystemTime::now();
    let json = json!({
        "success": true,
        "msg": format!("This message was sent at {:?}", now),
    });
    let resp = producer
        .send_json("topic", json)
        .await
        .context("send msg failed")?;

    let seq = resp.sequence();
    Ok(Json(json!({"seq":seq})))
}
warning: `spring-rs-demo` (bin "spring-rs-demo") generated 16 warnings (run `cargo fix --bin "spring-rs-demo"` to apply 7 suggestions)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.30s
     Running `target\debug\spring-rs-demo.exe`
2024-10-16T16:57:27  INFO spring::app: spring_web::WebPlugin plugin registered
2024-10-16T16:57:27  INFO spring::app: spring_redis::RedisPlugin plugin registered
2024-10-16T16:57:27  INFO spring::app: spring_job::JobPlugin plugin registered
2024-10-16T16:57:27  INFO spring_sqlx: sqlx connection success
2024-10-16T16:57:27  INFO spring::app: spring_sqlx::SqlxPlugin plugin registered
2024-10-16T16:57:27  INFO spring_stream: register scheduler for "["topic"]" stream consumer
2024-10-16T16:57:27  INFO spring::app: spring_stream::StreamPlugin plugin registered
huangjie0515 commented 4 weeks ago

除了这个问题外,我还想问 stream redis如果有密码,要怎么配置。redis没有密码的话,我已经连接成功了,但有密码的话,就一直提示"Backend error: RedisBackendErr: Extension error: NOAUTH: Authentication required."

holmofy commented 4 weeks ago

https://github.com/spring-rs/spring-rs/blob/50072d4f7d8979f3860beb1dd040ba8f4fd97275/spring-stream/src/config/redis.rs#L92-L93

# redis流配置
[stream.redis]
connect = { db=0,username="user",password="passwd" }
huangjie0515 commented 4 weeks ago

这个stream redis消费不了,服务端的redis,需要加什么配置吗,

huangjie0515 commented 4 weeks ago

你复现了这个问题了吗, 我使用的redis信息如下:

redis版本: 6.0.16
Os:Linux5.15.0-56-genericx86_64
操作系统: Ubuntu
holmofy commented 4 weeks ago

tokio 调度的问题,web 插件和 stream 插件都是无限循环阻塞等待新的请求。我在研究这玩意儿改怎么改

发自我的iPhone

在 2024年10月17日,上午9:31,huangjie0515 @.***> 写道:

 你复现了这个问题了吗, 我使用的redis信息如下:

reids版本: 6.0.16 Os:Linux5.15.0-56-genericx86_64

— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you commented.

holmofy commented 3 weeks ago

spring 0.2.6 已修复这个问题

https://github.com/spring-rs/spring-rs/actions/runs/11427197797