locationtech / geowave

GeoWave provides geospatial and temporal indexing on top of Accumulo, HBase, BigTable, Cassandra, Kudu, Redis, RocksDB, and DynamoDB.
Apache License 2.0
501 stars 190 forks source link

Secondary Index problem #1626

Closed frknlbhr closed 4 years ago

frknlbhr commented 5 years ago

I added a secondary index for an Integer attribute of my SimpleFeature

NumericIndexStrategy indexStrategy = new SimpleIntegerIndexStrategy(); FeatureAttributeDimensionField[] dimensionFields = {new FeatureAttributeDimensionField(getPlaneFeatureType().getDescriptor("flightId"))}; CommonIndexModel indexModel = new BasicIndexModel(dimensionFields); Index numericIndex = new CustomNameIndex(indexStrategy, indexModel, "flightIdIndex");

"flightId" is declared as an attribute at my SimpleFeatureType definition as; sftBuilder.add(atBuilder.binding(Integer.class).nillable(false).buildDescriptor("flightId"));

When I run my code, it creates two tables for my two different indicies; cassandraDataStore.addType(adapterForPlane, spatialTemporalIndex, numericIndexForFlightId); and I am not sure It should be like that or not. Each table has 1 million rows data right now and when I try to query for flightId with the secondary index(my numeric index defined above), query returns nothing. And when I try the same query with my spatial temporal index, query works fine and returns true result but it takes really too much time (approximately 60 seconds). My query;

final CloseableIterator<SimpleFeature> iterator2 = cassandraDataStore.query( bldr.addTypeName(adapterForPlane.getTypeName()) .allIndicies().constraints( bldr.constraintsFactory() .cqlConstraints("flightId = 8812")) .build()); while (iterator2.hasNext()) { SimpleFeature sf = iterator2.next(); resultList.add(sf); }

So what am I doing wrong about this secondary index for querying in millions of data at cassandra datastore according to Integer field?

rfecher commented 5 years ago

I think you got a really good amount of the way through your use case, the problem is that as of this moment, geowave does not wire the CQL query constraint to a range of rows in that custom index you had defined. To choose the flight ID index replace allIndices() with indexName("flightIdIndex") and to make sure the constraints are applied to the row ranges the easiest approach is to use a different query constraint that will be a bit more direct than CQL (and note CQL is not the cassandra CQL here, its the geoserver CQL). One constraint you could use for a range scan may be new SimpleNumericQuery(...) such as how this section of test code runs.

There's a few other options as well that are direct such as InsertionIdQuery or prefix(...) from QueryConstraintsFactory.

Also, you should be aware, for secondary indexing you have to options at the datastore level. What you are doing currently is duplicating the data in 2 different indices which will be faster retrieval, but additional storage. You can set secondary indexing to enabled on the data store which will write Data IDs to a separate table where the full values are held and each of the indices are just a reference to that table stored by data ID (less storage, extra referencing on retrieval). If your data ID is flight ID in your case that could work out nicely because then as a query constraint you can simply get the data by data ID directly (on the QueryConstraintsFactory there is a method for this). Of course it would have to be unique so that may not work as nicely.

frknlbhr commented 4 years ago

Actually, flightId is not unique in my case because I save each flight's instantaneous location and other infos (such as time) at each second. So, of course a flightId represents a unique flight but in my datastore I have 3600 rows of data for a 1 hour flight and these 3600 rows have same flightId attribute.

I forgat to state that I replaced allIndices() part of my code with indexName("flightIdIndex") and query returned nothing (no exception too). Now, I replaced my CQL part of the query code with SimpleNumericQuery(...) as you suggested and it worked well. Thank you for your help. My query code is

final CloseableIterator<SimpleFeature> iterator2 = cassandraDataStore.query( bldr.addTypeName(adapterForPlane.getTypeName()) .indexName("flightIdIndex").constraints( new SimpleNumericQuery(Range.between((double) 8812, (double) 8812)) ) .build()); while (iterator2.hasNext()) { SimpleFeature sf = iterator2.next(); resultList.add(sf); }

now, and for 3.6 millions rows of data, this query returns resultset (approximately 200.000 rows of data) at 10-12 seconds. However, each returned SimpleFeature object's "flightId" attribute is null. Is this something expected?

And how can I combine this type of query with a spatialtemporal query with using my spatialTemporalIndex and flightIdIndex, I mean is it possible to combine the above with the expression like;

constraints( bldr.constraintsFactory() .spatialTemporalConstraints() .addTimeRange(oneHourBack, currentDate) .build() )

I have a few other questions too but they might not be directly related with this secondary index issue. I appreciate for your help and contribution Mr. @rfecher

rfecher commented 4 years ago

glad you're having much more success.

flightID returning as null is not what I'd expect. It should use the index model to read/write flight ID in that particular situation, perhaps this is an underlying bug we will need to look at.

