tokio-rs / tokio

A runtime for writing reliable asynchronous applications with Rust. Provides I/O, networking, scheduling, timers, ...
https://tokio.rs
MIT License
26.57k stars 2.45k forks source link

tokio_stream::StreamMap allows duplicate keys #4774

Open nashley opened 2 years ago

nashley commented 2 years ago

Version List the versions of all tokio crates you are using. This is version independent, but here you go:

$ cargo tree | grep tokio
tokio_stream--StreamMap v0.1.0 (/home/user/git/tmp/tokio_stream--StreamMap)
└── tokio-stream v0.1.9
    └── tokio v1.19.2

Platform The output of uname -a (UNIX), or version and 32 or 64-bit (Windows) This is platform independent, but here you go:

$ uname -a
Linux archlinux 5.18.3-arch1-1 #1 SMP PREEMPT_DYNAMIC Thu, 09 Jun 2022 16:14:10 +0000 x86_64 GNU/Linux

Description [short summary of the bug] tokio_stream::StreamMap::insert followed by tokio_stream::StreamMap::extend allows for duplicate keys.

[code sample that causes the bug]

use tokio_stream;

fn main() {
    let mut streams = tokio_stream::StreamMap::new();
    streams.insert("key", tokio_stream::pending::<u8>());
    streams.extend(vec![
        ("key", tokio_stream::pending::<u8>())
    ]);

    for (key, value) in streams.iter() {
        println!("{}: {:?}", key, value);
    }
}

I expected that the duplicate keys would overwrite existing ones like they are with from_iter (which calls insert, which calls remove). However, I think it'd be best if both methods raised errors (see #4775) or at least returned a list of the replaced keys to match insert's functionality. This way, the calling code would at least know what keys were replaced.

Instead, you can see that the StreamMap now contains duplicate keys:

key: Pending(PhantomData)
key: Pending(PhantomData)
nashley commented 2 years ago

PR that introduced this: https://github.com/tokio-rs/tokio/pull/4272

nashley commented 2 years ago

For reference, here's an example showing that from_iter removes duplicate keys:

use tokio_stream;

fn main() {
    let streams_from_iter = tokio_stream::StreamMap::from_iter(vec![
        ("key", tokio_stream::pending::<u8>()),
        ("key", tokio_stream::pending::<u8>())
    ]);

    for (key, value) in streams_from_iter.iter() {
        println!("{}: {:?}", key, value);
    }
}

Results in:

key: Pending(PhantomData)
nashley commented 2 years ago

If this is indeed undesired behavior, then I think the simplest solution is to just iterate over the Vec<(K, V)> and call insert on each entry. I can make a PR for that if desired.

Darksonn commented 2 years ago

I agree that this is a bug. It should not allow duplicate keys.

nashley commented 2 years ago

How concerned with the runtime performance impact are we?

Given n is the length of the existing StreamMap and m is the length of the Vec being appended: With the naive solution...

// O(n*m) or worse if the underlying `Vec` needs to be resized, potentially multiple times:
for (key, value) in iter {
    self.insert(key, value);
}

... the runtime goes from O(n+m) (just an [allocation + copy] if necessary) to O(n*m)[1], since each of the keys being inserted is checked against each of the keys that already exist.

[1] or worse if the underlying Vec is unable to hold all of iter's items. We can't use reserve due to iter not being ExactSizeIterator.

Alternatively, we could still use extend but then sort and deduplicate the result...

self.entries.extend(iter.into_iter()); // O(n+m)
// O((n+m) * log(n+m)):
self.entries.sort_by(|a, b|
    if a.0.eq(&b.0) {
        std::cmp::Ordering::Equal
    } else {
        std::cmp::Ordering::Less
    }
);
self.entries.dedup_by(|a, b| a.0.eq(&b.0)); // O(n+m)

This removes duplicates (while preserving the original keys but not their order) with a runtime of O((n+m)*(2+log(n+m))), but I think it's more fragile. It also doesn't allow us to return the items that were ignored due to being duplicates unless we implement our own dedup_by.

