If you are running on the xtensa esp32 chip, then you might be hitting this issue: https://github.com/smol-rs/async-lock/issues/84
Until this is fixed, the one workaround I have found to be effective is to use an older version of async-io (and thus async-lock) as follows:
async-io = "=2.0.0"
If that doesn't help, you might still be having a stack overflow issue.
Unrelated, but maybe good to know: with Rust async on MCUs, you really have to avoid stuff like this:
let mut buff = [0u8; 8192];
Due to inefficiencies in how Rust futures are laid out, code like the above often ends up consuming 2x what you think it consumes, thus blowing up your stack, as most (all?) of the futures in your example app live on the stack.
Prefer pre-allocating the buffer in the non-async code and then just passing a &mut ref to the buffer into the async code. That way you end up with a 4-byte pointer taking 8 bytes, instead of 8K taking 16K.
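A rough way to see the difference, using only the standard library, is to compare the sizes of the two futures. This is a sketch, not code from the example app: `touch`, `owns_buffer` and `borrows_buffer` are hypothetical names, and the exact sizes depend on the compiler version and target.

```rust
use std::future::ready;
use std::mem::size_of_val;

// Hypothetical stand-in for real I/O: an await point that keeps `buf` borrowed.
async fn touch(buf: &mut [u8]) {
    ready(()).await;
    buf[0] = 1;
}

// The buffer is a local that lives across an await, so it is stored inside the future.
async fn owns_buffer() -> u8 {
    let mut buf = [0u8; 8192];
    touch(&mut buf).await;
    buf[0]
}

// The buffer lives elsewhere; the future only stores a reference to it.
async fn borrows_buffer(buf: &mut [u8]) -> u8 {
    touch(buf).await;
    buf[0]
}

fn main() {
    let mut buf = vec![0u8; 8192];
    let big = owns_buffer();
    let small = borrows_buffer(&mut buf);
    println!("owning future:    {} bytes", size_of_val(&big));
    println!("borrowing future: {} bytes", size_of_val(&small));
}
```

The owning future is at least as large as the buffer itself, while the borrowing one is only a few machine words.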
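One of the more efficient ways to allocate the buffer (if you allocate it from the heap, that is) is to do
// Allocate directly on the heap, without building the array on the stack first.
let mut buffer: Box<MaybeUninit<[u8; 8192]>> = Box::new_uninit();
// The contents are uninitialized here, so write to the buffer before reading from it.
let buffer = unsafe { buffer.assume_init_mut() };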
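If the cost of zeroing the buffer is acceptable, a safe alternative is to let `vec!` do the heap allocation. This is a different technique from the snippet above, shown only as a sketch; `alloc_buffer` is a hypothetical helper name.

```rust
// Allocates a zeroed buffer on the heap without any `unsafe`.
// `vec![0u8; len]` allocates on the heap directly, so no large
// temporary array is built on the stack first.
fn alloc_buffer(len: usize) -> Box<[u8]> {
    vec![0u8; len].into_boxed_slice()
}

fn main() {
    let mut buffer = alloc_buffer(8192);
    // Hand this `&mut [u8]` to the async code instead of declaring the array there.
    let slice: &mut [u8] = &mut buffer;
    slice[0] = 42;
    println!("{} bytes allocated on the heap", slice.len());
}
```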
Great stuff. I will give async-io = "=2.0.0" a try, and report back here. I would hope that, with a stack size of 60000 bytes allocated to the pthreads, I'm not still overflowing.
Thank you for the hint, I will apply it in this case to reduce the stack requirements. I try to keep as much on the stack as possible in case I need to jump to no-std without an allocator.
no_std (at least embassy no_std) also relies a lot on static contexts. Check out the static_cell crate as an alternative to on-stack allocations.
Hi Ivan
I did eventually get this working, but only with a reduced buffer. I will add it here for reference on Monday.
Just out of curiosity, do you know why the esp-idf web socket implementation is so poorly supported? For example, the need to read the socket into an empty buffer only once is a really weird choice, and it clearly breaks if you send multiple messages back and forth on the web socket, expecting a "second packet" before a certain timeout.
Your version worked like a charm (after much pain and suffering)
Hi Ivan
I did eventually get this working, but only with a reduced buffer. I will add it here for reference on Monday.
Yep, that would be great, as I wonder why you needed to reduce the buffer.
Just out of curiosity, do you know why the esp-idf web socket implementation is so poorly supported? For example, the need to read the socket into an empty buffer only once is a really weird choice, and it clearly breaks if you send multiple messages back and forth on the web socket, expecting a "second packet" before a certain timeout.
Not sure what you mean here. Do you mean that in order to even understand how big the payload of the incoming frame is, you need to call whatever-the-method-for-reading-a-frame-was-called with a null buffer first?
Your version worked like a charm (after much pain and suffering)
Yeah, just keep in mind that I pushed to master a crucial fix that might be affecting you ~ one hour ago.
The thing is, when upgrading from a regular HTTP connection (client or server) to a WS connection, the current code was reading not just the upgrade-response headers, but also a bit of what follows after. The problem is, if your server is pushing a WS frame right after sending the "OK, HTTP connection is upgraded" response to the client, the client might erroneously read a bit or all of the frame after reading the headers.
I was chasing this for the last two days; it is now fixed and won't require code changes, but processing response headers might be a bit slower for now.
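To illustrate the failure mode described above (this is not the actual edge-net code or its fix, just a sketch with a hypothetical helper `split_at_header_end`): if a single read returns both the upgrade response and the start of the first WS frame, everything after the blank line that ends the headers belongs to the frame and must not be discarded.

```rust
/// Splits bytes read from the socket into the HTTP header block
/// (including the terminating blank line) and whatever follows it.
/// Returns `None` if the end of the headers has not arrived yet.
fn split_at_header_end(buf: &[u8]) -> Option<(&[u8], &[u8])> {
    const END: &[u8] = b"\r\n\r\n";
    let pos = buf.windows(END.len()).position(|w| w == END)?;
    Some(buf.split_at(pos + END.len()))
}

fn main() {
    // An upgrade response immediately followed by a small unmasked text frame ("hi").
    let mut wire =
        b"HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\n\r\n".to_vec();
    wire.extend_from_slice(&[0x81, 0x02, b'h', b'i']);

    let (headers, leftover) = split_at_header_end(&wire).unwrap();
    println!(
        "{} header bytes, {} bytes already belong to the first WS frame",
        headers.len(),
        leftover.len()
    );
}
```

A reader that consumes `leftover` as part of the header phase loses part or all of the first frame, which is the symptom described above.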
Hi
Here is my full example code for anyone that needs this in the future
#![feature(generic_arg_infer)]
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::thread::Scope;
use channel_bridge::asynch::ws::WsError;
use edge_executor::LocalExecutor;
use edge_http::io::server::{Connection, DefaultServer, Handler, TaskHandler};
use edge_http::ws::MAX_BASE64_KEY_RESPONSE_LEN;
use edge_http::{io, Method, DEFAULT_MAX_HEADERS_COUNT};
use edge_std_nal_async::StdTcpConnection;
use edge_ws::{FrameHeader, FrameType};
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::signal::Signal;
use embassy_time::{Duration, Timer};
use embedded_io_async::{Read, Write};
use embedded_nal_async_xtra::TcpListen;
use esp_idf_svc::eventloop::EspSystemEventLoop;
use esp_idf_svc::hal::peripherals::Peripherals;
use esp_idf_svc::nvs::EspDefaultNvsPartition;
use esp_idf_svc::sys::EspError;
use esp_idf_svc::timer::EspTaskTimerService;
use esp_idf_svc::wifi::{AsyncWifi, Configuration};
use esp_idf_svc::wifi::{ClientConfiguration, EspWifi};
use log::info;
const SSID: &str = env!("WIFI_SSID");
const PASSWORD: &str = env!("WIFI_PASS");
// ws
pub static QUIT: Signal<CriticalSectionRawMutex, u32> = Signal::new();
const WS_MAX_CONNECTIONS: usize = 2;
pub const WS_MAX_FRAME_LEN: usize = 512;
// const
fn main() -> anyhow::Result<()> {
    // It is necessary to call this function once. Otherwise some patches to the runtime
    // implemented by esp-idf-sys might not link properly. See https://github.com/esp-rs/esp-idf-template/issues/71
    esp_idf_svc::timer::embassy_time_driver::link();
    esp_idf_svc::sys::link_patches();
    // Bind the log crate to the ESP Logging facilities
    esp_idf_svc::log::EspLogger::initialize_default();
    esp_idf_svc::io::vfs::initialize_eventfd(5)?;
    std::thread::scope(|scope: &Scope| run(scope))?;
    Ok(())
}
fn run<'s>(scope: &'s Scope<'s, '_>) -> Result<(), EspError> {
    let sys_loop = EspSystemEventLoop::take().unwrap();
    let peripherals = Peripherals::take().unwrap();
    let nvs = EspDefaultNvsPartition::take().unwrap();
    let timer_service = EspTaskTimerService::new()?;
    let task: std::thread::ScopedJoinHandle<Result<(), EspError>> = std::thread::Builder::new()
        .stack_size(80000)
        .spawn_scoped(scope, move || {
            let executor = LocalExecutor::<8>::new();
            let mut wifi = AsyncWifi::wrap(
                EspWifi::new(peripherals.modem, sys_loop.clone(), Some(nvs))?,
                sys_loop,
                timer_service,
            )?;
            wifi.set_configuration(&Configuration::Client(ClientConfiguration {
                ssid: SSID.try_into().unwrap(),
                password: PASSWORD.try_into().unwrap(),
                ..Default::default()
            }))?;
            executor.spawn(connect_wifi(&mut wifi)).detach();
            let mut httpd = httpd().unwrap();
            let handler = http_handler().unwrap();
            executor.spawn(run_ws(&mut httpd, handler)).detach();
            esp_idf_svc::hal::task::block_on(executor.run(async {
                loop {
                    Timer::after(Duration::from_millis(100)).await
                }
            }));
            Ok(())
        })
        .unwrap();
    let result: Result<(), _> = task.join().unwrap();
    log::info!("Thread execution finished {result:?}");
    Ok(())
}
pub async fn run_ws<H>(server: &mut DefaultServer, handler: H) -> Result<(), anyhow::Error>
where
    H: for<'b> edge_http::io::server::Handler<'b, &'b mut edge_std_nal_async::StdTcpConnection, 64>,
{
    let addr = "0.0.0.0:8881";
    info!("Running HTTP ws server on {addr}");
    let acceptor = edge_std_nal_async::Stack::new()
        .listen(addr.parse().unwrap())
        .await?;
    server.run(acceptor, handler, None).await?;
    Ok(())
}
struct WsHandler {
    send_buff: UnsafeCell<MaybeUninit<[u8; MAX_BASE64_KEY_RESPONSE_LEN]>>,
    recv_buff: UnsafeCell<MaybeUninit<[u8; 1024]>>,
}
impl WsHandler {
    pub fn new() -> Self {
        Self {
            send_buff: UnsafeCell::new(MaybeUninit::uninit()),
            recv_buff: UnsafeCell::new(MaybeUninit::uninit()),
        }
    }
    async fn handle<'b, T, const N: usize>(
        &self,
        conn: &mut Connection<'b, T, N>,
    ) -> Result<(), HttpdError<T::Error>>
    where
        T: Read + Write,
    {
        let headers = conn.headers()?;
        if !matches!(headers.method, Some(Method::Get)) {
            conn.initiate_response(405, Some("Method Not Allowed"), &[])
                .await?;
        } else if !matches!(headers.path, Some("/")) {
            conn.initiate_response(404, Some("Not Found"), &[]).await?;
        } else if !conn.is_ws_upgrade_request()? {
            conn.initiate_response(200, Some("OK"), &[("Content-Type", "text/plain")])
                .await?;
            conn.write_all(b"Initiate WS Upgrade request to switch this connection to WS")
                .await?;
        } else {
            let send_buff: &mut [u8; _] =
                unsafe { self.send_buff.get().as_mut().unwrap().assume_init_mut() };
            let recv_buff = unsafe { self.recv_buff.get().as_mut().unwrap().assume_init_mut() };
            self.handle_ws(send_buff, recv_buff, conn).await?;
        }
        Ok(())
    }
    async fn handle_ws<'b, T, const N: usize>(
        &self,
        send_buff: &mut [u8],
        recv_buff: &mut [u8],
        conn: &mut Connection<'b, T, N>,
    ) -> Result<(), HttpdError<T::Error>>
    where
        T: Read + Write,
    {
        let mut buf = send_buff[..MAX_BASE64_KEY_RESPONSE_LEN].try_into().unwrap();
        conn.initiate_ws_upgrade_response(&mut buf).await?;
        conn.complete().await?;
        info!("Connection upgraded to WS, starting a simple WS echo server now");
        let mut socket = conn.unbind()?;
        // Note: this copies the first 1024 bytes of `recv_buff` into a new array.
        let mut buf: [u8; 1024] = recv_buff[..1024].try_into().unwrap();
        loop {
            let mut header = FrameHeader::recv(&mut socket)
                .await
                .map_err(|w| HttpdError::Ws(WsError::IoError(w)))?;
            let payload = header
                .recv_payload(&mut socket, &mut buf)
                .await
                .map_err(|w| HttpdError::Ws(WsError::IoError(w)))?;
            match header.frame_type {
                FrameType::Text(_) => {
                    info!(
                        "Got {header}, with payload \"{}\"",
                        core::str::from_utf8(payload).unwrap()
                    );
                }
                FrameType::Binary(_) => {
                    info!("Got {header}, with payload {payload:?}");
                }
                FrameType::Close => {
                    info!("Got {header}, client closed the connection cleanly");
                    break;
                }
                _ => {
                    info!("Got {header}");
                }
            }
            // Echo it back now
            header.mask_key = None; // Servers never mask the payload
            if matches!(header.frame_type, FrameType::Ping) {
                header.frame_type = FrameType::Pong;
            }
            info!("Echoing back as {header}");
            header
                .send(&mut socket)
                .await
                .map_err(|w| HttpdError::Ws(WsError::IoError(w)))?;
            header
                .send_payload(&mut socket, payload)
                .await
                .map_err(|w| HttpdError::Ws(WsError::IoError(w)))?;
        }
        Ok(())
    }
}
impl<'b, T, const N: usize> Handler<'b, T, N> for WsHandler
where
    T: Read + Write,
    T::Error: Send + Sync + std::error::Error + 'static,
{
    type Error = anyhow::Error;
    async fn handle(&self, conn: &mut Connection<'b, T, N>) -> Result<(), Self::Error> {
        // Log errors instead of propagating them, so one failed connection does not stop the server.
        let res = WsHandler::handle(self, conn).await;
        match res {
            Ok(()) => (),
            Err(x) => log::info!("Error in handling: {:?}", x),
        }
        Ok(())
    }
}
async fn connect_wifi(wifi: &mut AsyncWifi<EspWifi<'static>>) -> anyhow::Result<()> {
    let wifi_configuration: Configuration = Configuration::Client(ClientConfiguration {
        ssid: SSID.try_into().unwrap(),
        password: PASSWORD.try_into().unwrap(),
        ..Default::default()
    });
    wifi.set_configuration(&wifi_configuration)?;
    wifi.start().await?;
    info!("Wifi started");
    wifi.connect().await?;
    info!("Wifi connected");
    wifi.wait_netif_up().await?;
    info!("Wifi netif up");
    Ok(())
}
#[derive(Debug)]
pub enum HttpdError<T> {
    Http(io::Error<T>),
    Ws(WsError<edge_ws::io::Error<T>>),
}
impl<T> From<io::Error<T>> for HttpdError<T> {
    fn from(err: io::Error<T>) -> Self {
        Self::Http(err)
    }
}
impl<T> From<WsError<edge_ws::io::Error<T>>> for HttpdError<T> {
    fn from(err: WsError<edge_ws::io::Error<T>>) -> Self {
        Self::Ws(err)
    }
}
fn httpd() -> Result<DefaultServer, EspError> {
    Ok(DefaultServer::new())
}
fn http_handler() -> Result<WsHandler, EspError> {
    Ok(WsHandler::new())
}
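For anyone wondering about the "Servers never mask the payload" comment in the echo loop: per RFC 6455, client-to-server frames carry a 4-byte masking key and the payload is XOR-ed with it, while server-to-client frames are sent unmasked. Below is a minimal standalone sketch of that operation; `apply_mask` is a hypothetical helper for illustration and not part of the edge-ws API.

```rust
/// Applies (or removes) RFC 6455 masking in place.
/// XOR-ing twice with the same key restores the original bytes.
fn apply_mask(payload: &mut [u8], key: [u8; 4]) {
    for (i, byte) in payload.iter_mut().enumerate() {
        *byte ^= key[i % 4];
    }
}

fn main() {
    let key = [0x37, 0xfa, 0x21, 0x3d];
    let mut payload = *b"Hello";

    apply_mask(&mut payload, key); // what a client does before sending
    println!("masked:   {:02x?}", payload);

    apply_mask(&mut payload, key); // what a server does after receiving
    println!("unmasked: {:?}", core::str::from_utf8(&payload).unwrap());
}
```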
Yep, that would be great, as I wonder why you needed to reduce the buffer.
So I went your suggested route, with a buffer passed into the async task from outside the executor.
Not sure what you mean here. Do you mean that in order to even understand how big the payload of the incoming frame is, you need to call whatever-the-method-for-reading-a-frame-was-called with a null buffer first?
Yes, first that null buffer, but this has been my unfortunate workflow for this:
- Call recv with a null buffer. Call the returned length N.
- Read N bytes from the web socket. This always contains the full payload that I sent over the web socket.
This is completely outside of edge-net though, but it is the reason I'm using this web socket implementation rather than the esp-idf one.
Yeah, just keep in mind that I pushed to master a crucial fix that might be affecting you ~ one hour ago. The thing is, when upgrading from a regular HTTP connection (client or server) to a WS connection, the current code was reading not just the upgrade-response headers, but also a bit of what follows after. The problem is, if your server is pushing a WS frame right after sending the "OK, HTTP connection is upgraded" response to the client, the client might erroneously read a bit or all of the frame after reading the headers. I was chasing this for the last two days; it is now fixed and won't require code changes, but processing response headers might be a bit slower for now.
Oh damn, I will see if this has impacted anything, but I'm sure it's all good. I hadn't seen that particular bug.
Hi
I am trying, and failing, to use the edge-net stack on top of esp-idf for an ESP32.
Between stack overflows, watchdog timeouts and mutex access violations, I am stuck trying to implement the basic example for https://crates.io/crates/edge-ws over WiFi. I have tried to use a lot of ideas from ruwm to get this to run, but I'm just not having luck. (For added context, I can get the esp-idf-svc http server to run and process web socket requests, but it is very finicky, often reporting an error on "could not get second packet", even after having received the full message, so I thought I would go the same route as ruwm (I'm building a power meter).)
The current error I am seeing pops up from ESP land, but I have no idea how to interpret it.
Here is my code: main.rs:
sdkconfig.defaults
Cargo.toml
.cargo/config.toml