locationtech / geowave

GeoWave provides geospatial and temporal indexing on top of Accumulo, HBase, BigTable, Cassandra, Kudu, Redis, RocksDB, and DynamoDB.
Apache License 2.0

When I use multiple threads to call the GeoWave API to write data, some data is missing. #1485

Closed. scially closed this issue 5 years ago

scially commented 5 years ago

When I use multiple threads to call org.locationtech.geowave.core.store.api.Writer to write data at the same time and then add the layer to GeoServer, I find that some data is missing. Is org.locationtech.geowave.core.store.api.Writer thread-safe?

        BlockingQueue<SimpleFeature> queue = new LinkedBlockingQueue<>(INGEST_BATCH_SIZE);

        final SimpleFeatureType sft = collection.getSchema();

        final FeatureDataAdapter dataAdapter = new FeatureDataAdapter(sft);
        // one Writer instance is shared by all ingest threads
        final Writer<SimpleFeature> writer = createWrite(dataAdapter, index);
        // init thread pool
        ExecutorService ingestExecutor = Executors.newFixedThreadPool(threads);
        final List<SimpleIngestTask> ingestTasks = new ArrayList<>();
        try {
            for (int i = 0; i < threads; i++) {
                final String id = String.valueOf(i);
                final SimpleIngestTask task = new SimpleIngestTask(
                        id,
                        queue,
                        writer);
                ingestTasks.add(task);
                ingestExecutor.submit(task);
            }

            // feed features from the shapefile into the shared queue
            while (iter.hasNext()) {
                queue.offer(iter.next())...
                ....
            }

SimpleIngestTask.run()

        try {
            while (true) {
                SimpleFeature geowaveData = readQueue.poll(
                        100,
                        TimeUnit.MILLISECONDS);
                if (geowaveData == null) {
                    // nothing arrived within the timeout: stop only once the
                    // producer has finished and the queue is drained
                    if (isTerminated && readQueue.size() == 0) {
                        break;
                    }
                    continue;
                }

                writer.write(geowaveData);
                i++;
            }
        } catch (Exception e) {
            ...
        } finally {
            isFinished = true;
        }
    }


rfecher commented 5 years ago

org.locationtech.geowave.core.store.api.Writer is intended to be thread-safe, and I haven't noticed any issues like this. Each Writer wraps a writer object from the underlying key-value store (with HBase, for example, that would be a BufferedMutator), so knowing which key-value store you're using might help if there's a nuanced issue with one of them. One question, though: in the code snippet provided I don't see writer.close(), which is essential for flushing the last batch of data. Is that close() invocation just in other code that isn't part of this snippet?
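
To illustrate why a missing close() can lose the tail of an ingest, here is a minimal sketch. BufferedSketchWriter and CloseFlushDemo are hypothetical stand-ins invented for this example, not GeoWave or HBase classes; the sketch only assumes that the wrapped writer batches rows and pushes the final partial batch on close().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo harness, not part of GeoWave.
class CloseFlushDemo {
    // Writes `rows` rows, optionally closes the writer, and returns how many
    // rows actually reached the (in-memory) store.
    static int persisted(int rows, int batchSize, boolean close) {
        List<String> store = new ArrayList<>();
        BufferedSketchWriter writer = new BufferedSketchWriter(batchSize, store);
        for (int i = 0; i < rows; i++) {
            writer.write("row-" + i);
        }
        if (close) {
            writer.close();
        }
        return store.size();
    }

    public static void main(String[] args) {
        System.out.println(persisted(25, 10, false)); // 20: last 5 rows still buffered
        System.out.println(persisted(25, 10, true));  // 25: close() flushed the rest
    }
}

// Hypothetical stand-in for a batching, store-backed writer.
class BufferedSketchWriter implements AutoCloseable {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> store; // stands in for the key-value store

    BufferedSketchWriter(int batchSize, List<String> store) {
        this.batchSize = batchSize;
        this.store = store;
    }

    // synchronized so that several threads could share one writer
    synchronized void write(String row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Moves everything currently buffered into the store.
    synchronized void flush() {
        store.addAll(buffer);
        buffer.clear();
    }

    @Override
    public synchronized void close() {
        flush(); // the final partial batch is only persisted here
    }
}
```

In this sketch the per-thread write counts would look correct either way, because write() succeeds even when the row is still sitting in the buffer.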

scially commented 5 years ago

I call writer.close() in a try...finally block, and I counted the number of SimpleFeatures written by each thread. The total equals the total number of features in the shapefile.

        BlockingQueue<SimpleFeature> queue = new LinkedBlockingQueue<>(INGEST_BATCH_SIZE);

        final SimpleFeatureType sft = collection.getSchema();

        final FeatureDataAdapter dataAdapter = new FeatureDataAdapter(sft);
        final Writer<SimpleFeature> writer = createWrite(dataAdapter, index);
        // init thread pool
        ExecutorService ingestExecutor = Executors.newFixedThreadPool(threads);
        final List<SimpleIngestTask> ingestTasks = new ArrayList<>();
        try {
            for (int i = 0; i < threads; i++) {
                final String id = String.valueOf(i);
                final SimpleIngestTask task = new SimpleIngestTask(
                        id,
                        queue,
                        writer);
                ingestTasks.add(task);
                ingestExecutor.submit(task);
            }

            while (iter.hasNext()) {
                queue.offer(iter.next())...
                ....
            }
        } finally {
            ...
            writer.close();
        }

scially commented 5 years ago

@rfecher Sorry, I found the error: I called writer.close() too early. Thanks.
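
The corrected code is not shown in this thread. As one plausible ordering that avoids closing too early, the sketch below signals the workers, shuts the pool down, waits for it to terminate, and only then closes the shared writer. CloseOrderingDemo, SketchWriter, and producerDone are hypothetical names made up for this example; SketchWriter is an in-memory stand-in, not the GeoWave Writer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo, not GeoWave code.
class CloseOrderingDemo {
    // Stand-in for the shared writer; it rejects writes after close().
    static final class SketchWriter implements AutoCloseable {
        private final List<Integer> store = new ArrayList<>();
        private boolean closed = false;

        synchronized void write(int row) {
            if (closed) {
                throw new IllegalStateException("write after close");
            }
            store.add(row);
        }

        @Override
        public synchronized void close() {
            closed = true;
        }

        synchronized int size() {
            return store.size();
        }
    }

    // Ingests `rows` integers with `threads` workers; returns rows stored.
    static int ingest(int rows, int threads) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(100);
        AtomicBoolean producerDone = new AtomicBoolean(false);
        SketchWriter writer = new SketchWriter();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int t = 0; t < threads; t++) {
                pool.submit(() -> {
                    try {
                        while (true) {
                            Integer row = queue.poll(100, TimeUnit.MILLISECONDS);
                            if (row == null) {
                                // exit only when the producer is done and the queue is drained
                                if (producerDone.get() && queue.isEmpty()) {
                                    break;
                                }
                                continue;
                            }
                            writer.write(row);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            for (int i = 0; i < rows; i++) {
                queue.put(i); // put() blocks while the bounded queue is full
            }
            producerDone.set(true);
            pool.shutdown(); // accept no new tasks; running workers keep going
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while ingesting", e);
        } finally {
            pool.shutdownNow(); // no-op if the pool has already terminated
            writer.close();     // close only after the workers are done
        }
        return writer.size();
    }

    public static void main(String[] args) {
        System.out.println(ingest(1000, 4)); // 1000
    }
}
```

Closing the writer before awaitTermination returns would leave workers holding a closed writer while rows are still queued, which matches the symptom described above.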

rfecher commented 5 years ago

Glad to hear it's resolved.