square / tape

A lightning fast, transactional, file-based FIFO for Android and Java.
https://square.github.io/tape/
Apache License 2.0

Implement peek(int nth) #174

Open aacic opened 6 years ago

aacic commented 6 years ago

The method should read the nth element from the queue without changing the queue.

A potential use case for this method is reading a batch of elements from the queue one at a time in a loop and processing them in a streaming API manner. Only if every element of the batch is processed successfully is the whole batch removed from the queue.
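A minimal sketch of the requested semantics, assuming an in-memory stand-in rather than Tape itself: NthPeekQueue and BatchConsumer are hypothetical names, not part of Tape's API. Here peek(int nth) reads the element at 0-based index nth without changing the queue, and remove(n) drops the batch from the head only after every element in it was processed successfully.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

// Hypothetical in-memory stand-in for a FIFO queue; NOT Tape's API.
// It only models the proposed semantics.
class NthPeekQueue<T> {
  private final List<T> elements = new ArrayList<>();

  synchronized void add(T element) {
    elements.add(element);
  }

  // Returns the nth element (0 = head) without changing the queue.
  synchronized T peek(int nth) {
    if (nth < 0 || nth >= elements.size()) {
      throw new NoSuchElementException("no element at index " + nth);
    }
    return elements.get(nth);
  }

  // Removes the first n elements from the head of the queue.
  synchronized void remove(int n) {
    if (n < 0 || n > elements.size()) {
      throw new IllegalArgumentException("cannot remove " + n + " elements");
    }
    elements.subList(0, n).clear();
  }

  synchronized int size() {
    return elements.size();
  }
}

class BatchConsumer {
  // Processes up to batchSize head elements one at a time and removes them
  // only if every one succeeded. Returns true if the batch was removed.
  static <T> boolean processBatch(NthPeekQueue<T> queue, int batchSize, Predicate<T> process) {
    int n = Math.min(batchSize, queue.size());
    for (int i = 0; i < n; i++) {
      if (!process.test(queue.peek(i))) {
        return false; // leave the whole batch in the queue for a retry
      }
    }
    queue.remove(n);
    return true;
  }
}
```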

JakeWharton commented 6 years ago

Do you have an actual use case for this? We don't want to add an API for a purely hypothetical use case.

(Reply sent by email to aacic's comment of Tue, Oct 17, 2017, 9:39 AM.)

NightlyNexus commented 6 years ago

The hypothetical use case stated sounds like the existing APIs:

List<Data> section = queue.peek(num);
if (processSuccessfully(section)) {
  queue.remove(num); // drop the num elements that were just processed
}

Is this instead a request for Data nthElement = queue.peekBackInQueue(n)? I'm not sure what the use case would be, but could you do it in your application code by peeking n elements into a list and taking the last one?
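The application-side workaround suggested above can be sketched as follows, assuming a plain List stands in for the queue contents and a hypothetical peek(list, max) helper mimics a list-returning peek. NthViaPeekList and nthElement are illustrative names; n is 1-based to match "peek n elements and take the last one".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers; queueContents is a plain List standing in for the queue.
class NthViaPeekList {
  // Stand-in for a list-returning peek(max): copies up to max elements from the head.
  static <T> List<T> peek(List<T> queueContents, int max) {
    int count = Math.min(max, queueContents.size());
    return new ArrayList<>(queueContents.subList(0, count));
  }

  // The nth element (1-based), found by peeking n elements and taking the last one.
  // This materializes all n elements in memory just to return one of them.
  static <T> T nthElement(List<T> queueContents, int n) {
    if (n < 1) {
      throw new IllegalArgumentException("n must be at least 1");
    }
    List<T> head = peek(queueContents, n);
    if (head.size() < n) {
      throw new IndexOutOfBoundsException("queue has only " + head.size() + " elements");
    }
    return head.get(head.size() - 1);
  }
}
```

The trade-off is memory: the whole prefix is copied, which is exactly what a streaming consumer wants to avoid.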

f2prateek commented 6 years ago

If the request is for the first n elements (and not specifically the nth element only), then the iterator can be used pretty efficiently, since it's lazy.

    int i = 0;
    for (byte[] bytes : queueFile) {
      if (i++ >= n) {
        break; // stop once n elements have been processed
      }
      process(bytes);
    }
jhass commented 6 years ago

peek(n) loads all n elements into memory at once, so it doesn't allow for minimal memory usage, and the iterator doesn't allow concurrent modification.
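The concurrent-modification half of this objection can be reproduced with java.util.ArrayList as a stand-in (an assumption; this is not Tape's QueueFile, whose iterator the thread describes as likewise rejecting concurrent modification). The list's iterator fails fast after an append, while reads by index keep working, which is the property an index-based peek(int nth) would rely on. FailFastDemo and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

// Illustration with java.util.ArrayList, not Tape.
class FailFastDemo {
  // Returns true if appending to the list mid-iteration made the iterator throw.
  static boolean iteratorFailsOnAppend() {
    List<String> queue = new ArrayList<>(Arrays.asList("a", "b"));
    Iterator<String> it = queue.iterator();
    it.next();       // consumer has read "a"
    queue.add("c");  // producer appends at the tail
    try {
      it.next();     // detects the structural modification
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  // Reading by index is unaffected: an append at the tail does not move
  // existing elements, so indices 0 and 1 still refer to "a" and "b".
  static String readByIndexAcrossAppend() {
    List<String> queue = new ArrayList<>(Arrays.asList("a", "b"));
    String first = queue.get(0);
    queue.add("c");
    return first + queue.get(1);
  }
}
```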

aacic commented 6 years ago

A potential use case would look like this:

int n = 10; // number of elements for processing

for (int i = 0; i < n; i++) {
  Element element = queue.peek(i); // reads the ith element (proposed API)
  StreamingAPIProcessor.process(element);
}

if (StreamingAPIProcessor.isSuccessful()) {
  queue.remove(n);
}

Suppose there is a queue of 10,000 elements, and we want to process them in batches of 1,000 (e.g. upload each batch to the server). If we process the 1,000 elements in a streaming manner, we consume less memory. We can't use the iterator because it doesn't allow concurrent modification.

f2prateek commented 6 years ago

I don't fully understand the example you shared (StreamingAPIProcessor processes one element at a time but reports status about n elements?). But anyway, that example could be written simply as:

    int n = 10;
    int i = 0;
    for (byte[] bytes : queueFile) {
      if (i++ >= n) {
        break; // stop once n elements have been processed
      }
      StreamingAPIProcessor.process(bytes);
    }

    if (StreamingAPIProcessor.isSuccessful()) {
      queueFile.remove(n);
    }

I'm not sure where the question of concurrent modification comes in here. Your example is synchronous: you're processing n elements one at a time and removing n elements when all are done.

aacic commented 6 years ago

StreamingAPIProcessor could, for example, upload the n elements to the server using a JsonGenerator (Jackson Streaming API). If the network request succeeds, we remove the n elements. The idea is not to load all n elements into memory but to read only one element at a time.
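To illustrate reading one element at a time while streaming, here is a minimal sketch that swaps Jackson's JsonGenerator for a plain java.io.Writer so it has no dependencies. It assumes each queued element is already a serialized JSON value in UTF-8; StreamingUpload, writeJsonArray, and the elementAt callback (which could wrap the proposed peek(i)) are hypothetical names.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.function.IntFunction;

class StreamingUpload {
  // Writes elements 0..n-1 as one JSON array, fetching each element only when
  // it is needed, so the loop holds a single element at a time.
  // Assumes each element is already a serialized JSON value (UTF-8 bytes).
  static void writeJsonArray(Writer out, int n, IntFunction<byte[]> elementAt) throws IOException {
    out.write('[');
    for (int i = 0; i < n; i++) {
      if (i > 0) {
        out.write(',');
      }
      byte[] element = elementAt.apply(i); // e.g. the proposed queue.peek(i)
      out.write(new String(element, StandardCharsets.UTF_8));
    }
    out.write(']');
    out.flush();
  }
}
```

In a real upload, out would wrap the HTTP request body, and the batch would be removed from the queue only after the server confirms success.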

Meanwhile, the system generates new elements and adds them to the queue on other threads. That's why we can't use the iterator.

jhass commented 6 years ago

So one has to provide some pseudocode with concurrent access adding stuff to the queue just so you accept it as a valid use case? Oh well:


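queue = new QueueFile()

// Producer: keeps adding items on another thread
new Thread() {
  run() {
    while (true) {
      synchronized(queue) { queue.add(rand()) }
      sleep(rand())
    }
  }
}.start()

// Consumer: streams one item at a time, removes the batch only on success
while (!queue.isEmpty()) {
  batchSize = min(queue.size(), MAX_BATCH_SIZE)
  for (int i = 0; i < batchSize; i++) {
    synchronized(queue) { item = queue.get(i) } // proposed: read the ith item without removing it
    stream(item)
  }
  if (isBatchSuccessful()) {
    synchronized(queue) { queue.remove(batchSize) }
  }
}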
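A runnable version of this producer/consumer pattern, as a minimal sketch under stated assumptions: an ArrayList guarded by synchronized stands in for QueueFile, queue.get(i) stands in for the proposed peek(nth) (Tape has no such method), and every upload is assumed to succeed. ConcurrentBatchDemo and run are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo; not Tape's API.
class ConcurrentBatchDemo {
  private final List<Integer> queue = new ArrayList<>(); // guarded by synchronized (queue)

  // Appends 0..total-1 from a producer thread while this thread consumes in
  // batches of at most maxBatch. Returns the elements in consumption order.
  List<Integer> run(int total, int maxBatch) throws InterruptedException {
    Thread producer = new Thread(() -> {
      for (int i = 0; i < total; i++) {
        synchronized (queue) {
          queue.add(i); // producer only ever appends at the tail
        }
      }
    });
    producer.start();

    List<Integer> uploaded = new ArrayList<>();
    while (uploaded.size() < total) {
      int batchSize;
      synchronized (queue) {
        batchSize = Math.min(queue.size(), maxBatch);
      }
      if (batchSize == 0) {
        Thread.yield(); // nothing queued yet; let the producer run
        continue;
      }
      // Records what was "streamed" so the result can be checked; a real
      // consumer would write each item out and keep nothing.
      List<Integer> streamed = new ArrayList<>();
      for (int i = 0; i < batchSize; i++) {
        Integer item;
        synchronized (queue) {
          // The producer only appends and only this thread removes,
          // so indices 0..batchSize-1 keep pointing at the same items.
          item = queue.get(i);
        }
        streamed.add(item);
      }
      // Assume the upload succeeded: commit by removing the batch from the head.
      synchronized (queue) {
        queue.subList(0, batchSize).clear();
      }
      uploaded.addAll(streamed);
    }
    producer.join();
    return uploaded;
  }
}
```

The lock is held only for each individual read, not for the whole batch, so producers are never blocked while a batch is being streamed.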
hadrienk commented 6 years ago

It took me a while to realize that iterator() is not available in the version shown in the documentation:

<dependency>
    <groupId>com.squareup</groupId>
    <artifactId>tape</artifactId>
    <version>1.2.3</version>
</dependency>