For the intersection of 2 different queries, we don't have anything built in, but there are ways to make it more efficient. For one, if this is a common use case, I'd consider trying secondary indexing. I'll share code snippet examples later when I have some time.

frknlbhr commented 4 years ago

I would like to see code snippet examples when you have some time

By the way, at the ingest part of my code this warning appears; Data writer of class class org.locationtech.geowave.core.store.index.BasicIndexModel does not support field for org.locationtech.geowave.adapter.vector.FeatureAttributeCommonIndexValue@2a190cf3

rfecher commented 4 years ago

so what I've done for a join before to minimize the volume of data returned is to enable secondary indexing on the data store so that its storing the values once by data ID and then joining as many individual queries as you like by doing the intersection of Data IDs. Furthermore, in order for the secondary index to not return the full value when it is unnecessary I use an aggregation. And lastly, for subsequent queries/aggregation after the initial one you can add a constraint that the data ID must match one thats in your intersection set. In your case you'd have 1 aggregation collecting data IDs and 1 query.

/// the aggregation(s) would look like this:
VectorAggregationQueryBuilder<?, Set<String>> featureDataIdAggregationQuery =
        VectorAggregationQueryBuilder.newBuilder().aggregate(
            <your data type adapter>.getTypeName(),
            (Aggregation) new CollectDataIdsAggregation());
