56quarters / cadence

An extensible Statsd client for Rust
https://docs.rs/cadence/
Apache License 2.0

Investigate what it would take to use Futures (or Tokio) idiomatically #75

Closed 56quarters closed 3 years ago

56quarters commented 6 years ago

Lots of Rust servers / apps are being created based on Tokio, which uses Future (and the associated futures lib) as one of its key abstractions. What would it take to expose something like the MetricClient API in a way that either uses Futures or the Tokio event loop?

parasyte commented 4 years ago

While I would generally prefer a natively async crate for metrics, I don't necessarily think that it's a showstopper.

The synchronous client can be run in its own thread, consuming from an IPC channel synchronously. The async side would then send metrics over the IPC channel asynchronously. Picking a good IPC implementation is the only challenge; for apps built on Tokio 0.1, there is parity-tokio-ipc, and for Tokio 0.2 you can go straight to tokio-uds (no Windows support).
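
For illustration, here's a minimal sketch of that workaround using a plain std::sync::mpsc channel rather than a dedicated IPC crate (the MetricMsg type and spawn_metrics_thread helper are hypothetical, not part of Cadence):

use std::net::UdpSocket;
use std::sync::mpsc;
use std::thread;

use cadence::prelude::*;
use cadence::{StatsdClient, UdpMetricSink, DEFAULT_PORT};

// Hypothetical message type: metrics are described by the async side and
// actually formatted and sent by the sync client on its own thread.
enum MetricMsg {
    Count(String, i64),
    Gauge(String, u64),
}

fn spawn_metrics_thread() -> mpsc::Sender<MetricMsg> {
    let (tx, rx) = mpsc::channel::<MetricMsg>();
    thread::spawn(move || {
        let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
        let sink = UdpMetricSink::from(("localhost", DEFAULT_PORT), socket).unwrap();
        let client = StatsdClient::from_sink("example.prefix", sink);
        // Block on the channel here; all network I/O stays on this thread.
        for msg in rx {
            let _ = match msg {
                MetricMsg::Count(key, n) => client.count(&key, n).map(|_| ()),
                MetricMsg::Gauge(key, v) => client.gauge(&key, v).map(|_| ()),
            };
        }
    });
    tx
}

Since sending on an unbounded std channel never blocks, the async side can call tx.send(...) directly with nothing to .await.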


The better option is of course async all the way down, ideally without tying the crate directly to any specific runtime (like Tokio). Going over the code, it doesn't look like this would be too much trouble.

First, all of the client traits need async versions. E.g.:

pub trait AsyncGauged: Gauged {
    /// Record a gauge value with the given key
    async fn gauge(&self, key: &str, value: u64) -> MetricResult<Gauge> {
        self.gauge_with_tags(key, value)
            .try_send_async()
            .await
    }
}

/// Async trait that encompasses all other async traits for sending metrics.
pub trait AsyncMetricClient: AsyncCounted + AsyncTimed + AsyncGauged + AsyncMetered + AsyncHistogrammed + AsyncSetted {}

And now MetricBuilder needs a new async method:

impl<'m, 'c, T> MetricBuilder<'m, 'c, T>
where
    T: Metric + From<String>,
{
    // ... snip ...

    /// Asynchronously send a metric using the client that created this builder.
    pub async fn try_send_async(self) -> MetricResult<T> {
        match self.repr {
            BuilderRepr::Error(err, _) => Err(err),
            BuilderRepr::Success(ref formatter, client) => {
                let metric: T = formatter.build();
                client.send_metric_async(&metric).await?;
                Ok(metric)
            }
        }
    }

    // ... same for async version of the `send` method ...
}

We extend MetricBackend with the new send_metric_async method, and implement it on a new async client:

pub trait MetricBackend {
    // ... snip ...

    /// Asynchronously send a fully formed `Metric` implementation via the underlying `MetricSink`
    async fn send_metric_async<M>(&self, metric: &M) -> MetricResult<()>
    where
        M: Metric;
}

impl MetricBackend for AsyncStatsdClient {
    // ... snip ...

    async fn send_metric_async<M>(&self, metric: &M) -> MetricResult<()>
    where
        M: Metric,
    {
        let metric_string = metric.as_metric_str();
        self.sink.emit_async(metric_string).await?;
        Ok(())
    }
}

Finally, we need an async version of the MetricSink trait, and new async implementers like AsyncBufferedUdpMetricSink and AsyncQueuingMetricSink:

pub trait AsyncMetricSink {
    // ... snip ...

    /// Asynchronously send the Statsd metric using this sink and return the number of bytes
    /// written or an I/O error.
    async fn emit_async(&self, metric: &str) -> io::Result<usize>;
}

And that should do it! Client usage would look like this:

use cadence::async_prelude::*;
use cadence::{AsyncBufferedUdpMetricSink, AsyncQueuingMetricSink, AsyncStatsdClient, DEFAULT_PORT};
use tokio::net::UdpSocket;

#[tokio::main]
async fn main() {
    let sock = UdpSocket::bind("0.0.0.0:0").await.unwrap();
    let buffered = AsyncBufferedUdpMetricSink::from(("localhost", DEFAULT_PORT), sock).unwrap();
    let queued = AsyncQueuingMetricSink::from(buffered);
    let metrics = AsyncStatsdClient::from_sink("example.prefix", queued);

    metrics.count("example.counter", 1).await.unwrap();
    metrics.gauge("example.gauge", 5).await.unwrap();
    metrics.time("example.timer", 32).await.unwrap();
    metrics.histogram("example.histogram", 22).await.unwrap();
    metrics.meter("example.meter", 8).await.unwrap();
}

I'm not in love with the naming, and there is a lot of bifurcation in this proposal. But that's kind of a known issue with sync/async APIs in general. It may be worth splitting the async-specific parts into their own crate, but that could be difficult with the existing method visibility. Another popular option in these cases is a feature flag that switches the API from sync to async (without renaming traits and methods).

If this sounds good to you, I can possibly start on this work next week or early January. I will probably need this sooner rather than later, and I wouldn't be comfortable putting the IPC workaround into production. :)

56quarters commented 4 years ago

Thanks for the ideas!

I agree that a library that was fully async would be ideal. However, given the way Cadence is currently written (heavy use of traits for everything) it may be tough to make it work without adding dependencies or ending up with cumbersome type signatures: https://rust-lang.github.io/async-book/07_workarounds/06_async_in_traits.html
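
For context, this is roughly what those cumbersome signatures look like when hand-written: a hypothetical sink trait (not Cadence's actual API) where each would-be async fn has to return a boxed, pinned future instead.

use std::future::Future;
use std::io;
use std::pin::Pin;

// Hand-desugared version: without language support for async fn in
// traits, each method must return a boxed, pinned future.
pub trait AsyncMetricSink {
    fn emit_async<'a>(
        &'a self,
        metric: &'a str,
    ) -> Pin<Box<dyn Future<Output = io::Result<usize>> + Send + 'a>>;
}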

I appreciate the offer of help but I'm not ready yet to commit to an approach or API for this. I'm going to see if I can come up with a reasonable async client separate from Cadence and then see what it would take to share some of the core types/code/etc.

parasyte commented 4 years ago

I forgot about unstable async trait methods. :|

Would you be willing to share a link here for anything you end up experimenting on? I would much rather collaborate on something than totally NIH roll my own.

56quarters commented 4 years ago

Absolutely, when I start experimenting with anything, I'll link it here.

56quarters commented 4 years ago

Notes:

I've been playing around with some async code, writing a DNS-over-HTTPS server. It uses the Hyper HTTP framework and the TrustDNS client for the most part. Based on my experience with this, I've written down some thoughts about what an async version of Cadence might look like.

Traits

One of the features of Cadence that I like and am proud of is the ability to swap out various MetricSink implementations. If none of the existing ones fit your use case, it's easy enough to create your own. Any async version of the client would have to support something like this.

async-trait is quite pleasant to use. Adding a dependency might be a reasonable tradeoff to keep the type signatures simple compared to what they'd need to be if we wrote out the equivalent of what the syntactic sugar is doing (Pin<Box<dyn Future<Output = T>>> vs async T).
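
For comparison, here is the same hypothetical trait written with the async-trait crate; the macro generates the boxed-future signature behind the scenes:

use std::io;

use async_trait::async_trait;

// The attribute macro expands this async fn into the
// Pin<Box<dyn Future<...>>> form that stable Rust requires.
#[async_trait]
pub trait AsyncMetricSink {
    async fn emit_async(&self, metric: &str) -> io::Result<usize>;
}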

Architecture

TrustDNS uses a model where a "client" is more like a handle that uses an MPSC channel to send messages to a task running in some executor, which actually handles all network interaction. This seems like the right model for Cadence. In fact, I think it may be the only model that makes sense: since a future is not run until it is .awaited, we'd otherwise be forcing every user of Cadence to .await every metric call to make sure it ran (which is a bit annoying for sending metrics, something that's not usually the point of whatever an application is doing).

Since we aren't returning any meaningful results from the network operations, it might be quite easy to actually start using futures. Instead of the QueuingMetricSink being just another sink implementation, maybe we turn it into the "background future" (the way TrustDNS works). We could get rid of all our bespoke kinda-thread-pool stuff. I'm not sure if it matters that we do metric formatting in the calling thread; it certainly makes the code easier to test. I'll need to benchmark this to confirm it's not a problem.

In any case, since the client isn't generating a meaningful result based on the async network operation, I think it makes sense to use the client/backend split model communicating over a channel. TBD what parts of sending a metric go into which half of the flow.
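
As a rough sketch of that split (all names here are hypothetical, assuming a current Tokio): the handle formats and enqueues, while a single background task owns the socket.

use tokio::net::UdpSocket;
use tokio::sync::mpsc;

// Hypothetical client/backend split: a cheap, cloneable handle formats
// the metric and pushes it onto a channel; one background task owns the
// socket and drains the channel.
#[derive(Clone)]
pub struct MetricsHandle {
    tx: mpsc::UnboundedSender<String>,
}

impl MetricsHandle {
    // No .await at the call site: sending on an unbounded channel is
    // synchronous, so metric calls are fire-and-forget.
    pub fn count(&self, key: &str, value: i64) {
        let _ = self.tx.send(format!("{}:{}|c", key, value));
    }
}

pub fn spawn_backend() -> MetricsHandle {
    let (tx, mut rx) = mpsc::unbounded_channel::<String>();
    tokio::spawn(async move {
        let sock = UdpSocket::bind("0.0.0.0:0").await.unwrap();
        // All network interaction happens in this one task.
        while let Some(line) = rx.recv().await {
            let _ = sock.send_to(line.as_bytes(), "127.0.0.1:8125").await;
        }
    });
    MetricsHandle { tx }
}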

Code Layout

I'm not sure what the code layout will look like post-async functionality. I suppose it will depend a lot on how we choose to implement the sync and async clients. The goal here will be to make sure that existing users of the library that don't need an async-backed client aren't penalized too badly (performance wise, extra dependencies, breaking changes, etc.).

Timeline

I'm probably not going to start this work immediately, and once I do I imagine there will be a bit of experimentation to get it right. In the meantime, Cadence already does all blocking network operations in a separate thread (via the QueuingMetricSink), so I don't think it's too much of a hardship for people.

parasyte commented 4 years ago

I would not recommend using the existing QueuingMetricSink in an async context. It depends on crossbeam-channel, which has a spinlock in the code path.

parasyte commented 4 years ago

A simple but kludgy workaround for now is spawning the metrics calls in a blocking thread pool. In Tokio 0.2 that's tokio::task::spawn_blocking. The biggest problem with this is that with metrics-heavy workloads, these tasks may add unacceptable latency to other blocking tasks, since they all share the same thread pool. Just something to be aware of. Another problem is that the blocking thread pool queues tasks unbounded, without back pressure. So metrics-heavy workloads can exhaust system memory.
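
A minimal sketch of that workaround (the wrapper function is hypothetical):

use std::sync::Arc;

use cadence::prelude::*;
use cadence::StatsdClient;

// Run the (potentially blocking) metric call on Tokio's dedicated
// blocking thread pool instead of an async worker thread.
async fn count_in_blocking_pool(client: Arc<StatsdClient>, key: String) {
    let _ = tokio::task::spawn_blocking(move || client.count(&key, 1)).await;
}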

Apparently async-std is magic and doesn't require any special handling for blocking functions.

tustvold commented 4 years ago

@parasyte I presume the spinlock you are referring to is the one here? If so, my (potentially incorrect) understanding of the logic is that it should only spin whilst waiting for another thread to finish patching up the list pointers after allocating the next block of memory, which should be a very short period of time and therefore not a concern.

I'd imagine it is likely to "block" to the same degree as enqueueing the task into a threadpool, as this likely relies on a similar concurrent data structure.

I think the lack of back pressure has also recently been fixed by #97.

parasyte commented 4 years ago

@tustvold The problem is that thread priorities and the OS scheduler aren't always fair. (cf. Solved: Thread::unpark is extremely slow on windows)

Even if the spinlock is only blocking the thread for a few milliseconds, the reality is that this blocks thousands of asynchronous tasks that are pinned to that thread. This kind of blocking adds up over time, since any task can end up yielding the thread to the OS while waiting on a spinlock.

I should say that this may or may not be acceptable, depending on your use case.

edit: Here's some more reading material if you are interested in the subject:

tustvold commented 4 years ago

@parasyte I suspect that most people looking to use this library in an async context will be using a threadpool implementation like tokio, which keeps the thread count close to or below the number of physical CPU cores. In such a scenario it would be hard to generate the contention needed to cause the latency explosion that spinlocks trade off for maximum throughput. Ultimately, the issue with spinlocks is that they are a non-OS-level concurrency primitive, which causes problems under an OS-level preemptive task scheduler that isn't aware of them; users of tokio or some other threadpooling implementation aren't relying on such a scheduler to the same extent.

56quarters commented 3 years ago

Pretty new still but this seems promising: https://github.com/ecliptical/tokio-cadence

Uses Cadence as-is but with a queuing, buffering, async sink via Tokio.

parasyte commented 3 years ago

Oh, of course. That's my colleague. We've been using cadence in-house for a number of years, now. :tada:

56quarters commented 3 years ago

Ha! Given that, I'm OK closing this issue and pointing people at that crate if they have a need for Cadence in an async context.