spotify / completable-futures

Utilities for working with futures in Java 8
Apache License 2.0
393 stars 51 forks

Port ConcurrencyLimiter from FuturesExtra to work with CompletionStage #53

Closed eshrubs closed 7 years ago

eshrubs commented 7 years ago

This has been renamed to ConcurrencyReducer to avoid class conflicts with FuturesExtra, which uses the same package as CompletableFutures.

codecov-io commented 7 years ago

Codecov Report

Merging #53 into master will not change coverage. The diff coverage is 100%.


@@           Coverage Diff           @@
##             master    #53   +/-   ##
=======================================
  Coverage       100%   100%           
- Complexity       43     63   +20     
=======================================
  Files             4      5    +1     
  Lines            99    162   +63     
  Branches          3      9    +6     
=======================================
+ Hits             99    162   +63
Impacted Files                                          Coverage Δ        Complexity Δ
...n/java/com/spotify/futures/ConcurrencyReducer.java   100% <100%> (ø)   20 <20> (?)

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Last update a5a936d...738a16a.

eshrubs commented 7 years ago

Coverage has been lowered, hence the -1 from Codecov. However, this line is not covered in FuturesExtra. From my point of view we can never get null on that line, but if we do, perhaps not releasing would be a bug? Not sure.

pettermahlen commented 7 years ago

I think 'job' can be null if two threads race: both see queue.isEmpty() == false, but only one gets a non-null response from queue.poll()? It's a hard case to test, I think.
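
The interleaving described here can be replayed deterministically by running the two steps by hand in a single thread. The sketch below is not the code from the PR: `CheckThenActRace` and `replay` are hypothetical names, jobs are plain strings, and a `ConcurrentLinkedQueue` stands in for the real job queue.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class CheckThenActRace {
  /** Replays the race by hand: both "threads" check before either one polls. */
  static String[] replay() {
    Queue<String> queue = new ConcurrentLinkedQueue<>();
    queue.add("job-1"); // exactly one job is queued

    // Step 1: both threads run the check and see a non-empty queue.
    boolean threadASawJob = !queue.isEmpty();
    boolean threadBSawJob = !queue.isEmpty();

    // Step 2: both threads act on that observation, which is stale for B.
    String polledByA = threadASawJob ? queue.poll() : null;
    String polledByB = threadBSawJob ? queue.poll() : null;

    return new String[] {polledByA, polledByB};
  }

  public static void main(String[] args) {
    String[] result = replay();
    System.out.println("A polled: " + result[0]);
    System.out.println("B polled: " + result[1]);
  }
}
```

Thread B passed the emptiness check but still received null from `poll()`, which is the case the uncovered line has to handle.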

eshrubs commented 7 years ago

@pettermahlen Yeah, I think this would be hard to test. From my reading, isEmpty() for a BlockingQueue uses size(), which is implemented with a ReentrantLock.

The logic here is correct however - we need to return the ticket if we didn't actually get a job.

I wonder if this could be refactored to rely on poll instead of checking isEmpty() every time? Then we would requeue the job if we can't get a permit

Job<T> job = queue.poll();
if (job == null) {
  return null;
}
if (!limit.tryAcquire()) {
  // no permit available: put the job back and try again later
  queue.add(job);
  return null;
}
return job;
pettermahlen commented 7 years ago

I think that algorithm would reorder jobs? Perhaps flipping it around would be better:

// speculatively try to acquire a permit
if (!limit.tryAcquire()) {
   return null;
}

Job<T> job = queue.poll();
if (job == null) {
   // nothing to do; return the permit
   limit.release();
}

return job;
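
For reference, a self-contained version of this permit-first ordering. It is a minimal sketch and not the ConcurrencyReducer code: `PermitFirstGrab` and its methods are hypothetical names, jobs are plain strings, and `limit` is assumed to be a `java.util.concurrent.Semaphore`.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

final class PermitFirstGrab {
  private final Queue<String> queue = new ConcurrentLinkedQueue<>();
  private final Semaphore limit;

  PermitFirstGrab(int maxConcurrency) {
    this.limit = new Semaphore(maxConcurrency);
  }

  void enqueue(String job) {
    queue.add(job);
  }

  /** Returns the next job with a permit held for it, or null if none can start. */
  String grabJob() {
    // Speculatively take a permit before touching the queue.
    if (!limit.tryAcquire()) {
      return null;
    }
    String job = queue.poll();
    if (job == null) {
      // Nothing to run: hand the permit back so it is not leaked.
      limit.release();
    }
    return job;
  }

  /** To be called once a grabbed job has finished. */
  void finish() {
    limit.release();
  }

  int availablePermits() {
    return limit.availablePermits();
  }

  public static void main(String[] args) {
    PermitFirstGrab reducer = new PermitFirstGrab(1);
    System.out.println(reducer.grabJob()); // empty queue
    reducer.enqueue("a");
    reducer.enqueue("b");
    System.out.println(reducer.grabJob()); // takes the only permit
    System.out.println(reducer.grabJob()); // no permit left, "b" stays queued
    reducer.finish();
    System.out.println(reducer.grabJob()); // permit is back, "b" is next
  }
}
```

Because a job is only removed from the queue after a permit is held, a job never has to be put back, so the queue order is left untouched.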
eshrubs commented 7 years ago

Ah yes, ordering matters.

However, flipping it around will still end up hard to test, I think.
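
On the ordering point, the effect of polling first and putting the job back can be shown without any threads. In the hypothetical sketch below (all names are illustrative, jobs are strings, and `limit` is assumed to be a `Semaphore`), a failed grab re-adds the job with `add()`, which appends at the tail of the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

final class RequeueReorders {
  /** Poll-first variant: takes a job, then puts it back if no permit is free. */
  static String pollThenRequeue(Queue<String> queue, Semaphore limit) {
    String job = queue.poll();
    if (job == null) {
      return null;
    }
    if (!limit.tryAcquire()) {
      queue.add(job); // lands at the tail, behind every job queued after it
      return null;
    }
    return job;
  }

  /** Returns the queue contents after one failed grab with no permits available. */
  static List<String> orderAfterFailedGrab() {
    Queue<String> queue = new ConcurrentLinkedQueue<>();
    queue.add("first");
    queue.add("second");
    Semaphore limit = new Semaphore(0); // all permits are in use
    pollThenRequeue(queue, limit);
    return new ArrayList<>(queue);
  }

  public static void main(String[] args) {
    System.out.println(orderAfterFailedGrab());
  }
}
```

After a single failed grab, "first" sits behind "second", so jobs would no longer run in submission order.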

pettermahlen commented 7 years ago

Well, I guess a good thing about my suggested algorithm (if it works), is it's not check-then-act on the queue, so there's no race between two threads. The case when 'queue is not empty' followed by 'poll returns null' doesn't exist. So the scenario that isn't tested doesn't exist with that version, I think.
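
One way to gain some confidence in this, short of testing a specific interleaving, is a stress check of the invariants: every job is grabbed exactly once and no permit is lost. The sketch below is an illustration under assumptions, not a test from the PR. `GrabJobStress` is a hypothetical name, jobs are integers, `limit` is assumed to be a `Semaphore`, and a passing run does not prove the absence of a race.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class GrabJobStress {
  /** Permit-first grab: acquire, then poll, and release if there was nothing to poll. */
  static Integer grabJob(Queue<Integer> queue, Semaphore limit) {
    if (!limit.tryAcquire()) {
      return null;
    }
    Integer job = queue.poll();
    if (job == null) {
      limit.release();
    }
    return job;
  }

  /** Returns {jobs grabbed, permits available once all workers have stopped}. */
  static int[] run(int threads, int jobs, int permits) {
    Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    for (int i = 0; i < jobs; i++) {
      queue.add(i);
    }
    Semaphore limit = new Semaphore(permits);
    AtomicInteger grabbed = new AtomicInteger();
    CountDownLatch start = new CountDownLatch(1);

    Thread[] workers = new Thread[threads];
    for (int t = 0; t < threads; t++) {
      workers[t] = new Thread(() -> {
        try {
          start.await(); // release all workers at once to maximise contention
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        while (!queue.isEmpty()) {
          Integer job = grabJob(queue, limit);
          if (job != null) {
            grabbed.incrementAndGet(); // stands in for running the job
            limit.release();           // job finished, give the permit back
          } else {
            Thread.yield();            // no permit or lost the race, retry
          }
        }
      });
      workers[t].start();
    }
    start.countDown();
    try {
      for (Thread worker : workers) {
        worker.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for workers", e);
    }
    return new int[] {grabbed.get(), limit.availablePermits()};
  }

  public static void main(String[] args) {
    int[] result = run(8, 10_000, 3);
    System.out.println("grabbed=" + result[0] + " permits=" + result[1]);
  }
}
```

If the invariants hold, the grabbed count equals the number of jobs and the available permits equal the initial limit, regardless of how the threads interleave.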

spkrka commented 7 years ago

Not sure how to fix the untestable line. We can't reorder it, as you mention, and we don't want to risk having jobs stuck on the queue forever if they don't get pumped for some reason.

pettermahlen commented 7 years ago

@spkrka what about my alternative algorithm? It looks safe to me, as well as a little more straightforward to understand?

spkrka commented 7 years ago

@pettermahlen I think you are right. I've done the refactoring as a PR here: https://github.com/spotify/futures-extra/pull/28. It could be useful to review that one before we finish this review, as it is shorter and easier to compare.

eshrubs commented 7 years ago

@pettermahlen @spkrka Have rebased and moved the new grabJob over