facebook / fbthrift

Facebook's branch of Apache Thrift, including a new C++ server.
Apache License 2.0

how to use tokio::spawn in rust client? #598

Open sternezsl opened 5 months ago

sternezsl commented 5 months ago

I would like to send requests to an fbthrift server concurrently, so I created a client with the following code:

use fbthrift::CompactProtocol;
use fbthrift_demo_if::TestService;
use fbthrift_tcp::TcpTransport;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let stream = tokio::net::TcpStream::connect("127.0.0.1:6060").await?;
    let transport = TcpTransport::new(stream);
    let conn = <dyn fbthrift_demo_if::TestService>::new(CompactProtocol, transport);
    let client1 = conn.clone();

    let mut tasks = vec![];
    tasks.push(tokio::spawn(async move {
        let result = client1.method1(10).await;
        println!("result: {}", result.unwrap());
    }));

    let client2 = conn.clone();
    tasks.push(tokio::spawn(async move {
        let result = client2.method1(11).await;
        println!("result: {}", result.unwrap());
    }));
    for task in tasks {
        task.await?;
    }
    Ok(())
}

where TestService::method1 is very simple: it just adds one to the received number.

service TestService {
  i32 method1(1: i32 req);
}

The client hangs after receiving the first request's result:

> cargo run -r
result: 11

But if I tweak the code to use futures::future::try_join_all:

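use fbthrift::CompactProtocol;
use fbthrift_demo_if::TestService;
use fbthrift_tcp::TcpTransport;
use futures::future::try_join_all;

#[tokio::main]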
async fn main() -> anyhow::Result<()> {
    let stream = tokio::net::TcpStream::connect("127.0.0.1:6060").await?;
    let transport = TcpTransport::new(stream);
    let conn = <dyn fbthrift_demo_if::TestService>::new(CompactProtocol, transport);
    let response = try_join_all(vec![conn.method1(10), conn.method1(11)]).await;
    println!("{response:?}");
    Ok(())
}

It works, and I get the expected result:

> cargo run -r
Ok([11, 12])

My question is: what is wrong with the tokio::spawn version?

slawlor commented 4 months ago

Hello and thank you for opening this issue!

A few questions to follow up with

  1. Do you have a complete example you can share?
  2. Can you try some of the other tokio primitives and see if they work? e.g. JoinSet or tokio::join!?
  3. With your "tweak" can you try to clone the connection for both queries and see if the issue reproduces? It may be that the clone of the connection is improper which might be causing the problem. Or it needs to be wrapped into an Arc in order to work. I have a guess that dropping the conn will close the tcp connection, or cloning it won't properly open multiple connections.
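
The clone-and-drop hypothesis in question 3 can be illustrated with a minimal std-only sketch. It assumes the client handle behaves like an `Arc` around a single connection, which is a guess about the design and not something confirmed in this thread. `Connection` and `clone_and_drop_demo` are hypothetical names for illustration and are not part of fbthrift.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a transport; NOT the fbthrift `TcpTransport`.
struct Connection {
    // Shared flag so the caller can observe when the connection is closed.
    closed: Arc<AtomicBool>,
}

impl Drop for Connection {
    fn drop(&mut self) {
        // Runs once, when the last `Arc<Connection>` handle goes away.
        self.closed.store(true, Ordering::SeqCst);
    }
}

/// Returns (handle count after cloning, closed after dropping one handle,
/// closed after dropping every handle).
fn clone_and_drop_demo() -> (usize, bool, bool) {
    let closed = Arc::new(AtomicBool::new(false));
    let conn = Arc::new(Connection {
        closed: Arc::clone(&closed),
    });

    // Cloning the Arc copies the pointer, not the Connection:
    // no second connection is opened.
    let client1 = Arc::clone(&conn);
    let client2 = Arc::clone(&conn);
    let handles = Arc::strong_count(&conn);

    // Dropping the original handle does not close the connection
    // while clones are still alive.
    drop(conn);
    let closed_after_one = closed.load(Ordering::SeqCst);

    // Only dropping the last handle runs `Drop` on the Connection.
    drop(client1);
    drop(client2);
    let closed_after_all = closed.load(Ordering::SeqCst);

    (handles, closed_after_one, closed_after_all)
}
```

Under that assumption, cloning never opens additional connections, and the connection stays open until the last clone is dropped.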

Thanks!

sternezsl commented 4 months ago

Thanks for your reply

> Hello and thank you for opening this issue!
>
> A few questions to follow up with
>
> 1. Do you have a complete example you can share?

## client source code

[package]
name = "fbthrift-client"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[[bin]]
name = "client"
path = "src/main.rs"

[dependencies]
fbthrift = { path = "../fbthrift/thrift/lib/rust" }
fbthrift_demo_if = { path = "../fbthrift-demo" }
fbthrift_tcp = { path = "../rust-shed/shed/fbthrift_ext/tcp" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
futures = "0.3.30"
anyhow = "1.0.75"

[build-dependencies]
thrift_compiler = { path = "../rust-shed/shed/thrift_compiler" }

- src/main.rs

use fbthrift::CompactProtocol;
use fbthrift_demo_if::TestService;
use fbthrift_tcp::TcpTransport;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let stream = tokio::net::TcpStream::connect("127.0.0.1:6060").await?;
    let transport = TcpTransport::new(stream);
    let conn = <dyn fbthrift_demo_if::TestService>::new(CompactProtocol, transport);
    let client1 = conn.clone();

    let mut tasks = vec![];
    tasks.push(tokio::spawn(async move {
        let result = client1.method1(10).await;
        println!("result: {}", result.unwrap());
    }));

    let client2 = conn.clone();
    tasks.push(tokio::spawn(async move {
        let result = client2.method1(11).await;
        println!("result: {}", result.unwrap());
    }));
    for task in tasks {
        task.await?;
    }
    Ok(())
}


## server source code
- demo.thrift
```thrift
namespace cpp2 demo

service TestService {
  i32 method1(1: i32 req);
}
```

- DemoHandler.h
```c++
#pragma once

#include "gen-cpp2/TestService.h"
#include <folly/logging/xlog.h>

namespace demo {
namespace cpp2 {
class ExampleHandler : public TestServiceSvIf {
public:
  folly::SemiFuture<int> semifuture_method1(int req) override {
    XLOG(INFO) << "server: receive " << req;
    return folly::makeSemiFuture(req + 1);
  }
};
} // namespace cpp2
} // namespace demo
```

- Cargo.toml
```toml
[lib]
path = "src/thrift_lib.rs"
test = false
doctest = false

[[test]]
name = "fbthrift_test"
path = "tests/lib.rs"

[dependencies]
anyhow = "1.0.71"
async-trait = "0.1.71"
codegen_includer_proc_macro = { path = "../rust-shed/shed/codegen_includer_proc_macro" }
const-cstr = "0.3.0"
fbthrift = { path = "../fbthrift/thrift/lib/rust" }
fbthrift_demo_if__types = { package = "fbthrift_demo_if_types", path = "src/types" }
futures = { version = "0.3.28", features = ["async-await", "compat"] }
ref-cast = "1.0.18"
thiserror = "1.0.43"
tracing = "0.1.35"
tracing-futures = { version = "0.2.5", features = ["futures-03"] }

[dev-dependencies]
proptest = "1.0"
serde_json = { version = "1.0.100", features = ["float_roundtrip", "unbounded_depth"] }

[build-dependencies]
thrift_compiler = { path = "../rust-shed/shed/thrift_compiler" }

[features]
default = ["thrift_library_unittests_disabled"]
thrift_library_unittests_disabled = []
```


> 2. Can you try some of the other tokio primitives and see if they work? e.g. `JoinSet` or `tokio::join!`?

Yes, `tokio::join!` works

> 3. With your "tweak" can you try to clone the connection for both queries and see if the issue reproduces? It may be that the clone of the connection is improper which might be causing the problem. Or it needs to be wrapped into an `Arc` in order to work. I have a guess that dropping the `conn` will close the tcp connection, or cloning it won't properly open multiple connections.

After I cloned the connection for both queries, it works, too.

```rust
use fbthrift_demo_if::TestService;
use fbthrift_tcp::TcpTransport;
use fbthrift::CompactProtocol;
use futures::future::try_join_all;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let stream = tokio::net::TcpStream::connect("127.0.0.1:6060").await?;
    let transport = TcpTransport::new(stream);
    let conn = <dyn fbthrift_demo_if::TestService>::new(CompactProtocol, transport);
    let response = try_join_all(vec![conn.clone().method1(10), conn.clone().method1(11)]).await;
    println!("{response:?}");
    Ok(())
}
```

Thanks!

If necessary, I could email both the client and the server to you. Thank you very much.

slawlor commented 4 months ago

Can you attach an archive of what you're trying to run here? I think I'm still missing some of the logic to set up your server and verify this. The client code looks potentially complete, but I don't see how you're running the thrift server here.

sternezsl commented 4 months ago

> Can you attach an archive of what you're trying to run here? I think I'm still missing some of the logic to set up your server and verify this. The client code looks potentially complete, but I don't see how you're running the thrift server here.

Sorry about that. I wrote the server in C++. Attachment: code.tar.gz

slawlor commented 4 months ago

> fbthrift git version: 5140b62e0c

Are you sure about this revision? Can you drop the full SHA hash here? I can't pull this revision (I'm trying to set up a repro of your problem now).

sternezsl commented 4 months ago

> > fbthrift git version: 5140b62e0c
>
> Are you sure about this revision? Can you drop the full SHA hash here? I can't pull this revision (I'm trying to set up a repro of your problem now).

The hash above is my local commit hash. I'm sorry for the trouble caused by my carelessness.

commit 0fb315837c6cc5a823f759332f11d8a4885e826f (HEAD)
Author: Open Source Bot <generatedunixname499836121@fb.com>
Date:   Sat Feb 24 09:34:25 2024 -0800

slawlor commented 4 months ago

Hello again, I just wanted to let you know I'm still working on setting up a repro sample. We've hit some hiccups building the thrift compiler, but we're still progressing. It may just take some time to investigate.

As a last option, can you try wrapping the original client in the tokio sample in an Arc and using that to make your requests? I have a suspicion about where the problem is, but unfortunately I am still trying to set up the repro to confirm it. So it would look something like:

let stream = tokio::net::TcpStream::connect("127.0.0.1:6060").await?;
let transport = TcpTransport::new(stream);
let conn = Arc::new(<dyn fbthrift_demo_if::TestService>::new(CompactProtocol, transport));
let client = conn.clone();
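
The pattern of moving one `Arc` clone into each spawned worker can be sketched with the standard library alone. In this sketch `std::thread::spawn` stands in for `tokio::spawn`, and `Client` and `call_from_workers` are hypothetical names rather than fbthrift types. It only shows the ownership pattern and makes no claim about whether it avoids the hang reported here.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical client; `method1` mirrors the service in this thread
/// (it adds one to the request). NOT the generated fbthrift client.
struct Client;

impl Client {
    fn method1(&self, req: i32) -> i32 {
        req + 1
    }
}

/// Sends each request from its own worker, all sharing one client handle.
fn call_from_workers(reqs: &[i32]) -> Vec<i32> {
    let conn = Arc::new(Client);

    let workers: Vec<_> = reqs
        .iter()
        .map(|&req| {
            // Each worker owns its own clone of the shared handle,
            // which the `move` closure takes by value.
            let client = Arc::clone(&conn);
            thread::spawn(move || client.method1(req))
        })
        .collect();

    // Join in spawn order so results line up with the requests.
    workers
        .into_iter()
        .map(|w| w.join().expect("worker panicked"))
        .collect()
}
```

Calling `call_from_workers(&[10, 11])` mirrors the two requests in the original report.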

sternezsl commented 4 months ago

Thanks for your effort, but it is still not working.

# cargo run -r
     Running `target/release/client`
result: 11
^C

sternezsl commented 2 months ago

@slawlor AFAIK, within Meta you use the srserver/srclient framework. Do you have any plans to open source these components?

yfeldblum commented 2 months ago

> @slawlor AFAIK, within Meta you use the srserver/srclient framework. Do you have any plans to open source these components?

Think of Thrift itself as generic and general-purpose, not tied to Meta-specific internal implementation details. And think of SR as the opposite: it depends on the generic Thrift but also connects it to internal implementation details such as service discovery, logging, and configuration.