crypto-crawler / crypto-crawler-rs

A rock-solid cryptocurrency crawler library.
Apache License 2.0

crypto_msg_parser::parse_trade() : Cannot drop a runtime in a context where blocking is not allowed. This happens when a runtime is dropped from within an asynchronous context. #58

Open panicfarm opened 1 year ago

panicfarm commented 1 year ago

I am using the following code (collected from your examples):

 1 use crypto_crawler::{crawl_trade, MarketType};
  3 #[tokio::main(flavor = "multi_thread")]
  4 async fn main() {
  5     let (tx, rx) = std::sync::mpsc::channel();
  7     tokio::task::spawn(async move {
  8         crawl_trade(
  9             "kucoin",
 10             MarketType::LinearSwap,
 11             Some(&["SOLUSDTM".to_string()]),
 12             /*
 13             MarketType::Spot,
 14             Some(&["SOL-USDT".to_string()]),
 15             */
 16             tx,
 17         )
 18         .await
 19     });
 20     for msg in rx {
 21         //
 22         let trades =
 23             crypto_msg_parser::parse_trade(&msg.exchange, msg.market_type, &msg.json).unwrap();
 24         let trade = &trades[0];
 25         println!("{:#?}", trade);
 26     }
 27 }

I get the following panic only with MarketType::LinearSwap, and only in a debug build. It does not occur with MarketType::Spot, because the failing code path is never called for Spot, and it has never occurred with a release build. I think it is something debug-specific involving the blocking reqwest call in http_get("") in crypto-contract-value-1.7.7/src/exchanges/ .

alecm@ark2:~/qkt$ ./target/debug/qk
thread 'main' panicked at 'Cannot drop a runtime in a context where blocking is not allowed. This happens when a runtime is dropped from within an asynchronous context.', /home/alecm/.cargo/registry/src/
stack backtrace:
   0: std::panicking::begin_panic
             at /rustc/8b0c05d9ad7121cdb97600f261bcd5f04c8db20d/library/std/src/
   1: tokio::runtime::blocking::shutdown::Receiver::wait
             at /home/alecm/.cargo/registry/src/
   2: tokio::runtime::blocking::pool::BlockingPool::shutdown
             at /home/alecm/.cargo/registry/src/
   3: <tokio::runtime::blocking::pool::BlockingPool as core::ops::drop::Drop>::drop
             at /home/alecm/.cargo/registry/src/
   4: core::ptr::drop_in_place<tokio::runtime::blocking::pool::BlockingPool>
             at /rustc/8b0c05d9ad7121cdb97600f261bcd5f04c8db20d/library/core/src/ptr/
   5: core::ptr::drop_in_place<tokio::runtime::runtime::Runtime>
             at /rustc/8b0c05d9ad7121cdb97600f261bcd5f04c8db20d/library/core/src/ptr/
   6: reqwest::blocking::wait::enter
             at /home/alecm/.cargo/registry/src/
   7: reqwest::blocking::wait::timeout
             at /home/alecm/.cargo/registry/src/
   8: reqwest::blocking::client::ClientHandle::new
             at /home/alecm/.cargo/registry/src/
   9: reqwest::blocking::client::ClientBuilder::build
             at /home/alecm/.cargo/registry/src/
  10: crypto_contract_value::exchanges::utils::http_get
             at /home/alecm/.cargo/registry/src/
  11: crypto_contract_value::exchanges::kucoin::fetch_linear_multipliers
             at /home/alecm/.cargo/registry/src/
  12: crypto_contract_value::exchanges::kucoin::LINEAR_CONTRACT_VALUES::{{closure}}
             at /home/alecm/.cargo/registry/src/
  13: core::ops::function::FnOnce::call_once
             at /rustc/8b0c05d9ad7121cdb97600f261bcd5f04c8db20d/library/core/src/ops/
  14: core::ops::function::FnOnce::call_once
             at /rustc/8b0c05d9ad7121cdb97600f261bcd5f04c8db20d/library/core/src/ops/
  15: once_cell::sync::Lazy<T,F>::force::{{closure}}
             at /home/alecm/.cargo/registry/src/
  16: once_cell::sync::OnceCell<T>::get_or_init::{{closure}}
             at /home/alecm/.cargo/registry/src/
  17: once_cell::imp::OnceCell<T>::initialize::{{closure}}
             at /home/alecm/.cargo/registry/src/
  18: core::ops::function::impls::<impl core::ops::function::FnMut<A> for &mut F>::call_mut
             at /rustc/8b0c05d9ad7121cdb97600f261bcd5f04c8db20d/library/core/src/ops/
  19: once_cell::imp::initialize_or_wait
             at /home/alecm/.cargo/registry/src/
  20: once_cell::imp::OnceCell<T>::initialize
             at /home/alecm/.cargo/registry/src/
  21: once_cell::sync::OnceCell<T>::get_or_try_init
             at /home/alecm/.cargo/registry/src/
  22: once_cell::sync::OnceCell<T>::get_or_init
             at /home/alecm/.cargo/registry/src/
  23: once_cell::sync::Lazy<T,F>::force
             at /home/alecm/.cargo/registry/src/
  24: <once_cell::sync::Lazy<T,F> as core::ops::deref::Deref>::deref
             at /home/alecm/.cargo/registry/src/
  25: crypto_contract_value::exchanges::kucoin::get_contract_value
             at /home/alecm/.cargo/registry/src/
  26: crypto_contract_value::get_contract_value
             at /home/alecm/.cargo/registry/src/
  27: crypto_msg_parser::exchanges::utils::calc_quantity_and_volume
             at /home/alecm/.cargo/git/checkouts/crypto-msg-parser-aaa16a2660930560/8bc9eb7/crypto-msg-parser/src/exchanges/
  28: crypto_msg_parser::exchanges::kucoin::kucoin_swap::parse_trade
             at /home/alecm/.cargo/git/checkouts/crypto-msg-parser-aaa16a2660930560/8bc9eb7/crypto-msg-parser/src/exchanges/kucoin/
  29: crypto_msg_parser::exchanges::kucoin::parse_trade
             at /home/alecm/.cargo/git/checkouts/crypto-msg-parser-aaa16a2660930560/8bc9eb7/crypto-msg-parser/src/exchanges/kucoin/
  30: crypto_msg_parser::parse_trade
             at /home/alecm/.cargo/git/checkouts/crypto-msg-parser-aaa16a2660930560/8bc9eb7/crypto-msg-parser/src/
  31: qk::main::{{closure}}
             at ./src/
  32: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/8b0c05d9ad7121cdb97600f261bcd5f04c8db20d/library/core/src/future/
  33: tokio::runtime::park::CachedParkThread::block_on::{{closure}}
             at /home/alecm/.cargo/registry/src/
  34: tokio::runtime::coop::with_budget
             at /home/alecm/.cargo/registry/src/
  35: tokio::runtime::coop::budget
             at /home/alecm/.cargo/registry/src/
  36: tokio::runtime::park::CachedParkThread::block_on
             at /home/alecm/.cargo/registry/src/
  37: tokio::runtime::context::BlockingRegionGuard::block_on
             at /home/alecm/.cargo/registry/src/
  38: tokio::runtime::scheduler::multi_thread::MultiThread::block_on
             at /home/alecm/.cargo/registry/src/
  39: tokio::runtime::runtime::Runtime::block_on
             at /home/alecm/.cargo/registry/src/
  40: qk::main
             at ./src/
  41: core::ops::function::FnOnce::call_once
             at /rustc/8b0c05d9ad7121cdb97600f261bcd5f04c8db20d/library/core/src/ops/
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

P.S. With the release build, I let it run for several days, but the websocket stopped receiving new messages after 2 days. I then rebuilt in debug mode and ran into the panic above. Should I file a separate issue for the stuck websocket? I am unsure how to debug it; strace showed the process waiting on a futex.

soulmachine commented 1 year ago

Can you remove the line numbers from your code snippet above? I'd like to run your code on my machine.

panicfarm commented 1 year ago

Here you go

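use crypto_crawler::{crawl_trade, MarketType};

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    let (tx, rx) = std::sync::mpsc::channel();

    // Crawl KuCoin linear-swap trades and forward raw messages over the channel.
    tokio::task::spawn(async move {
        crawl_trade(
            "kucoin",
            MarketType::LinearSwap,
            Some(&["SOLUSDTM".to_string()]),
            tx,
        )
        .await
    });

    // Parse each raw message; this is where the panic is triggered.
    for msg in rx {
        let trades =
            crypto_msg_parser::parse_trade(&msg.exchange, msg.market_type, &msg.json).unwrap();
        let trade = &trades[0];
        println!("{:#?}", trade);
    }
}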


[package]
name = "qk"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at

[dependencies]
crypto-crawler = { git = "", branch = "main" }
crypto-msg-parser = { git = "", branch = "main" }
crypto-message = { git = "", branch = "main" }

tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

foonsun commented 1 year ago

I have the same issue. Any resolutions? @panicfarm

soulmachine commented 1 year ago

Looking into this issue, will update

panicfarm commented 1 year ago

@soulmachine I believe the issue is that fn fetch_linear_multipliers() is blocking, but your example that I used is async (because your websocket is async), so this function, and therefore crypto_msg_parser::parse_trade(), cannot be called from an asynchronous context.

The function is blocking because it makes a blocking reqwest call internally; in the backtrace above, http_get builds a reqwest::blocking client.

You could make the entire crypto_msg_parser::parse_trade() async, although I think it is preferable to parse the websocket messages "instantly" and leave this function sync. So maybe the blocking reqwest in fetch_linear_multipliers can be avoided, or done asynchronously before the websocket connection is established.
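
One possible workaround that keeps the parser synchronous is to drain the channel on a plain OS thread instead of inside the async main body, so that any lazy, blocking initialization happens outside the runtime context. Below is a minimal, std-only sketch of that pattern. RawMsg, parse_blocking, spawn_parser_thread and run_demo are hypothetical stand-ins for illustration, not crypto-crawler or crypto-msg-parser APIs.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for a raw websocket message.
pub struct RawMsg {
    pub json: String,
}

/// Hypothetical stand-in for a synchronous parser that may block on first use
/// (for example, by lazily fetching contract metadata over HTTP).
/// Here it only returns the payload length.
pub fn parse_blocking(msg: &RawMsg) -> usize {
    msg.json.len()
}

/// Drain the channel on a dedicated OS thread. A thread created with
/// `std::thread::spawn` is not inside any async runtime context, so blocking
/// work done by the parser does not run on an async worker thread.
pub fn spawn_parser_thread(rx: Receiver<RawMsg>) -> JoinHandle<Vec<usize>> {
    thread::spawn(move || rx.iter().map(|m| parse_blocking(&m)).collect())
}

/// Small driver: send two messages, close the channel, collect the results.
pub fn run_demo() -> Vec<usize> {
    let (tx, rx) = mpsc::channel();
    let handle = spawn_parser_thread(rx);
    tx.send(RawMsg { json: "{}".to_string() }).unwrap();
    tx.send(RawMsg { json: "{\"a\":1}".to_string() }).unwrap();
    drop(tx); // closing the sender ends the consumer's loop
    handle.join().unwrap()
}
```

In a tokio program, tokio::task::spawn_blocking is the usual way to get the same effect; whether that fully avoids the panic here is an assumption I have not verified against this crate.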

foonsun commented 1 year ago

I think whether fetch_linear_multipliers is blocking or async is not important, because it is only called once: the initializer in static LINEAR_CONTRACT_VALUES: Lazy<HashMap<String, f64>> = Lazy::new(|| sends the request only on first access. I prefer the async solution because the app may use the async functions.
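
To illustrate the "only requested once" behaviour, here is a minimal sketch using std::sync::OnceLock from the standard library in place of once_cell's Lazy. The initializer runs on the first access only, and it can be forced eagerly at startup, before any async code runs. fetch_contract_values, warm_up, contract_value and fetch_count are hypothetical names, and the 0.1 value is a placeholder, not real KuCoin data.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Counts how often the simulated fetch runs.
static FETCH_CALLS: AtomicUsize = AtomicUsize::new(0);

/// Cache filled on first access, analogous to a `Lazy<HashMap<String, f64>>`.
static CONTRACT_VALUES: OnceLock<HashMap<String, f64>> = OnceLock::new();

/// Hypothetical stand-in for a blocking HTTP fetch; the value is a placeholder.
fn fetch_contract_values() -> HashMap<String, f64> {
    FETCH_CALLS.fetch_add(1, Ordering::SeqCst);
    HashMap::from([("SOLUSDTM".to_string(), 0.1)])
}

/// Force initialization eagerly, e.g. at startup before entering async code.
pub fn warm_up() {
    CONTRACT_VALUES.get_or_init(fetch_contract_values);
}

/// Look up a contract value; only the very first access triggers the fetch.
pub fn contract_value(symbol: &str) -> Option<f64> {
    CONTRACT_VALUES
        .get_or_init(fetch_contract_values)
        .get(symbol)
        .copied()
}

/// Number of times the simulated fetch has run so far.
pub fn fetch_count() -> usize {
    FETCH_CALLS.load(Ordering::SeqCst)
}
```

Calling warm_up() once and then contract_value() any number of times leaves fetch_count() at 1. The catch is that the first access decides which thread performs the blocking work, which is why it matters where that access happens.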

foonsun commented 1 year ago

Any update? @soulmachine

soulmachine commented 1 year ago

Not yet, still working on it