npgall / cqengine

Ultra-fast SQL-like queries on Java collections
Apache License 2.0
1.72k stars 253 forks

groupingByConcurrent is returning incorrect results #157

Open turja12 opened 7 years ago

turja12 commented 7 years ago

Hello, we are getting incorrect results from groupingByConcurrent. There are some ghost results which match only part of the query. I hope this will be enough for a fix; we are doing grouping on a large dataset, and doing it in parallel is much faster.

I created a simple code example which reproduces the failure.

package com.ca.my;

import com.googlecode.cqengine.attribute.SimpleAttribute;
import com.googlecode.cqengine.query.option.QueryOptions;
import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public class Car {
    public static final SimpleAttribute<Car, Integer> DOORS = new SimpleAttribute<Car, Integer>("doorNumber") {
        public Integer getValue(Car car, QueryOptions queryOptions) {
            return car.doorNumber;
        }
    };
    public static final SimpleAttribute<Car, String> MANUFACTURE = new SimpleAttribute<Car, String>("manufacturer") {
        public String getValue(Car car, QueryOptions queryOptions) {
            return car.manufacturer;
        }
    };

    private final String manufacturer;
    private final int doorNumber;

}
package com.ca.my;

import com.googlecode.cqengine.ConcurrentIndexedCollection;
import com.googlecode.cqengine.IndexedCollection;
import com.googlecode.cqengine.index.navigable.NavigableIndex;
import com.googlecode.cqengine.query.Query;
import com.googlecode.cqengine.query.QueryFactory;
import com.googlecode.cqengine.resultset.ResultSet;

import java.util.Map;
import java.util.stream.Collectors;

import static com.googlecode.cqengine.stream.StreamFactory.streamOf;

public class Pokus {

    public static void main(String[] args) {
        Pokus cqTest = new Pokus();
        cqTest.runMe();
    }

    public void runMe() {
        // this query should not match anything: there is no 4-door Toyota in the collection
        Query<Car> query = QueryFactory.and(
                QueryFactory.equal(Car.DOORS, 4),
                QueryFactory.equal(Car.MANUFACTURE, "toyota")
        );

        IndexedCollection<Car> indexedCollection = new ConcurrentIndexedCollection<Car>();
        indexedCollection.addIndex(NavigableIndex.onAttribute(Car.DOORS));
        indexedCollection.addIndex(NavigableIndex.onAttribute(Car.MANUFACTURE));
        indexedCollection.add(new Car("ford", 4)); //This will be found by groupingByConcurrent as valid result, but this is not toyota
        indexedCollection.add(new Car("toyota", 3));

        ResultSet<Car> results = indexedCollection.retrieve(query);
        Map<Integer, Long> collect = streamOf(results).collect(Collectors.groupingBy(car -> car.getDoorNumber(), Collectors.counting()));
        Map<Integer, Long> collectParallel = streamOf(results).parallel().collect(Collectors.groupingByConcurrent(car -> car.getDoorNumber(), Collectors.counting()));
        System.out.println("Both values should be same. Not parallel: " + collect.size() + " parallel: " + collectParallel.size());
        System.out.println("parallel map contains: " + collectParallel);     //will show 3=1 (car with id=3 is there once
    }
}

The output of this program is:

Both values should be same. Not parallel: 0 parallel: 1
parallel map contains: {4=1}

npgall commented 7 years ago

The streamOf(ResultSet) method returns a sequential stream, and actually it's not recommended to convert this into a parallel() stream.

Concurrent readers and writers are supported, but each concurrent reader should read from a different ResultSet object. This is because ResultSets are stateful: they encapsulate the reader's position when traversing indexes, and they can store state for some operations which are not easily parallelized, such as ordering or deduplicating results.
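For example, with the indexedCollection and query from the reproduction above in scope, two concurrent readers would each retrieve their own ResultSet rather than sharing one. A rough illustration only (imports omitted):

Runnable reader = () -> {
    // Each reader thread retrieves its own ResultSet for the same query;
    // only sharing a single ResultSet (or a stream built on it) across threads is problematic.
    ResultSet<Car> ownResults = indexedCollection.retrieve(query);
    long matches = streamOf(ownResults).count(); // sequential iteration within this thread
    System.out.println(Thread.currentThread().getName() + " found " + matches + " matches");
};
new Thread(reader).start();
new Thread(reader).start();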

So if you want to process results in parallel, you could:

  1. Use a single thread to read from a ResultSet and feed those results into a work queue which is processed in parallel, or
  2. Partition a monolithic query into n disjoint queries, which can be accessed via different ResultSets in parallel (rough sketches of both approaches follow below).
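For example, with the Car class, indexedCollection and query from the reproduction above in scope (imports omitted, variable names made up for illustration), one simple way to apply option 1 here is to drain the ResultSet on a single thread into a plain list and only then parallelize over that snapshot:

// Option 1 (sketch): read the stateful ResultSet sequentially, then group in parallel over the snapshot
ResultSet<Car> sequentialResults = indexedCollection.retrieve(query);
List<Car> snapshot = streamOf(sequentialResults).collect(Collectors.toList()); // single-threaded read
Map<Integer, Long> counts = snapshot.parallelStream()
        .collect(Collectors.groupingByConcurrent(Car::getDoorNumber, Collectors.counting()));
System.out.println(counts); // {} for the query above, since there is no 4-door Toyota

And a rough sketch of option 2, assuming the grouping key (door count) is used to partition the data into disjoint queries, each read through its own ResultSet on its own thread (the executor, map and partition values are plain java.util.concurrent usage, not CQEngine API):

// Option 2 (sketch): n disjoint queries, each read through its own ResultSet in parallel
List<Integer> doorPartitions = Arrays.asList(2, 3, 4, 5); // hypothetical partitioning of the key space
Map<Integer, Long> toyotaCountsByDoors = new ConcurrentHashMap<>();
ExecutorService pool = Executors.newFixedThreadPool(doorPartitions.size());
List<Future<?>> tasks = new ArrayList<>();
for (Integer doors : doorPartitions) {
    tasks.add(pool.submit(() -> {
        Query<Car> partitionQuery = QueryFactory.and(
                QueryFactory.equal(Car.DOORS, doors),
                QueryFactory.equal(Car.MANUFACTURE, "toyota"));
        // Each task retrieves its own ResultSet, so no ResultSet is shared between threads
        long count = streamOf(indexedCollection.retrieve(partitionQuery)).count();
        if (count > 0) {
            toyotaCountsByDoors.put(doors, count);
        }
    }));
}
for (Future<?> task : tasks) {
    try {
        task.get(); // wait for this partition to finish
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
}
pool.shutdown();
System.out.println(toyotaCountsByDoors); // {3=1}: only the 3-door Toyota matches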

So sorry about the confusion! I agree that calling .parallel() on the stream is a totally reasonable thing to do, given that Java 8 provides an API for it! So I'll try to put a fix in the code to prevent this if possible, or else I'll document this better. I'll keep this issue open as a reminder until then.

Best regards, Niall

turja12 commented 7 years ago

Thanks for the explanation.