maidh91 / guava-libraries

Automatically exported from code.google.com/p/guava-libraries
Apache License 2.0

#remove() on a computing map causes simultaneous threads in computingFunction #462

Closed GoogleCodeExporter closed 9 years ago

GoogleCodeExporter commented 9 years ago
It appears that calling #remove() on a computing map while multiple other 
threads are getting the same key lets more than one thread execute the 
computing function at the same time.

The use case is a requirement to keep only one outstanding RPC request to a 
slow backend server, on the assumption that the backend server would return 
the same result for RPCs issued at the same time.

http://stackoverflow.com/questions/4038949/using-mapmakermakecomputingmap-to-prevent-simultaneous-rpcs-for-the-same-data/4039080

Sample Scala code is below.  Running it shows multiple threads inside the 
computing function at once: look for a second thread printing "sleeping in 
function #1" before the first thread has printed "sleeping in function #2".

$ scalac -classpath guava-r07/guava-r07.jar t1.scala && scala -classpath guava-r07/guava-r07.jar:. Test

I understand if this isn't a use case that the computing map was designed to 
handle.  I can switch to a 1 ms expiration or to weak values, but these are 
more expensive in the number of objects in the map and/or in using a 
background thread to remove entries from the map.

import com.google.common.collect.MapMaker

import java.util.concurrent.{ConcurrentMap,
                             CountDownLatch}

private
class ComputingMapFunction
  extends com.google.common.base.Function[String,Long]
{
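  override
  def apply(key: String): Long =
  {
    // Nothing actually sleeps here; the two messages only bracket the
    // computation so that overlapping calls show up in the output.
    System.err.println("" + Thread.currentThread + " sleeping in function #1")
    val now = System.nanoTime
    System.err.println("" + Thread.currentThread + " sleeping in function #2")
    now
  }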
}

private
class MyThread(computing_map: ConcurrentMap[String,Long],
               latch: CountDownLatch)
  extends Thread
{
  override
  def run(): Unit =
  {
    System.err.println("" + this + " waiting")
    latch.countDown()
    latch.await
    System.err.println("" + this + " processing")

    try {
      val key = "key"
      var last: Long = 0
      for (i <- 1 to 2) {
        try {
          System.err.println("" + this + " getting")
          last = computing_map.get(key)
          System.err.println("" + this + " " + last + " remove")
        }
        finally {
          computing_map.remove(key)
        }
      }
    }
    catch {
      case e: Throwable => e.printStackTrace()
    }
  }
}

object Test
{
  def main(args: Array[String]): Unit =
  {
    val f = new ComputingMapFunction
    val computing_map = new MapMaker().makeComputingMap[String,Long](f)

    val n = 10
    val latch = new CountDownLatch(n+1)
    val threads = {for (i <- 1 to n)
                     yield new MyThread(computing_map, latch)}.toArray
    for (t <- threads) t.start()
    latch.countDown()
    for (t <- threads) t.join()
  }
}

Original issue reported on code.google.com by blair-ol...@orcaware.com on 30 Oct 2010 at 1:00

GoogleCodeExporter commented 9 years ago
The code compiles with both Scala 2.7.7 and 2.8.0.  I ran it on an 8-core 
system and on a year-old MacBook Pro and got the same results on both.

Original comment by blair-ol...@orcaware.com on 30 Oct 2010 at 1:16

GoogleCodeExporter commented 9 years ago
Can you please provide the output that you are seeing when you run this program?

Original comment by fry@google.com on 2 Nov 2010 at 6:40

GoogleCodeExporter commented 9 years ago
Here's a sample run.  Thread[Thread-0,5,main] enters the computing function, 
then Thread[Thread-5,5,main] enters it and leaves it, and only then does 
Thread[Thread-0,5,main] leave it.

Later in the run, Thread[Thread-8,5,main] and Thread[Thread-9,5,main] are 
also in the function at the same time.

Thread[Thread-0,5,main] waiting
Thread[Thread-4,5,main] waiting
Thread[Thread-6,5,main] waiting
Thread[Thread-7,5,main] waiting
Thread[Thread-8,5,main] waiting
Thread[Thread-1,5,main] waiting
Thread[Thread-2,5,main] waiting
Thread[Thread-3,5,main] waiting
Thread[Thread-5,5,main] waiting
Thread[Thread-9,5,main] waiting
Thread[Thread-0,5,main] processing
Thread[Thread-4,5,main] processing
Thread[Thread-6,5,main] processing
Thread[Thread-7,5,main] processing
Thread[Thread-8,5,main] processing
Thread[Thread-1,5,main] processing
Thread[Thread-2,5,main] processing
Thread[Thread-3,5,main] processing
Thread[Thread-5,5,main] processing
Thread[Thread-9,5,main] processing
Thread[Thread-6,5,main] getting
Thread[Thread-7,5,main] getting
Thread[Thread-9,5,main] getting
Thread[Thread-4,5,main] getting
Thread[Thread-6,5,main] sleeping in function #1
Thread[Thread-6,5,main] sleeping in function #2
Thread[Thread-5,5,main] getting
Thread[Thread-5,5,main] 1288727312587515000 remove
Thread[Thread-4,5,main] 1288727312587515000 remove
Thread[Thread-5,5,main] getting
Thread[Thread-4,5,main] getting
Thread[Thread-3,5,main] getting
Thread[Thread-2,5,main] getting
Thread[Thread-6,5,main] 1288727312587515000 remove
Thread[Thread-0,5,main] getting
Thread[Thread-0,5,main] sleeping in function #1
Thread[Thread-8,5,main] getting
Thread[Thread-1,5,main] getting
Thread[Thread-7,5,main] 1288727312587515000 remove
Thread[Thread-5,5,main] sleeping in function #1
Thread[Thread-9,5,main] 1288727312587515000 remove
Thread[Thread-5,5,main] sleeping in function #2
Thread[Thread-7,5,main] getting
Thread[Thread-2,5,main] 1288727312589093000 remove
Thread[Thread-3,5,main] 1288727312589093000 remove
Thread[Thread-4,5,main] 1288727312589093000 remove
Thread[Thread-0,5,main] sleeping in function #2
Thread[Thread-0,5,main] 1288727312588824000 remove
Thread[Thread-1,5,main] 1288727312588824000 remove
Thread[Thread-8,5,main] 1288727312588824000 remove
Thread[Thread-6,5,main] getting
Thread[Thread-6,5,main] sleeping in function #1
Thread[Thread-6,5,main] sleeping in function #2
Thread[Thread-6,5,main] 1288727312589850000 remove
Thread[Thread-8,5,main] getting
Thread[Thread-1,5,main] getting
Thread[Thread-0,5,main] getting
Thread[Thread-3,5,main] getting
Thread[Thread-2,5,main] getting
Thread[Thread-7,5,main] sleeping in function #1
Thread[Thread-5,5,main] 1288727312589093000 remove
Thread[Thread-9,5,main] getting
Thread[Thread-7,5,main] sleeping in function #2
Thread[Thread-8,5,main] sleeping in function #1
Thread[Thread-7,5,main] 1288727312590386000 remove
Thread[Thread-9,5,main] sleeping in function #1
Thread[Thread-8,5,main] sleeping in function #2
Thread[Thread-9,5,main] sleeping in function #2
Thread[Thread-9,5,main] 1288727312590740000 remove
Thread[Thread-8,5,main] 1288727312590627000 remove
Thread[Thread-1,5,main] 1288727312590627000 remove
Thread[Thread-3,5,main] 1288727312590627000 remove
Thread[Thread-0,5,main] 1288727312590627000 remove
Thread[Thread-2,5,main] 1288727312590627000 remove

Original comment by blair-ol...@orcaware.com on 2 Nov 2010 at 7:50

GoogleCodeExporter commented 9 years ago
I think it would be a lot easier to reason about your test if you moved the 
await to be inside the get. What you really are trying to test is that new 
computations aren't started during a pending computation, which you'll find to 
be the case once properly tested.

Original comment by fry@google.com on 5 Nov 2010 at 2:00
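
A minimal sketch of the test shape suggested in the comment above, with the wait moved inside the computation so that it stays pending until every thread has at least reached the point of asking for the key. This is not Guava code: the standard library's `ConcurrentHashMap.computeIfAbsent` stands in for the computing map, and the class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: computeIfAbsent is an assumed stand-in for a computing map's get().
class GateInsideComputation {

    // Starts n threads that all request the same key and returns how many
    // times the computing function was entered.
    static int countComputations(int n) {
        final ConcurrentMap<String, Long> map = new ConcurrentHashMap<>();
        final AtomicInteger computations = new AtomicInteger();
        final CountDownLatch ready = new CountDownLatch(n);   // every thread is about to ask
        final CountDownLatch release = new CountDownLatch(1); // lets the computation finish

        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            threads[i] = new Thread(() -> {
                ready.countDown();
                map.computeIfAbsent("key", k -> {
                    computations.incrementAndGet();
                    awaitLatch(release); // the wait happens inside the computation
                    return System.nanoTime();
                });
            });
            threads[i].start();
        }

        awaitLatch(ready);   // no thread is still waiting to start
        release.countDown(); // now allow the pending computation to complete
        for (Thread t : threads) {
            joinThread(t);
        }
        return computations.get();
    }

    static void awaitLatch(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    static void joinThread(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("computations: " + countComputations(10));
    }
}
```

With no remove() involved, the counter shows whether a second computation ever started for the key while the first was pending, which is the property the comment says should be tested.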

GoogleCodeExporter commented 9 years ago
I'm not a Scala person, but I agree that there is an issue here.

A Java test app that does the same thing looks like:

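  import com.google.common.base.Function;
  import com.google.common.collect.MapMaker;
  import java.util.concurrent.ConcurrentMap;
  import org.junit.Test;  // assuming JUnit 4; the original does not say

  // Wrapper class name is hypothetical; the original showed only the method.
  public class ComputingMapRemoveTest {
    @Test
    public void testAddRemove() throws Exception {
      final ConcurrentMap<Integer, Integer> map;
      map = new MapMaker()
            .makeComputingMap(new Function<Integer, Integer>() {

              @Override
              public Integer apply(Integer from) {
                System.out.println(Thread.currentThread() + ": Loading Key "+from);
                try {
                  // Simulate a slow load.
                  Thread.sleep(1000);
                } catch (InterruptedException e) {
                  e.printStackTrace();
                }
                System.out.println(Thread.currentThread() + ": Completed Loading Key "+from);
                return 1;
              }
            });

      // First get(): starts computing the value for key 1.
      new Thread(){
        public void run() {
          map.get(1);
        }}.start();

      // Remove the key while the first computation is still running.
      Thread.sleep(50);
      map.remove(1);
      // Second get(): issued after the remove, before the first load completes.
      new Thread(){
        public void run() {
          map.get(1);
        }}.start();

      // Wait long enough for both loads to finish and print.
      Thread.sleep(2000);
    }
  }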

Returns:
 Thread[Thread-0,5,main]: Loading Key 1
 Thread[Thread-1,5,main]: Loading Key 1
 Thread[Thread-0,5,main]: Completed Loading Key 1
 Thread[Thread-1,5,main]: Completed Loading Key 1

This shows two threads loading the same key concurrently. This is because 
remove() does not block on a pending computation in the way that get() does 
on these maps. Perhaps all mutating operations on computing maps should block?

Original comment by plh...@googlemail.com on 22 Dec 2010 at 9:46
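
The overlap in the Java test above can be reproduced without Guava under one assumption: that a computing map registers a pending entry on the first get, and that remove() drops that entry without waiting for it. The sketch below is a hypothetical model built on `ConcurrentHashMap` and `FutureTask`, not Guava's implementation, and all names in it are made up for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: an entry (a FutureTask) is visible in the map
// before its value has been computed.
class PendingEntryModel {
    final ConcurrentMap<Integer, FutureTask<Integer>> entries = new ConcurrentHashMap<>();
    final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger maxInFlight = new AtomicInteger();
    final CountDownLatch firstStarted = new CountDownLatch(1);
    final CountDownLatch bothStarted = new CountDownLatch(2);
    final CountDownLatch release = new CountDownLatch(1);

    // The slow load: records how many loads overlap, then blocks until released.
    Integer load(Integer key) throws InterruptedException {
        maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        firstStarted.countDown();
        bothStarted.countDown();
        release.await();
        inFlight.decrementAndGet();
        return 1;
    }

    // The first caller for a key computes; later callers wait on the pending entry.
    Integer get(Integer key) {
        FutureTask<Integer> task = new FutureTask<>(() -> load(key));
        FutureTask<Integer> existing = entries.putIfAbsent(key, task);
        if (existing == null) {
            existing = task;
            task.run();
        }
        try {
            return existing.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Runs two get() calls for key 1, optionally removing the key in between
    // while the first load is pending. Returns the peak number of overlapping loads.
    static int maxConcurrentLoads(boolean removeWhilePending) {
        PendingEntryModel m = new PendingEntryModel();
        Thread first = new Thread(() -> m.get(1));
        first.start();
        awaitLatch(m.firstStarted);
        if (removeWhilePending) {
            m.entries.remove(1); // does not wait for the pending load
        }
        Thread second = new Thread(() -> m.get(1));
        second.start();
        if (removeWhilePending) {
            awaitLatch(m.bothStarted); // the second load has started too
        }
        m.release.countDown();
        joinThread(first);
        joinThread(second);
        return m.maxInFlight.get();
    }

    static void awaitLatch(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    static void joinThread(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(maxConcurrentLoads(true));  // prints 2
        System.out.println(maxConcurrentLoads(false)); // prints 1
    }
}
```

In this model the second load starts only because the pending entry was removed; without the remove, the second caller finds the existing entry and waits for its value.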

GoogleCodeExporter commented 9 years ago
This behavior is actually correct.

What this comes down to is a philosophical argument about when an entry being 
computed actually enters the map. As currently defined, a new entry is created 
at the time of the very first get, and that entry is observable by both size 
and remove. However, the value that the entry maps to is not fully specified 
until computation completes.

If you call remove before computation completes then the existing entry is 
removed, and while all previously concurrent requests for that entry will 
continue to wait for the value, any future request will (correctly) result in 
the creation of a _new_ entry.

I think you will find that both of your test cases would be easier to reason 
about if you used the two-parameter remove(key, value), which avoids a thread 
removing an entry it never observed.

Original comment by yrfselrahc@gmail.com on 14 Jan 2011 at 5:19
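
A small illustration of the two-parameter remove mentioned above, using the standard `ConcurrentMap.remove(Object key, Object value)` on a plain `ConcurrentHashMap` rather than a Guava computing map. The class name and the values are hypothetical; the point is only that the removal is a no-op unless the key is still mapped to the value the caller actually saw.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch only: a plain ConcurrentHashMap stands in for the computing map.
class ConditionalRemove {

    // Returns {staleRemoveSucceeded, entryStillPresent, currentRemoveSucceeded}.
    static boolean[] demo() {
        ConcurrentMap<String, Long> map = new ConcurrentHashMap<>();

        map.put("key", 100L);
        Long observed = map.get("key"); // this caller has seen the value 100

        // Stand-in for the entry being replaced by a newer one in the meantime.
        map.put("key", 200L);

        // Conditional remove: only succeeds if "key" still maps to the observed value.
        boolean staleRemove = map.remove("key", observed);
        boolean stillPresent = map.containsKey("key");

        // A caller that observed the current value can remove it.
        boolean currentRemove = map.remove("key", 200L);

        return new boolean[] {staleRemove, stillPresent, currentRemove};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("stale remove succeeded:   " + r[0]); // false
        System.out.println("entry still present:      " + r[1]); // true
        System.out.println("current remove succeeded: " + r[2]); // true
    }
}
```

A caller that never obtained a value has nothing to pass as the second argument, so it cannot remove an entry it never observed.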

GoogleCodeExporter commented 9 years ago
This issue has been migrated to GitHub.

It can be found at https://github.com/google/guava/issues/<id>

Original comment by cgdecker@google.com on 1 Nov 2014 at 4:15

GoogleCodeExporter commented 9 years ago

Original comment by cgdecker@google.com on 3 Nov 2014 at 9:09