featureDataIdAggregationQuery.indexName("flightIdIndex");
featureDataIdAggregationQuery.constraints( new SimpleNumericQuery(Range.between((double) 8812, (double) 8812)) );
Set<String> dataIdsMatchingFlightName = cassandraDataStore.aggregate(featureDataIdAggregationQuery.build());
// and the final query would just pre filter by the previously collected data IDs and look like this: 
final CloseableIterator<SimpleFeature> joinedResults = cassandraDataStore.query(constraints( new QueryConstraintsFilterWrapper (bldr.constraintsFactory() .spatialTemporalConstraints() .addTimeRange(oneHourBack, currentDate) .build(), new GeoWaveDataIdFilter(dataIdsMatchingFlightName) );

And here's some of the supplemental classes that help if you are trying to be extra efficient while using a data store with secondary indexing enabled to only return data IDs and prefilter by data IDs (avoiding any unnecessary lookups for values that don't match the full intersection):

public class CollectDataIdsAggregation implements
    CommonIndexAggregation<Persistable, Set<String>> {
  private Set<String> results = new HashSet<>();

  public CollectDataIdsAggregation() {}

  @Override
  public Persistable getParameters() {
    return null;
  }

  @Override
  public void setParameters(final Persistable parameters) {}

  @Override
  public Set<String> getResult() {
    return results;
  }

  @Override
  public byte[] resultToBinary(final Set<Integer> result) {
return new byte[0];
  }

  @Override
  public Set<String> resultFromBinary(final byte[] binary) {
//for data stores that distribute the aggregation such as accumulo and hbase the to and from binary would need to be filled in
  }
  @Override
  public void clearResult() {
    results = new HashSet<>();
  }
  @Override
  public void aggregate(final CommonIndexedPersistenceEncoding entry) {
    results.add(StringUtils.fromBinary(entry.getDataId()));
  }
  @Override
  public byte[] toBinary() {
    return new byte[0];
  }
  @Override
  public void fromBinary(final byte[] bytes) {}
}
public class GeoWaveDataIdFilter implements QueryFilter {
  private final Set<String> ids;

  public GeoWaveDataIdFilter (final Set<String> ids) {
    this.ids = ids;
  }

  @Override
  public void fromBinary(final byte[] arg0) {}

  @Override
  public byte[] toBinary() {
    return new byte[0];
  }

  @Override
  public boolean accept(
      final CommonIndexModel indexModel,
      final IndexedPersistenceEncoding<?> encoding) {
    return ids.contains(StringUtils.fromBinary(encoding.getDataId()));
  }
}
public class QueryConstraintsFilterWrapper implements QueryConstraints {
  private final QueryConstraints constraints;

  private final QueryFilter preConstraintsFilter;

  public QueryConstraintsFilterWrapper(
      final QueryConstraints constraints,
      final QueryFilter preConstraintsFilter) {
    super();
    this.constraints = constraints;
    this.preConstraintsFilter = preConstraintsFilter;
  }

  @Override
  public List<QueryFilter> createFilters(final Index index) {
    final List<QueryFilter> constraintsFilters = constraints.createFilters(index);

    final List<QueryFilter> retVal =
        new ArrayList<>(
            constraintsFilters.size() + 1);
    retVal.add(preConstraintsFilter);
    retVal.addAll(constraintsFilters);
    return retVal;
  }

  @Override
  public void fromBinary(final byte[] binary) {
    constraints.fromBinary(binary);
  }

  @Override
  public List<MultiDimensionalNumericData> getIndexConstraints(final Index index) {
    return constraints.getIndexConstraints(index);
  }

  @Override
  public byte[] toBinary() {
    return constraints.toBinary();
  }
}
rfecher commented 4 years ago

regarding the error message "Data writer of class class org.locationtech.geowave.core.store.index.BasicIndexModel does not support field for org.locationtech.geowave.adapter.vector.FeatureAttributeCommonIndexValue@2a190cf3" that'd be why you're seeing null flightIDs ... are you enabling secondary indexing on the data store at this point? Basically for you CommonIndexModel you'll need to return a reader and writer for field name "flightIdIndex" although I'm not sure thats required when secondary indexing is enabled.

frknlbhr commented 4 years ago

Basically I define two indices, one is spatialtemporal, and the secondary index is "flightIdIndex".

public Index getSpatioTemporalIndex() { return new SpatialTemporalDimensionalityTypeProvider.SpatialTemporalIndexBuilder().setPartitionStrategy(IndexPluginOptions.PartitionStrategy.ROUND_ROBIN).setNumPartitions(10) .setPeriodicity(TemporalBinningStrategy.Unit.HOUR).createIndex(); }

public Index getNumericIndexForFlightId() { NumericIndexStrategy indexStrategy = new SimpleIntegerIndexStrategy(); FeatureAttributeDimensionField[] dimensionFields = {new FeatureAttributeDimensionField(getFlightFeatureType().getDescriptor("flightId"))}; CommonIndexModel indexModel = new BasicIndexModel(dimensionFields); Index numericIndex = new CustomNameIndex(indexStrategy, indexModel, "flightId_IDX"); return numericIndex; }

and I use these two indicies when writing data to Cassandra as;

cassandraDataStore.addType(adapterForFlights, spatioTemporalIndex, numericIndexForFlightId); Writer<SimpleFeature> writer = cassandraDataStore.createWriter(adapterForFlights.getTypeName());

and use this writer object to save data. So I couldn't get what you mean by enabling secondary indexing on datastore. Should I set some other fields of CommonIndexModel when I define my secondary index?

frknlbhr commented 4 years ago

By the way, I achieved to combine queries with your QueryConstraintsFilterWrapper class approach. Collecting dataId's matching with flight Id by aggregation part takes 3-5 seconds and querying with spatialTemporal queryConstraint part which works on the collected set takes almost 50-60 seconds. Consequently, this result shows that I cannot take advantage of my spatialTemporal Index. Is this long duration something expected when using this approach?

My final code is;

final VectorQueryBuilder bldr = VectorQueryBuilder.newBuilder();

VectorAggregationQueryBuilder<?, Set<String>> featureDataIdAggregationQuery = VectorAggregationQueryBuilder.newBuilder().aggregate(adapterForFlights.getTypeName(), (Aggregation) new CollectDataIdsAggregation());

featureDataIdAggregationQuery.indexName(numericIndexForFlightId.getName());

featureDataIdAggregationQuery.constraints(new SimpleNumericQuery(Range.between((double) 8812, (double) 8812)));

Set<String> dataIdsMatchingFlightName = cassandraDataStore.aggregate(featureDataIdAggregationQuery.build());

final CloseableIterator<SimpleFeature> joinedResults = cassandraDataStore.query(bldr .addTypeName(adapterForFlights.getTypeName()) .indexName(spatialTemporalIndex.getName()) .constraints(new QueryConstraintsFilterWrapper(bldr.constraintsFactory().spatialTemporalConstraints().addTimeRange(oneHourBack, currentDate).build(), new GeoWaveDataIdFilter(dataIdsMatchingFlightName))) .build() );

while (joinedResults.hasNext()) { SimpleFeature sf = joinedResults.next(); resultList.add(sf); }

rfecher commented 4 years ago

ah, so when I mention enabling secondary indexing I'm referring to CassandraOptions.setSecondaryIndexing(true) when you create the data store (this must stay constant on the data store so if you've already ingested data you can't switch it, but you can ingest into a different gwNamespace if you want to give it a try). Its a member of CassandraRequiredOptions which is how you end up creating a data store and its also an available option from the commandline on geowave store add

Regarding slow spatial-temporal queries, I think you'll likely want a periodicity greater than an hour (try month for example... the default is year). The space filling curve portion of the keys is broken up by that periodicity so you'd basically only have effective range scans within periodicity boundaries. Also, you are not specifying a spatial extent from what I see so you'd be considerably better off with a temporal index than a spatial-temporal index for this type of query. A spatial-temporal index works best with both reasonable geospatial and temporal constraints.

frknlbhr commented 4 years ago

I will try enabling secondary index with a new gwNamespace. Actually I will add reasonable geospatial and temporal constraints in the future, however I also need non-spatial queries too now. Thank you for your contribution, I am able to add and use secondary numeric index to my datastore successfully. I have a few other questions (and performance issues) too but I'm closing this issue if its ok for you too Mr. @rfecher