add set intersection util for two sorted streams
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
bae0667066
commit
aea82183b2
2 changed files with 70 additions and 1 deletions
|
@ -1,4 +1,10 @@
|
||||||
use std::cmp::{Eq, Ord};
|
use std::{
|
||||||
|
cmp::{Eq, Ord},
|
||||||
|
pin::Pin,
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
|
use futures::{Stream, StreamExt};
|
||||||
|
|
||||||
use crate::{is_equal_to, is_less_than};
|
use crate::{is_equal_to, is_less_than};
|
||||||
|
|
||||||
|
@ -45,3 +51,27 @@ where
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Intersection of sets
|
||||||
|
///
|
||||||
|
/// Outputs the set of elements common to both streams. Streams must be sorted.
|
||||||
|
pub fn intersection_sorted_stream2<Item, S>(a: S, b: S) -> impl Stream<Item = Item> + Send
|
||||||
|
where
|
||||||
|
S: Stream<Item = Item> + Send + Unpin,
|
||||||
|
Item: Eq + PartialOrd + Send + Sync,
|
||||||
|
{
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
let b = Arc::new(Mutex::new(b.peekable()));
|
||||||
|
a.map(move |ai| (ai, b.clone()))
|
||||||
|
.filter_map(|(ai, b)| async move {
|
||||||
|
let mut lock = b.lock().await;
|
||||||
|
while let Some(bi) = Pin::new(&mut *lock).next_if(|bi| *bi <= ai).await.as_ref() {
|
||||||
|
if ai == *bi {
|
||||||
|
return Some(ai);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
None
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -237,3 +237,42 @@ fn set_intersection_sorted_all() {
|
||||||
let r = intersection_sorted(i.into_iter());
|
let r = intersection_sorted(i.into_iter());
|
||||||
assert!(r.eq(["bar", "baz", "foo"].iter()));
|
assert!(r.eq(["bar", "baz", "foo"].iter()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn set_intersection_sorted_stream2() {
|
||||||
|
use futures::StreamExt;
|
||||||
|
use utils::{set::intersection_sorted_stream2, IterStream};
|
||||||
|
|
||||||
|
let a = ["bar"];
|
||||||
|
let b = ["bar", "foo"];
|
||||||
|
let r = intersection_sorted_stream2(a.iter().stream(), b.iter().stream())
|
||||||
|
.collect::<Vec<&str>>()
|
||||||
|
.await;
|
||||||
|
assert!(r.eq(&["bar"]));
|
||||||
|
|
||||||
|
let r = intersection_sorted_stream2(b.iter().stream(), a.iter().stream())
|
||||||
|
.collect::<Vec<&str>>()
|
||||||
|
.await;
|
||||||
|
assert!(r.eq(&["bar"]));
|
||||||
|
|
||||||
|
let a = ["aaa", "ccc", "xxx", "yyy"];
|
||||||
|
let b = ["hhh", "iii", "jjj", "zzz"];
|
||||||
|
let r = intersection_sorted_stream2(a.iter().stream(), b.iter().stream())
|
||||||
|
.collect::<Vec<&str>>()
|
||||||
|
.await;
|
||||||
|
assert!(r.is_empty());
|
||||||
|
|
||||||
|
let a = ["aaa", "ccc", "eee", "ggg"];
|
||||||
|
let b = ["aaa", "bbb", "ccc", "ddd", "eee"];
|
||||||
|
let r = intersection_sorted_stream2(a.iter().stream(), b.iter().stream())
|
||||||
|
.collect::<Vec<&str>>()
|
||||||
|
.await;
|
||||||
|
assert!(r.eq(&["aaa", "ccc", "eee"]));
|
||||||
|
|
||||||
|
let a = ["aaa", "ccc", "eee", "ggg", "hhh", "iii"];
|
||||||
|
let b = ["bbb", "ccc", "ddd", "fff", "ggg", "iii"];
|
||||||
|
let r = intersection_sorted_stream2(a.iter().stream(), b.iter().stream())
|
||||||
|
.collect::<Vec<&str>>()
|
||||||
|
.await;
|
||||||
|
assert!(r.eq(&["ccc", "ggg", "iii"]));
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue