TimelyDataflow / differential-dataflow

An implementation of differential dataflow using timely dataflow on Rust.
MIT License
2.54k stars 183 forks source link

remove deref bound from containers #417

Closed frankmcsherry closed 10 months ago

frankmcsherry commented 10 months ago

Same as #416 but for some reason this one works. Next step: figure out what is the diff between the two and why that makes #416 fail.

antiguru commented 10 months ago

416 mixes up the offset calculations, which I think is why it doesn't work. Here's the diff between the two branches:

diff --git a/src/trace/layers/mod.rs b/src/trace/layers/mod.rs
index 255033c8..1a026203 100644
--- a/src/trace/layers/mod.rs
+++ b/src/trace/layers/mod.rs
@@ -118,6 +118,8 @@ pub trait BatchContainer: Default {
     /// Inserts a borrowed item.
     fn copy(&mut self, item: &Self::Item);
     /// Extends from a slice of items.
+    fn copy_slice(&mut self, slice: &[Self::Item]);
+    /// Extends from a slice of items.
     fn copy_range(&mut self, other: &Self, start: usize, end: usize);
     /// Creates a new container with sufficient capacity.
     fn with_capacity(size: usize) -> Self;
@@ -126,12 +128,11 @@ pub trait BatchContainer: Default {
     /// Creates a new container with sufficient capacity.
     fn merge_capacity(cont1: &Self, cont2: &Self) -> Self;

-    /// Roughly the `std::ops::Index` trait.
+    /// Reference to element at this position.
     fn index(&self, index: usize) -> &Self::Item;
-    /// Number of elements contained.
+    /// Number of contained elements
     fn len(&self) -> usize;
-
-
+    
     /// Reports the number of elements satisfing the predicate.
     ///
     /// This methods *relies strongly* on the assumption that the predicate
@@ -140,18 +141,18 @@ pub trait BatchContainer: Default {
     /// count the number of elements in time logarithmic in the result.
     fn advance<F: Fn(&Self::Item)->bool>(&self, start: usize, end: usize, function: F) -> usize {

-        let small_limit = start + 8;
+        let small_limit = 8;

         // Exponential seach if the answer isn't within `small_limit`.
-        if end > small_limit && function(self.index(small_limit)) {
+        if end > start + small_limit && function(self.index(start + small_limit)) {

             // start with no advance
             let mut index = small_limit + 1;
-            if index < end && function(self.index(index)) {
+            if start + index < end && function(self.index(start + index)) {

                 // advance in exponentially growing steps.
                 let mut step = 1;
-                while index + step < end && function(self.index(index + step)) {
+                while start + index + step < end && function(self.index(start + index + step)) {
                     index += step;
                     step = step << 1;
                 }
@@ -159,7 +160,7 @@ pub trait BatchContainer: Default {
                 // advance in exponentially shrinking steps.
                 step = step >> 1;
                 while step > 0 {
-                    if index + step < end && function(self.index(index + step)) {
+                    if start + index + step < end && function(self.index(start + index + step)) {
                         index += step;
                     }
                     step = step >> 1;
@@ -171,11 +172,10 @@ pub trait BatchContainer: Default {
             index
         }
         else {
-            let limit = std::cmp::min(end, small_limit);
-            (start..limit).filter(|x| function(self.index(*x))).count()
+            let limit = std::cmp::min(end, start + small_limit);
+            (start .. limit).filter(|x| function(self.index(*x))).count()
         }
     }
-
 }

 impl<T: Clone> BatchContainer for Vec<T> {
@@ -186,6 +186,9 @@ impl<T: Clone> BatchContainer for Vec<T> {
     fn copy(&mut self, item: &T) {
         self.push(item.clone());
     }
+    fn copy_slice(&mut self, slice: &[T]) {
+        self.extend_from_slice(slice);
+    }
     fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
         self.extend_from_slice(&other[start .. end]);
     }
@@ -198,12 +201,8 @@ impl<T: Clone> BatchContainer for Vec<T> {
     fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
         Vec::with_capacity(cont1.len() + cont2.len())
     }
-    fn index(&self, index: usize) -> &Self::Item {
-        &self[index]
-    }
-    fn len(&self) -> usize {
-        self.len()
-    }
+    fn index(&self, index: usize) -> &Self::Item { &self[index] }
+    fn len(&self) -> usize { self[..].len() }
 }

 impl<T: Columnation> BatchContainer for TimelyStack<T> {
@@ -214,6 +213,12 @@ impl<T: Columnation> BatchContainer for TimelyStack<T> {
     fn copy(&mut self, item: &T) {
         self.copy(item);
     }
+    fn copy_slice(&mut self, slice: &[T]) {
+        self.reserve_items(slice.iter());
+        for item in slice.iter() {
+            self.copy(item);
+        }
+    }
     fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
         let slice = &other[start .. end];
         self.reserve_items(slice.iter());
@@ -231,10 +236,7 @@ impl<T: Columnation> BatchContainer for TimelyStack<T> {
         new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
         new
     }
-    fn index(&self, index: usize) -> &Self::Item {
-        &self[index]
-    }
-    fn len(&self) -> usize {
-        self[..].len()
-    }
+
+    fn index(&self, index: usize) -> &Self::Item { &self[index] }
+    fn len(&self) -> usize { self[..].len() }
 }
frankmcsherry commented 10 months ago

The mystery is resolved! #416 like this PR returns index from the advance call, but in #416 it should be index - start. Oops! But this should be good to go.