davidMcneil / rants

An async NATS client library for the Rust programming language.
Apache License 2.0
81 stars 11 forks source link

Broken pipe when publish message for awhile #15

Closed DAOCUONG closed 4 years ago

DAOCUONG commented 4 years ago

Client has panic broken pipe error when we publish message for a while, It more easy to reproduce when turn on : client.connect_mut().await.echo(true); Err:Io(Os { code: 32, kind: BrokenPipe, message: "Broken pipe" })

davidMcneil commented 4 years ago

Thanks for opening the issue!

Could you post the stack trace (set the environment variable RUST_BACKTRACE=1)?

Do you have any other details about the conditions when the panic occurs? Sample code that reliably produces the panic would be very helpful.

DAOCUONG commented 4 years ago

Code reproduce

#[tokio::main]
async fn main() {
    // A NATS server must be running on `127.0.0.1:4222`
    let address ="nats://127.0.0.1:4222".parse().unwrap();
    let client = Client::new(vec![address]);

    // Configure the client to receive messages even if it sent the message

    client.connect_mut().await.echo(true);

    // Connect to the server
    client.connect().await;

    // Create a new subject called "test"
    let subject = "test".parse().unwrap();

    // Subscribe to the "test" subject
   // let (_, mut subscription) = client.subscribe(&subject, 1024).await.unwrap();

    // Publish a message to the "test" subject
    loop {
        client
        .publish(&subject, b"command")
        .await
        .unwrap();
        //thread::sleep(Duration::from_secs(1));
        println!("Send message ");

    }
    // Read a message from the subscription
   // let message = subscription.next().await.unwrap();
    //let message = String::from_utf8(message.into_payload()).unwrap();
   // println!("Received '{}'", message);

    // Disconnect from the server
    client.disconnect().await;
 }
DAOCUONG commented 4 years ago

Stack trace:

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Io(Os { code: 32, kind: BrokenPipe, message: "Broken pipe" })', src/libcore/result.rs:1165:5
stack backtrace:
   0: backtrace::backtrace::libunwind::trace
             at /Users/runner/.cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.37/src/backtrace/libunwind.rs:88
   1: backtrace::backtrace::trace_unsynchronized
             at /Users/runner/.cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.37/src/backtrace/mod.rs:66
   2: std::sys_common::backtrace::_print_fmt
             at src/libstd/sys_common/backtrace.rs:76
   3: <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt
             at src/libstd/sys_common/backtrace.rs:60
   4: core::fmt::write
             at src/libcore/fmt/mod.rs:1030
   5: std::io::Write::write_fmt
             at src/libstd/io/mod.rs:1412
   6: std::sys_common::backtrace::_print
             at src/libstd/sys_common/backtrace.rs:64
   7: std::sys_common::backtrace::print
             at src/libstd/sys_common/backtrace.rs:49
   8: std::panicking::default_hook::{{closure}}
             at src/libstd/panicking.rs:196
   9: std::panicking::default_hook
             at src/libstd/panicking.rs:210
  10: std::panicking::rust_panic_with_hook
             at src/libstd/panicking.rs:473
  11: std::panicking::continue_panic_fmt
             at src/libstd/panicking.rs:380
  12: rust_begin_unwind
             at src/libstd/panicking.rs:307
  13: core::panicking::panic_fmt
             at src/libcore/panicking.rs:85
  14: core::result::unwrap_failed
             at src/libcore/result.rs:1165
  15: core::result::Result<T,E>::unwrap
             at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libcore/result.rs:933
  16: Nats::main::{{closure}}
             at src/main.rs:26
  17: <std::future::GenFuture<T> as core::future::future::Future>::poll::{{closure}}
             at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/future.rs:43
  18: std::future::set_task_context
             at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/future.rs:79
  19: <std::future::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/future.rs:43
  20: tokio::runtime::basic_scheduler::BasicScheduler<P>::block_on::{{closure}}
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/basic_scheduler.rs:139
  21: tokio::runtime::global::with_state::{{closure}}
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/global.rs:100
  22: std::thread::local::LocalKey<T>::try_with
             at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/thread/local.rs:262
  23: std::thread::local::LocalKey<T>::with
             at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/thread/local.rs:239
  24: tokio::runtime::global::with_state
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/global.rs:83
  25: tokio::runtime::global::with_basic_scheduler
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/global.rs:62
  26: tokio::runtime::basic_scheduler::BasicScheduler<P>::block_on
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/basic_scheduler.rs:122
  27: tokio::runtime::Runtime::block_on::{{closure}}
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/mod.rs:419
  28: tokio::runtime::global::with_state::{{closure}}
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/global.rs:100
  29: std::thread::local::LocalKey<T>::try_with
             at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/thread/local.rs:262
  30: std::thread::local::LocalKey<T>::with
             at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/thread/local.rs:239
  31: tokio::runtime::global::with_state
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/global.rs:83
  32: tokio::runtime::global::with_basic_scheduler
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/global.rs:62
  33: tokio::runtime::basic_scheduler::Spawner::enter
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/basic_scheduler.rs:174
  34: tokio::runtime::spawner::Spawner::enter
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/spawner.rs:30
  35: tokio::runtime::handle::Handle::enter::{{closure}}::{{closure}}
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/handle.rs:36
  36: tokio::time::clock::Clock::enter
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/time/clock.rs:30
  37: tokio::runtime::time::variant::with_default
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/time.rs:43
  38: tokio::runtime::handle::Handle::enter::{{closure}}
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/handle.rs:36
  39: tokio::runtime::blocking::pool::Spawner::enter::{{closure}}
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/blocking/pool.rs:191
  40: std::thread::local::LocalKey<T>::try_with
             at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/thread/local.rs:262
  41: std::thread::local::LocalKey<T>::with
             at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/thread/local.rs:239
  42: tokio::runtime::blocking::pool::Spawner::enter
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/blocking/pool.rs:176
  43: tokio::runtime::handle::Handle::enter
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/handle.rs:33
  44: tokio::runtime::Runtime::block_on
             at /Users/velimir/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.4/src/runtime/mod.rs:416
  45: Nats::main
             at src/main.rs:5
  46: std::rt::lang_start::{{closure}}
             at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/rt.rs:64
  47: std::rt::lang_start_internal::{{closure}}
             at src/libstd/rt.rs:49
  48: std::panicking::try::do_call
             at src/libstd/panicking.rs:292
  49: __rust_maybe_catch_panic
             at src/libpanic_unwind/lib.rs:80
  50: std::panicking::try
             at src/libstd/panicking.rs:271
  51: std::panic::catch_unwind
             at src/libstd/panic.rs:394
  52: std::rt::lang_start_internal
             at src/libstd/rt.rs:48
  53: std::rt::lang_start
             at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/rt.rs:64
  54: Nats::main
davidMcneil commented 4 years ago

@DAOCUONG thank you for posting the example code and the backtrace. There are several things going on here:

use env_logger;
use rants::Client;
use tokio::task;

#[tokio::main]
async fn main() {
    env_logger::init();
    let address = "nats://127.0.0.1:4222".parse().unwrap();
    let client = Client::new(vec![address]);
    client.connect().await;
    let subject = "test".parse().unwrap();
    let mut count = 0;
    loop {
        if let Err(e) = client.publish(&subject, b"command").await {
            println!("error {:?}", e);
        }
        if count >= 1000 {
            count = 0;
            task::yield_now().await;
        }
        count += 1;
    }
}

While this fixes the problem, I am not sure why receiving messages is being starved. I need to investigate further to determine if it is a problem in the rants library code or possibly the runtime.

DAOCUONG commented 4 years ago

Thank you for your information