sehz closed this issue 3 years ago.
This might end up taking a bit more work than originally thought. There isn't a drop-in equivalent of `fluvio_future::spawn()` for processes. I'm currently planning to use `nix::fork()`. This may mean we need to know the number of producers and consumers before the test starts, so that we can fork that many processes up front.
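A minimal sketch of forking a fixed number of workers before the test starts, assuming the producer/consumer counts are known up front. To stay self-contained it declares `fork`, `waitpid` and `_exit` by hand instead of going through `nix`, assumes a Unix target, and uses a hypothetical `Role` enum. Each child only exits with its index, where a real test would run a producer or consumer.

```rust
use std::os::raw::c_int;

// Hand-declared POSIX functions so the sketch needs no crates (Unix only;
// std already links against libc).
unsafe extern "C" {
    fn fork() -> c_int;
    fn waitpid(pid: c_int, status: *mut c_int, options: c_int) -> c_int;
    fn _exit(code: c_int) -> !;
}

/// Hypothetical worker roles; a real test would run a producer or consumer.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Role {
    Producer,
    Consumer,
}

/// Fork one process per role before the test starts; returns (role, pid) pairs.
fn fork_workers(roles: &[Role]) -> Vec<(Role, c_int)> {
    // Reserve capacity up front so the loop never reallocates between forks.
    let mut children = Vec::with_capacity(roles.len());
    for (index, &role) in roles.iter().enumerate() {
        match unsafe { fork() } {
            -1 => panic!("fork failed"),
            0 => {
                // Child: placeholder for the producer/consumer work. It exits with
                // its index so the parent can tell workers apart, and never returns.
                unsafe { _exit(index as c_int) }
            }
            pid => children.push((role, pid)),
        }
    }
    children
}

/// Wait for every forked worker and return each one's exit code.
fn wait_all(children: &[(Role, c_int)]) -> Vec<i32> {
    children
        .iter()
        .map(|&(_, pid)| {
            let mut status: c_int = 0;
            let rc = unsafe { waitpid(pid, &mut status, 0) };
            assert_eq!(rc, pid, "waitpid failed");
            assert_eq!(status & 0x7f, 0, "worker was killed by a signal"); // WIFEXITED
            (status >> 8) & 0xff // WEXITSTATUS
        })
        .collect()
}

fn main() {
    let roles = [Role::Producer, Role::Producer, Role::Consumer];
    let children = fork_workers(&roles);
    let codes = wait_all(&children);
    for (&(role, pid), code) in children.iter().zip(&codes) {
        println!("{:?} (pid {}) exited with {}", role, pid, code);
    }
}
```

The child calls only `_exit`, since after `fork()` in a multi-threaded parent only async-signal-safe functions are guaranteed to behave.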
I'm experiencing a lot of deadlocks as part of the swap from threads to processes.
Specifically, we deadlock when we try to instantiate a producer or a consumer, because we use the TestDriver API to keep track of the number of producers/consumers in a test, as well as to account for latency and throughput. This is a design flaw.
Tests need an abstraction for reporting statistics back to the main test driver that doesn't involve locking the driver itself.
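One way to decouple reporting from the driver, sketched under assumptions: workers emit self-describing events, and the driver folds whatever arrives into its own totals, so no worker ever locks the driver. The `TestEvent` variants, the tab-separated line format and `Summary` are hypothetical, and the transport (pipe, HTTP, or a Fluvio topic) is left out.

```rust
/// Hypothetical measurement events a producer or consumer reports to the driver.
#[derive(Debug, Clone, PartialEq)]
enum TestEvent {
    Started { worker: String },
    Latency { worker: String, micros: u64 },
    Bytes { worker: String, count: u64 },
    Finished { worker: String },
}

impl TestEvent {
    /// Encode as one tab-separated line, so any byte transport can carry it.
    fn encode(&self) -> String {
        match self {
            TestEvent::Started { worker } => format!("started\t{}", worker),
            TestEvent::Latency { worker, micros } => format!("latency\t{}\t{}", worker, micros),
            TestEvent::Bytes { worker, count } => format!("bytes\t{}\t{}", worker, count),
            TestEvent::Finished { worker } => format!("finished\t{}", worker),
        }
    }

    /// Parse a line produced by `encode`; returns None for malformed input.
    fn decode(line: &str) -> Option<TestEvent> {
        let parts: Vec<&str> = line.trim_end().split('\t').collect();
        match parts.as_slice() {
            ["started", w] => Some(TestEvent::Started { worker: w.to_string() }),
            ["latency", w, n] => Some(TestEvent::Latency { worker: w.to_string(), micros: n.parse().ok()? }),
            ["bytes", w, n] => Some(TestEvent::Bytes { worker: w.to_string(), count: n.parse().ok()? }),
            ["finished", w] => Some(TestEvent::Finished { worker: w.to_string() }),
            _ => None,
        }
    }
}

/// Driver-side totals, built only from received events; workers never touch it.
#[derive(Debug, Default, PartialEq)]
struct Summary {
    running: usize,
    finished: usize,
    total_bytes: u64,
    max_latency_micros: u64,
}

impl Summary {
    fn apply(&mut self, event: &TestEvent) {
        match event {
            TestEvent::Started { .. } => self.running += 1,
            TestEvent::Finished { .. } => {
                self.running = self.running.saturating_sub(1);
                self.finished += 1;
            }
            TestEvent::Latency { micros, .. } => {
                self.max_latency_micros = self.max_latency_micros.max(*micros)
            }
            TestEvent::Bytes { count, .. } => self.total_bytes += *count,
        }
    }
}

fn main() {
    let sent = vec![
        TestEvent::Started { worker: "producer-0".into() },
        TestEvent::Bytes { worker: "producer-0".into(), count: 512 },
        TestEvent::Latency { worker: "producer-0".into(), micros: 900 },
        TestEvent::Finished { worker: "producer-0".into() },
    ];
    let mut summary = Summary::default();
    for line in sent.iter().map(TestEvent::encode) {
        // In a real test each line would arrive from another process.
        if let Some(event) = TestEvent::decode(&line) {
            summary.apply(&event);
        }
    }
    println!("{:?}", summary);
}
```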
I haven't confirmed that the client connection I create before entering the test survives `fork()`, either. I hope it does, since the tests are designed around the invariant of having an existing connection to the cluster under test.
I need something like a channel between the producers/consumers and the runner. I was looking at https://github.com/servo/ipc-channel, but it comes with a warning that multiple clients aren't supported.
I'd rather avoid a server/client re-architecture at this point, but I'm considering using HTTP for cross-process communication.
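A sketch of the simplest cross-process channel, swapping in a plain OS pipe for ipc-channel or HTTP: the runner creates the pipe before forking, the child writes to one end and the parent reads the other. It assumes a Unix target and declares the POSIX calls by hand. It handles one short message from one child; several children could share the write end, since POSIX makes writes of up to `PIPE_BUF` bytes atomic, but messages would still need framing.

```rust
use std::fs::File;
use std::io::Read;
use std::os::raw::{c_int, c_void};
use std::os::unix::io::FromRawFd;

// Hand-declared POSIX calls (Unix only) so the sketch needs no crates.
unsafe extern "C" {
    fn pipe(fds: *mut c_int) -> c_int;
    fn fork() -> c_int;
    fn write(fd: c_int, buf: *const c_void, count: usize) -> isize;
    fn close(fd: c_int) -> c_int;
    fn waitpid(pid: c_int, status: *mut c_int, options: c_int) -> c_int;
    fn _exit(code: c_int) -> !;
}

/// Fork a child that sends `message` to the parent over a pipe, and return
/// what the parent received once the child has exited.
fn report_from_child(message: &'static [u8]) -> Vec<u8> {
    let mut fds: [c_int; 2] = [0; 2];
    assert_eq!(unsafe { pipe(fds.as_mut_ptr()) }, 0, "pipe failed");
    let (read_fd, write_fd) = (fds[0], fds[1]);
    match unsafe { fork() } {
        -1 => panic!("fork failed"),
        0 => unsafe {
            // Child: the "producer" side. Only raw syscalls here, then exit
            // without running destructors or returning into the caller.
            close(read_fd);
            write(write_fd, message.as_ptr() as *const c_void, message.len());
            close(write_fd);
            _exit(0)
        },
        pid => {
            // Parent: the "runner" side. Drop its copy of the write end so
            // read_to_end sees EOF once the child closes its copy.
            unsafe { close(write_fd) };
            let mut reader = unsafe { File::from_raw_fd(read_fd) };
            let mut received = Vec::new();
            reader.read_to_end(&mut received).expect("read from pipe failed");
            let mut status: c_int = 0;
            assert_eq!(unsafe { waitpid(pid, &mut status, 0) }, pid, "waitpid failed");
            received
        }
    }
}

fn main() {
    let report = report_from_child(b"producer-0 sent 1 record\n");
    print!("{}", String::from_utf8_lossy(&report));
}
```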
After discussion, the current goal is to do some dogfooding and use the Fluvio cluster under test as the event bus for inter-process communication.
For the current tests, we'll create dedicated topics for the test driver to use (probably not reported in results). We'll also need to instantiate an additional producer per process (again, probably not reflected in results) that is used only for reporting test measurements.
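The dedicated driver topics could be kept out of results with a naming convention. A minimal sketch, assuming a hypothetical `fluvio-test-internal` prefix; these functions only compute and filter names and do not create any topics.

```rust
/// Hypothetical prefix for topics used only by the test driver.
const INTERNAL_PREFIX: &str = "fluvio-test-internal";

/// Topic that every worker's extra "reporting" producer would write to.
fn reporting_topic(test_name: &str) -> String {
    format!("{}-{}-stats", INTERNAL_PREFIX, test_name)
}

/// True for driver-only topics that should not appear in results.
fn is_internal_topic(topic: &str) -> bool {
    topic.starts_with(INTERNAL_PREFIX)
}

/// Filter driver-only topics out before results are reported.
fn reportable_topics<'a>(topics: &[&'a str]) -> Vec<&'a str> {
    topics.iter().copied().filter(|t| !is_internal_topic(t)).collect()
}

fn main() {
    let stats = reporting_topic("longevity");
    let all = ["longevity", stats.as_str()];
    println!("report on: {:?}", reportable_topics(&all));
}
```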
We may also need a convention that lets the test driver know when all producers and consumers have terminated, but I'll worry about that later.
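One possible termination convention, sketched as an assumption rather than a decided design: the driver knows the IDs of the workers it forked, each worker sends a single "finished" message before exiting, and the driver considers the run complete once every expected ID has reported. `CompletionTracker` is hypothetical and independent of the transport.

```rust
use std::collections::HashSet;

/// Tracks which expected workers have announced termination.
struct CompletionTracker {
    expected: HashSet<String>,
    finished: HashSet<String>,
}

impl CompletionTracker {
    fn new<I: IntoIterator<Item = String>>(workers: I) -> Self {
        CompletionTracker {
            expected: workers.into_iter().collect(),
            finished: HashSet::new(),
        }
    }

    /// Record a termination message; returns false for unknown or duplicate IDs.
    fn mark_finished(&mut self, worker: &str) -> bool {
        self.expected.contains(worker) && self.finished.insert(worker.to_string())
    }

    /// IDs that have not reported yet, sorted for stable output.
    fn pending(&self) -> Vec<&str> {
        let mut ids: Vec<&str> = self
            .expected
            .difference(&self.finished)
            .map(|s| s.as_str())
            .collect();
        ids.sort();
        ids
    }

    /// `finished` only ever holds expected IDs, so equal sizes means all are done.
    fn all_done(&self) -> bool {
        self.finished.len() == self.expected.len()
    }
}

fn main() {
    let mut tracker =
        CompletionTracker::new(vec!["producer-0".to_string(), "consumer-0".to_string()]);
    tracker.mark_finished("producer-0");
    println!("pending: {:?}, done: {}", tracker.pending(), tracker.all_done());
}
```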
I discovered a runtime problem related to how we use async in fluvio-test. In fluvio-test's `main()`, we wrap all of the code in a `task::block_on` (via `fluvio_future`, which uses async-std behind the scenes).
As a result, any child process ends up blocking on `.await`, because when we later call `fork()`, our instantiated `Builder` doesn't behave as I expected.
I've created a minimal reproducible example that shows how we can successfully fork processes. Basically, we isolate our async code as late as possible, after calling `fork()`.
Cargo.toml

```toml
[package]
name = "fluvio-processes"
version = "0.1.0"
edition = "2018"

[dependencies]
fluvio = "0.9.6"
fluvio-future = { version = "0.3.0", features = ["task", "timer", "subscriber", "fixture"] }
nix = "0.22"
fork = "0.1"
async-std = { version = "1", features = ["attributes"] }
tracing = "0.1"
```
Example workflow with parent and child processes connecting to Fluvio and producing data to the same topic

```rust
use std::process::exit;

use async_std::task;
use fluvio::{Fluvio, RecordKey};
use fluvio_future::task::run_block_on;
use fork::{fork, Fork};
use nix::sys::wait::waitpid;
use nix::unistd::Pid;

/// Connect to Fluvio, create a producer, and send one record to the "longevity" topic.
async fn fluvio_process_test(name: &str) {
    println!("[process task] {:?}", task::current().name());
    println!("[{}] About to create a Fluvio connection", name);
    let fluvio = Fluvio::connect().await.expect("Couldn't connect");
    println!("[{}] Created a Fluvio client", name);
    let producer = fluvio
        .topic_producer("longevity")
        .await
        .expect("Couldn't create producer");
    println!("[{}] Created a Fluvio producer", name);
    producer
        .send(RecordKey::NULL, format!("Hello from the {}", name))
        .await
        .expect("Producer send failed");
    println!("[{}] Sent a record", name);
}

fn main() {
    fluvio_future::subscriber::init_logger();

    // Fork first; each process starts its own async work only afterwards.
    let producer_pid = match fork() {
        Ok(Fork::Parent(child_pid)) => {
            println!("[parent] Waiting on child pid {}", child_pid);
            run_block_on(async {
                task::Builder::new()
                    .name("parent".into())
                    .spawn(fluvio_process_test("parent"))
                    .unwrap()
                    .await;
            });
            child_pid
        }
        Ok(Fork::Child) => {
            println!("[child] New process");
            run_block_on(async {
                task::Builder::new()
                    .name("child".into())
                    .spawn(fluvio_process_test("child"))
                    .unwrap()
                    .await
            });
            exit(0);
        }
        Err(_) => panic!("Fork failed"),
    };

    // Reap the child and report how it exited.
    let pid = Pid::from_raw(producer_pid);
    match waitpid(pid, None) {
        Ok(status) => {
            println!("[main] Producer Child exited with status {:?}", status);
        }
        Err(err) => panic!("[main] waitpid() failed: {}", err),
    }
}
```
Stdout

```text
[parent] Waiting on child pid 829171
[child] New process
[process task] Some("parent")
[parent] About to create a Fluvio connection
[process task] Some("child")
[child] About to create a Fluvio connection
[parent] Created a Fluvio client
[child] Created a Fluvio client
[parent] Created a Fluvio producer
[child] Created a Fluvio producer
[parent] Sent a record
[child] Sent a record
[main] Producer Child exited with status Exited(Pid(829171), 0)
```
I'm going to try rearranging the code to follow the structure of the example. But I suspect that, in the long run, it may be less painful to split `producer` and `consumer` into their own test binaries.
This would separate the test runner and tests from the producer and consumer behavior. If I go this route, I'd probably use the `fluvio` CLI as a phase 0.
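A sketch of the separate-binaries route: the runner spawns every worker as its own process, then waits on all of them. The commands below are stand-ins (`sh -c ...`, so a Unix shell is assumed); in a phase 0 they could be invocations of the `fluvio` CLI such as `fluvio produce <topic>`, with the exact arguments left open.

```rust
use std::io;
use std::process::{Child, Command};

/// Spawn every worker command first, then wait for all of them, so producers
/// and consumers run concurrently as separate binaries. Returns exit codes
/// (None if a worker was terminated by a signal).
fn run_workers(commands: &[Vec<&str>]) -> io::Result<Vec<Option<i32>>> {
    let mut children: Vec<Child> = Vec::new();
    for argv in commands {
        let (program, args) = argv.split_first().expect("empty command");
        children.push(Command::new(program).args(args).spawn()?);
    }
    let mut codes = Vec::with_capacity(children.len());
    for mut child in children {
        codes.push(child.wait()?.code());
    }
    Ok(codes)
}

fn main() -> io::Result<()> {
    // Stand-in workers; a phase-0 runner would launch `fluvio` CLI commands here.
    let codes = run_workers(&[
        vec!["sh", "-c", "echo producer done"],
        vec!["sh", "-c", "echo consumer done"],
    ])?;
    println!("{:?}", codes);
    Ok(())
}
```

Spawning everything before waiting keeps the workers concurrent. If a later spawn fails, the function returns early without reaping the earlier children, which a real runner would need to handle.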
I figured out the last piece of this puzzle. It turns out that if `run_block_on` is called at any point in the parent process before forking, we see this behavior.
Negative example based on https://github.com/infinyon/fluvio/issues/627#issuecomment-930498566:
```rust
// Same imports and fluvio_process_test() as in the example above.
fn main() {
    fluvio_future::subscriber::init_logger();

    // Entering the async runtime here, before fork(), makes the child process
    // hang on `.await` later.
    task::block_on(async {
        task::Builder::new()
            .name("test".into())
            .blocking(async { println!("test") })
    });

    let producer_pid = match fork() {
        Ok(Fork::Parent(child_pid)) => {
            println!("[parent] Waiting on child pid {}", child_pid);
            run_block_on(async {
                task::Builder::new()
                    .name("parent".into())
                    .spawn(fluvio_process_test("parent"))
                    .unwrap()
                    .await;
            });
            child_pid
        }
        Ok(Fork::Child) => {
            println!("[child] New process");
            run_block_on(async {
                task::Builder::new()
                    .name("child".into())
                    .spawn(fluvio_process_test("child"))
                    .unwrap()
                    .await
            });
            exit(0);
        }
        Err(_) => panic!("Fork failed"),
    };

    let pid = Pid::from_raw(producer_pid);
    match waitpid(pid, None) {
        Ok(status) => {
            println!("[main] Producer Child exited with status {:?}", status);
        }
        Err(err) => panic!("[main] waitpid() failed: {}", err),
    }
}
```
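Since any use of the runtime in the parent before forking breaks the children, one option is to make that ordering checkable. A minimal sketch, assuming all `run_block_on` calls are routed through a hypothetical `enter_runtime` wrapper (the closure stands in for the real blocking call) and that a hypothetical `ensure_fork_safe` check runs right before each `fork()`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Set the first time any code enters the async runtime in this process.
static RUNTIME_ENTERED: AtomicBool = AtomicBool::new(false);

/// Hypothetical wrapper that every `run_block_on` call site would go through.
/// The closure stands in for the real blocking call, which is not reproduced here.
fn enter_runtime<T>(block: impl FnOnce() -> T) -> T {
    RUNTIME_ENTERED.store(true, Ordering::SeqCst);
    block()
}

/// Check to run right before fork(): refuses once the runtime has been used,
/// because children forked after that point hang on `.await`.
fn ensure_fork_safe() -> Result<(), &'static str> {
    if RUNTIME_ENTERED.load(Ordering::SeqCst) {
        Err("async runtime already entered; fork() before any block_on")
    } else {
        Ok(())
    }
}

fn main() {
    println!("before runtime: {:?}", ensure_fork_safe());
    let answer = enter_runtime(|| 40 + 2);
    println!("{} then: {:?}", answer, ensure_fork_safe());
}
```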
In the smoke test, consumers and producers are created using separate connections. If they are created from the same `Fluvio` instance, it fails.