Closed: rustalchemy closed this issue 3 months ago
Thanks for the detailed report.
I tried to reproduce your issue with simpler code, and it works just fine for me:
use async_nats::service::ServiceExt;
use futures::StreamExt;

#[tokio::test]
async fn user_microservice() {
    let server = nats_server::run_basic_server();
    let client = async_nats::connect(server.client_url()).await.unwrap();
    let service_name = "user";
    let service = client
        .service_builder()
        .start(service_name, "1.0.0")
        .await
        .unwrap();
    let group = service.group(service_name);
    let mut endpoint = group.endpoint("find").await.unwrap();
    // The endpoint is moved into the task, so it stays alive while requests are served.
    tokio::task::spawn(async move {
        while let Some(request) = endpoint.next().await {
            request.respond(Ok("ok".into())).await.unwrap();
        }
    });
    let response = client.request("user.find", "hey".into()).await.unwrap();
    assert_eq!(response.payload, bytes::Bytes::from("ok"));
    println!("got response {:?}", response.subject);
}
You do not have to sleep. The subscription is created when the endpoint is created. I wonder if the endpoint is maybe dropped somewhere, which would cause the subscription to go away.
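To illustrate the "dropped endpoint" idea, here is a minimal std-only sketch. It is a toy model, not how async_nats is implemented: `ToyEndpoint`, `Registry` and `has_responder` are hypothetical names made up for this example. It only shows the general pattern where a handle registers a subject on creation and removes it again when the handle goes out of scope.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a server-side table of active subscriptions.
type Registry = Arc<Mutex<HashSet<String>>>;

/// Hypothetical endpoint handle: registers its subject when created
/// and unregisters it when dropped.
struct ToyEndpoint {
    subject: String,
    registry: Registry,
}

impl ToyEndpoint {
    fn new(registry: &Registry, subject: &str) -> Self {
        registry.lock().unwrap().insert(subject.to_string());
        ToyEndpoint {
            subject: subject.to_string(),
            registry: Arc::clone(registry),
        }
    }
}

impl Drop for ToyEndpoint {
    fn drop(&mut self) {
        // Once the handle is gone, nobody is listening on the subject anymore.
        self.registry.lock().unwrap().remove(&self.subject);
    }
}

/// A request only finds a responder while a handle for that subject is alive.
fn has_responder(registry: &Registry, subject: &str) -> bool {
    registry.lock().unwrap().contains(subject)
}
```

In this model, creating a `ToyEndpoint` inside an inner scope makes `has_responder` return true only until that scope ends. The practical takeaway for the real code is the same: move each endpoint into the task that serves it (or otherwise keep it alive) for as long as requests should be answered.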
Thanks for the response. Your last point gave me some insight: my service was being dropped or not awaited in some form, and something was off in how I was spawning the threads. So I wrote a simpler implementation of what I was trying to do, and it worked. I then applied it to my code and it finally found a responder, so I guess the issue wasn't on your side but rather my own skill issue :laughing:.
I'm including the code here, since it is a working solution for what I was trying to do:
use async_nats::service::ServiceExt;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
    let nats_url =
        std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
    let client = async_nats::connect(nats_url).await?;
    let group_name = "crew";
    let service = client
        .service_builder()
        .description("A multithreaded microservices")
        .start(group_name, "0.0.1")
        .await?;
    let g = service.group(group_name);
    let endpoints = vec!["find", "update", "delete", "create"];
    let mut threads = Vec::new();
    // One task per route; each task owns its endpoint, so the endpoint is not dropped.
    for route in endpoints.clone() {
        let mut endpoint = g.endpoint(route).await?;
        let join = tokio::spawn(async move {
            while let Some(request) = endpoint.next().await {
                request
                    .respond(Ok(format!("Hello from route: {}", route).into()))
                    .await
                    .unwrap();
            }
        });
        threads.push(join);
    }
    // Request every route once to check that a responder is found.
    for route in endpoints {
        let message = client
            .request(format!("{}.{}", group_name, route), "".into())
            .await?;
        println!("{}", std::str::from_utf8(&message.payload).unwrap());
    }
    // Wait on the endpoint tasks so the service keeps running.
    futures::future::join_all(threads).await;
    Ok(())
}
Running this code also lets you access the services through the nats CLI, for example:
nats req crew.find ""
which should return:
21:12:01 Sending request on "crew.find"
21:12:01 Received with rtt 1.189096ms
Hello from route: find
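In case it helps anyone who is unsure which subject to request: in the code above the subject is the group name and the endpoint name joined by a dot (`crew.find`, `crew.update`, and so on). Here is a small sketch of that naming. `endpoint_subject` and `all_subjects` are hypothetical helpers written for this comment, not part of async_nats, and they assume the plain `<group>.<endpoint>` layout used in this thread.

```rust
/// Builds the subject a request has to target for an endpoint inside a group,
/// assuming the "<group>.<endpoint>" layout used in this thread.
fn endpoint_subject(group: &str, endpoint: &str) -> String {
    format!("{group}.{endpoint}")
}

/// Builds the subjects for every route of a group, in the given order.
fn all_subjects(group: &str, endpoints: &[&str]) -> Vec<String> {
    endpoints
        .iter()
        .map(|endpoint| endpoint_subject(group, endpoint))
        .collect()
}
```

With the group `crew` and the routes from the code above, this yields `crew.find`, `crew.update`, `crew.delete` and `crew.create`, which are the subjects to pass to `nats req`.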
Thanks for the time and patience for reading and going through this issue.
Observed behavior
I've been getting a weird result in my services when using the service framework as described in the nats-rust-examples.
My code, at least in theory, does every step that the example does, but in a more dynamic way: it handles a Vec<String> of routes defined by my MicroServiceRouter and processes the data using the router function. You can check out the gist here.
The service is started and all the endpoints seem to be created, but for some reason when I try to make a request I simply get no responders. I used the following flow to check that everything is created and being handled:
From running the command:
nats micro stats user --trace
Ok, so it seems that I got some endpoints. Let's now check which endpoints exist and their stats using:
nats micro info user --trace
Ok, so in theory the processes are running and ready to receive requests, but when I execute the request using
nats request user.find someone
I get no responders.
I've already checked for parallelism problems, such as one thread blocking the others so that the message doesn't get through, but that doesn't seem to be the case, since I'm also spawning threads just like the example shows, and I checked that each thread is indeed running and waiting for messages.
Expected behavior
I'm new to NATS, especially the services framework, so I don't know whether I'm making the requests wrongly, passing a different subject, or whether a service has to be used in another way. However, if I run the example and request data the same way, it at least finds a responder, and the example panics because the data passed is corrupt.
Based on the example and adding
sleep(Duration::from_secs(60)).await;
in line 51 so I have some time to send requests, if I run the code and send a request:
nats request minmax.min max --trace
I get:
Which indicates that it found a responder, and my running application panics.
Server and client version
nats-server --version
latest from docker
nats --version
0.1.4
Host environment
Everything is running in Docker containers using docker compose, which includes the NATS server, the application, and the example. I'm running the CLI on Ubuntu.
Steps to reproduce
No response