I'd prefer the former solution unless we have reason to believe this will lead to a noticeable performance impact.

Noah-Kennedy commented 2 years ago

I think that going to polynomial time is fundamentally unworkable here.

nashley commented 2 years ago

Should from_iter be changed to not run in O((n^2-n)/2) then? It uses the naive solution that I listed above.

I'm not sure what expected values of n and m are, but here's a graph of the three runtimes: https://www.desmos.com/calculator/iwn1q7w7t6

Darksonn commented 2 years ago

Well, on one hand, the StreamMap type already has some pretty bad running time complexities. On the other hand, I would prefer to not make them worse.

ghost commented 2 years ago

Also, the incoming vector in streams.extend could have duplicates within it.

ghost commented 2 years ago

One more thing, the duplicates are removed using swap_remove function which replaces the removed element with last element. This will reorder the vector which goes against the specification of StreamMap which promises to retain the incoming order.

https://github.com/tokio-rs/tokio/blob/199878e287041a49ebd4f29a8dd39f12fd3a56aa/tokio-stream/src/stream_map.rs#L447

nashley commented 2 years ago

This will reorder the vector which goes against the specification of StreamMap which promises to retain the incoming order

Where is that specified? I only see this:

StreamMap is similar to StreamExt::merge in that it combines source streams into a single merged stream that yields values in the order that they arrive from the source streams

Which, as I understand it, applies to the order of values received within a stream rather than the order of the streams themselves.

All the -> Iterator<> methods are documented with:

An iterator visiting ... in arbitrary order.

Am I misunderstanding something?

ghost commented 2 years ago

@nashley I interpreted the quote you highlighted as the StreamMap should return items in the order they were added similar to a FIFO queue.

ghost commented 2 years ago

I have a proposal. If the proposal is acceptable then I'll do the code change. The proposal is to maintain a HashMap (or BTreeMap?) with key value pair as `<key, index in vector>. Before inserting we can check in the HashMap for the key and if it's present, we can get the position in the vector and perform the deletion. This approach would remove the O(n) complexity for every insert and replace it with the complexity to fetch and insert into the HashMap. I am not sure of the complexity of Rust HashMap. We can use some other map implementations if any one has suggestions.

Darksonn commented 2 years ago

I would be ok with using a HashMap. I think you can just store the Stream directly in the map without the index trick. There's no need to retain the ordering of the streams - when streams become ready at the same time, we may return them in every order.

nashley commented 2 years ago

@DevSabb let me know if you want assistance implementing, documenting, or testing this.

ghost commented 2 years ago

@nashley The change now is more complex that one I proposed. I'll take a look into the code before confirming if I wan't to pick this up. Is it okay if I confirm in couple of days?

nashley commented 2 years ago

Of course! I can also work on implementing this if you change your mind.

ghost commented 2 years ago

@nashley I started working on this ticket. Will let you know if I have questions.

ghost commented 2 years ago

I have a question. I had to include the traits Hash + Eq to the extend function (see below). I get an error that impl has stricter requirements than trait. Any idea how can I fix this.

impl<K, V> Extend<(K, V)> for StreamMap<K, V> {
    fn extend<T>(&mut self, iter: T)
    where
        T: IntoIterator<Item = (K, V)>,
        K: Hash + Eq,
    {
        for (key, value) in iter {
            self.entries.insert(key, value);
        }
    }
}
Darksonn commented 2 years ago

Ah that sucks. It would require a breaking release to make this change, then.

(the particular error in question is fixed by putting the K: Hash bound on the impl block instead of the fn, but it's still a breaking change, so it won't work for other reasons)

ghost commented 2 years ago

@Darksonn Thank you! Yes, I had to include the trait Eq + Hash in multiple places.

I am also wondering if I can get some suggestion on how to transform the map iterator to list iterator

    pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
        self.entries.iter()
    }

Error:

type mismatch resolving `<std::collections::hash_map::Iter<'_, K, V> as std::iter::Iterator>::Item == &(K, V)`

expected tuple, found `&(K, V)`

note:  expected tuple `(&K, &V)`
      found reference `&(K, V)`