ReactiveCouchbase / ReactiveCouchbase-core

Core library for ReactiveCouchbase
Apache License 2.0

Atomic.atomicallyUpdate throws ClassCastException when the key cannot be found #17

Closed athasach closed 10 years ago

athasach commented 10 years ago

When "myKey" cannot be found, the future returned from atomicallyUpdate fails with

ClassCastException: scala.concurrent.impl.Promise$KeptPromise cannot be cast to MyData

It would be useful if the failed future indicated the not-found status instead. I'm thinking of cases where I'm providing a RESTful service and want to return a 404 Not Found status. Since a ClassCastException could be thrown for other reasons, it's difficult to tell whether it's truly a casting issue or a not-found issue.

bucket.atomicallyUpdate[MyData]("myKey") { currentValue =>
  foo(currentValue)
}

Thanks.

mathieuancelin commented 10 years ago

Hi,

I think it's a bug. atomicallyUpdate should only return a failed Future, without the ClassCastException.

Can you provide the full stack trace or some code to reproduce it so I can investigate?

athasach commented 10 years ago

Thanks for the quick reply. Sure. Here is the stack trace from the example I'm about to post.

23:45:00.785 [ReactiveCouchbaseSystem-akka.actor.default-dispatcher-3] ERROR ReactiveCouchbase - NOT_FOUND for key non-existent-key
java.lang.ClassCastException: scala.concurrent.impl.Promise$KeptPromise cannot be cast to Person
    at Application$$anonfun$3.apply(Application.scala:45)
    at Application$$anonfun$3.apply(Application.scala:44)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

athasach commented 10 years ago

import org.reactivecouchbase.ReactiveCouchbaseDriver
import play.api.libs.functional.syntax._
import play.api.libs.json._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}

case class Person(name: String, age: Int)

object Application extends App {

  implicit val personFmt = Json.format[Person]

  val NonExistentKey = "non-existent-key"

  val driver = ReactiveCouchbaseDriver()
  val bucket = driver.bucket("default")

  val fPerson: Future[Person] =
    bucket.atomicallyUpdate[Person](NonExistentKey) {
      person =>
        val agedPerson = Person(person.name, person.age + 1)
        bucket.replace(NonExistentKey, agedPerson).flatMap { os =>
          if (os.isSuccess()) Future.successful(agedPerson) else Future.failed(new RuntimeException())
        }
    }

  fPerson.onComplete {
    case Success(person) => println(s"Updated ${person.name}")
    case Failure(throwable) => println(s"Update failed: ${throwable.getMessage()}")
  }

//  driver.shutdown()
}

mathieuancelin commented 10 years ago

Hi

It should be fixed in the latest snapshot. Can you try it?

athasach commented 10 years ago

Thanks for the quick turnaround on this!

It no longer throws ClassCastException. However, it's still difficult to figure out why the call failed, since it returns a failed future with java.util.concurrent.ExecutionException whose cause is org.reactivecouchbase.client.AtomicNoKeyFoundError, which is a private class. Because it's private, I can't do an isInstanceOf[AtomicNoKeyFoundError] to return my 404 Not Found error. When I look at the other possible Throwable classes in org.reactivecouchbase.client I see there are other reasons why the call could fail, so always assuming that the item wasn't found wouldn't be correct.

In any case, this gets me further ahead. Thanks for maintaining this project.

mathieuancelin commented 10 years ago

Hi,

You can update to the latest snapshot; the atomic errors are now public. Sorry about that.
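
For readers who want the 404 mapping discussed above, here is a minimal sketch of one way to do it. It is not taken from the library: NoKeyFoundStandIn is a hypothetical stand-in for org.reactivecouchbase.client.AtomicNoKeyFoundError (whose constructor and fields are not shown in this thread), and NotFoundMapping, rootCause and toStatus are illustrative names. It assumes, as reported above, that the error may arrive wrapped in a java.util.concurrent.ExecutionException.

```scala
import java.util.concurrent.ExecutionException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for org.reactivecouchbase.client.AtomicNoKeyFoundError.
// Swap in the real class once it is public in your snapshot.
class NoKeyFoundStandIn(key: String)
  extends RuntimeException(s"no document for key $key")

object NotFoundMapping {
  // Unwrap one ExecutionException layer, if there is one.
  def rootCause(t: Throwable): Throwable = t match {
    case e: ExecutionException if e.getCause != null => e.getCause
    case other => other
  }

  // Map the outcome of an update to an HTTP-style status code:
  // 200 on success, 404 when the key was missing, 500 for anything else.
  def toStatus[T](result: Future[T]): Future[Int] =
    result.map(_ => 200).recover {
      case t if rootCause(t).isInstanceOf[NoKeyFoundStandIn] => 404
      case _ => 500
    }
}
```

With the real library, the future passed to toStatus would be the one returned by bucket.atomicallyUpdate, and the isInstanceOf check would target the library's own error class. Checking both the throwable and its cause keeps the mapping working whether or not the wrapper is present.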

Looking at your previous snippet, you don't need to perform the replace operation inside the atomic block. The atomicallyUpdate function does that for you, using the value returned from the atomic block:

val fPerson: Future[Person] =
    bucket.atomicallyUpdate[Person](NonExistentKey) { person =>
        Person(person.name, person.age + 1)
    }

athasach commented 10 years ago

Thanks. It's working now. I have a separate question about doing an atomic update such that the replaced document has an updated CouchbaseExpirationTiming, which I will open up a separate issue for.

mathieuancelin commented 10 years ago

Ok, cool