nikis05 / fork_stream

Clone any Stream `S` where `<S as Stream>::Item: Clone`
MIT License

Items not garbage collected until `Forked<S>` is dropped #3

Closed · clarkmcc closed this 6 days ago

clarkmcc commented 1 week ago

Maybe I'm doing something wrong, but I have a production service that syncs data from one source (a Stream) to a destination. I noticed memory usage steadily increase to about 1.3GB over the course of about 3 minutes, and then drop off all at once at the moment Forked<S> gets dropped. This got me thinking that at least one copy of each T is being retained for as long as the fork is alive. In the following example, I can reproduce the behavior: each stream clone (one for each of the 3 destinations) "syncs" and drops its T, and then at the very end all the Ts are dropped again, resulting in 12 total drops when there should only be 9.

Can you help me understand if I'm doing something wrong here?

use fork_stream::StreamExt;
use futures::stream::iter;
use futures::StreamExt as FuturesStreamExt;

#[derive(Clone, Eq, PartialEq, Debug)]
struct Example {
    id: usize,
}

impl Drop for Example {
    fn drop(&mut self) {
        println!("Dropping Example {}", self.id);
    }
}

#[tokio::test]
async fn test_fork_stream() {
    let items = iter(vec![
        Example { id: 1 },
        Example { id: 2 },
        Example { id: 3 },
    ])
    .fork();

    let destinations = vec![10, 20, 30];

    for desto in destinations {
        let items = items.clone();
        items.for_each(|item| async move {
            println!("Syncing item {} to destination {}", item.id, desto);
        }).await;
    }
}

Output:

Dropping Example 1
Syncing item 1 to destination 10
Dropping Example 2
Syncing item 2 to destination 10
Dropping Example 3
Syncing item 3 to destination 10
Dropping Example 1
Syncing item 1 to destination 20
Dropping Example 2
Syncing item 2 to destination 20
Dropping Example 3
Syncing item 3 to destination 20
Dropping Example 1
Syncing item 1 to destination 30
Dropping Example 2
Syncing item 2 to destination 30
Dropping Example 3
Syncing item 3 to destination 30
Dropping Example 1
Dropping Example 2
Dropping Example 3
clarkmcc commented 1 week ago

Maybe part of my problem is also illustrated here? I clone the forked stream and pull a value out of it, then clone the stream again after the first value has already been produced by the source stream. This leads me to believe that the forked stream keeps a copy of Example even after all existing forks have read it, so that any future forks can still read it?

use std::sync::Arc;

#[tokio::test]
async fn test_fork_stream() {
    let items = iter(vec![
        Arc::new(Example { id: 1 }),
        Arc::new(Example { id: 2 }),
        Arc::new(Example { id: 3 }),
    ])
    .fork();

    let v1 = items.clone().next().await.unwrap();
    let v2 = items.clone().next().await.unwrap();
    assert_ne!(v1.id, v2.id);
}
thread 'test_fork_stream' panicked at data-syncer/src/lib.rs:50:5:
assertion `left != right` failed
  left: 1
 right: 1
clarkmcc commented 1 week ago

Okay, so I think this latest behavior is what a weak reference is for. With the following, I'm able to prevent forks from seeing items that were emitted before they are upgraded:

#[derive(Clone, Eq, PartialEq, Debug)]
struct Example {
    id: usize,
}

impl Drop for Example {
    fn drop(&mut self) {
        println!("Dropping Example {}", self.id);
    }
}

#[tokio::test]
async fn test_fork_stream() {
    let items = iter(vec![
        Example { id: 1 },
        Example { id: 2 },
        Example { id: 3 },
    ]).fork();

    let s1 = items.clone().downgrade();
    let s2 = items.clone().downgrade();
    let s3 = items.clone().downgrade();

    let v1 = s1.upgrade().unwrap().next().await.unwrap();
    let v2 = s2.upgrade().unwrap().next().await.unwrap();
    let v3 = s3.upgrade().unwrap().next().await.unwrap();
    assert_eq!(v1.id, 1);
    assert_eq!(v2.id, 2);
    assert_eq!(v3.id, 3);

    drop(v1);
    drop(v2);
    drop(v3);
    println!("explicitly dropped values received from forks");

    drop(items);
    println!("dropped forked stream");
}

But the values were never moved, only cloned. The original forked stream still has a copy of all the values.

Dropping Example 1
Dropping Example 2
Dropping Example 3
explicitly dropped values received from forks
Dropping Example 1
Dropping Example 2
Dropping Example 3
dropped forked stream
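
Presumably, if I also consumed the original items fork instead of just dropping it, those buffered copies would be released as they are read rather than hanging around until the fork is dropped. Something like this untested sketch (reusing the imports and Example type from the first snippet):

#[tokio::test]
async fn test_fork_stream_consume_original() {
    let items = iter(vec![
        Example { id: 1 },
        Example { id: 2 },
        Example { id: 3 },
    ])
    .fork();

    // Drain a clone first, as before...
    items
        .clone()
        .for_each(|item| async move {
            println!("Clone received item {}", item.id);
        })
        .await;

    // ...then drain the original fork as well, instead of letting it hold a
    // buffered copy of every item until it is dropped.
    items
        .for_each(|item| async move {
            println!("Original fork received item {}", item.id);
        })
        .await;
}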
clarkmcc commented 6 days ago

Alright, this is expected behavior: the original fork is itself a first-class, pollable stream, not just its clones. My problem was that I only ever polled the clones, never the original fork. This is somewhat awkward to work around when you need to clone a stream an unknown number of times, e.g.

let fork = stream::iter(0..5).fork();
for _ in 0..5 {
    let cloned = fork.clone();
    // We poll the cloned streams, but never the original `fork`, so `fork`
    // keeps a buffered copy of every item until it is dropped.
}

I was able to resolve this with a little helper trait

/// When cloning a [`fork_stream::Forked<S>`], we need to be able to clone
/// it a dynamic number of times without holding onto the original fork.
/// Holding onto the original fork without also polling it prevents values
/// from ever being dropped, resulting in unbounded memory growth.
///
/// This trait solves the problem by allowing us to clone any `T: Clone`
/// (specifically a `Forked<S>` in this case) a dynamic number of times,
/// ensuring that the last element is the original value and all other
/// elements are clones of it.
///
/// In other words, this trait consumes `self` and returns a vector of
/// clones of `self` with the original, moved `self` as the last element.
trait CloneN {
    fn clone_n(self, count: usize) -> Vec<Self>
    where
        Self: Clone;
}

impl<T: Clone> CloneN for T {
    fn clone_n(self, count: usize) -> Vec<Self>
    where
        Self: Clone,
    {
        if count == 0 {
            return Vec::new();
        }

        let mut results = Vec::with_capacity(count);

        // Clone all except the last item
        for _ in 0..count - 1 {
            results.push(self.clone());
        }

        // Move the last item
        results.push(self);

        results
    }
}

Which I can use like this

let fork = stream::iter(0..5).fork();
let range = 0..5;
let mut clones = fork.clone_n(range.len());

for _ in range {
    let cloned = clones.remove(0);
    // The last iteration of the loop takes the original fork
}