ReactiveCouchbase / ReactiveCouchbase-core

Core library for ReactiveCouchbase
Apache License 2.0

Couchbase as an Idempotent Repository #58

Open behrad opened 9 years ago

behrad commented 9 years ago

I want to implement an Idempotent Repository on Couchbase with three synchronous operations (add, contains, remove), which will be called concurrently at a very high rate (5000 calls/sec each).

My very simple implementation is below:

//Remove Op
val res = Await.result( bucket.delete( key ), 1 second )

// Contains check
val res = Await.result( bucket.get[String]( key ), 1 second )
res.isDefined  // true when the key exists

// Add operation
val res = Await.result( bucket.add( key, System.currentTimeMillis, CouchbaseExpirationTiming_byDuration( expiry ) ), 1 second )
res.isSuccess

However, this is not scalable and my operations time out under load. How can I achieve a better implementation? Can the stream methods help me with the synchronous operations needed above?

mathieuancelin commented 9 years ago

Just one question: why do you use Await?

mathieuancelin commented 9 years ago

Also, I'm not sure I understand what you are trying to achieve. Some kind of replace operation?

behrad commented 9 years ago

I am implementing Camel's IdempotentRepository SPI here: https://github.com/apache/camel/blob/master/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java

mathieuancelin commented 9 years ago

Hum, okay I see.

The problem here is the 1 second timeout, you can't rely on that under pressure.
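
A minimal sketch of what the fixed limit means in practice, using only the Scala standard library: `Await.result` throws a `TimeoutException` once the limit passes, even if the underlying operation would still complete a moment later. `slowOperation` and `awaitOrNone` are hypothetical names for illustration and are not part of ReactiveCouchbase.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitTimeoutSketch {
  // Hypothetical stand-in for a bucket call that is slow under load.
  def slowOperation(delayMillis: Long): Future[Boolean] = Future {
    Thread.sleep(delayMillis)
    true
  }

  // Block for at most `atMost`; report a timeout as None instead of throwing.
  def awaitOrNone[A](f: Future[A], atMost: FiniteDuration): Option[A] =
    try Some(Await.result(f, atMost))
    catch { case _: TimeoutException => None }
}
```

A `None` here only says the caller stopped waiting; it says nothing about whether the operation eventually succeeded on the server.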

behrad commented 9 years ago

Then turning Await into a non-blocking async operation won't help... If I can't rely on many concurrent get/set operations under pressure, what other Couchbase operation could help me here? What about setStream?

mathieuancelin commented 9 years ago

As you seem to need blocking operations, you can either wait longer or use the underlying driver and access its blocking operations.

chenbekor commented 9 years ago

Hi @behrad, sorry for jumping late into the conversation ;) I am not familiar with Camel. Can you please describe the exact use case in more detail? Are the add, contains, and remove operations running concurrently on the same document/key? 5000 calls/sec: are these concurrent updates on the same entity?

From past experience, 5000/sec should not be an issue for Couchbase. What Scala framework are you using to handle the 5K/sec load?

behrad commented 9 years ago

@chenbekor that Camel interface is an IdempotentRepository, which ensures that no duplicate message is processed in Camel. Each arriving message gets a UUID, which is added to a Couchbase bucket via add(uuid) when the message is processed by Camel. We are using Akka atLeastOnceDelivery, so messages are retried if no ack is received at the end of message processing. However, we need exactly-once delivery semantics, so we need idempotency in message processing so that duplicate messages are skipped (the contains method). Successfully processed messages are removed (the remove method). Here is how Camel does this: https://github.com/apache/camel/blob/master/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java

So I can say that these operations are called on new UUIDs 90% of the time; we only want to catch duplicate message processing in some exceptional cases.

You can see a simple in-memory LRU implementation of that idempotent cache here: https://github.com/apache/camel/blob/master/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java However, this is neither distributed nor scalable :)

Couchbase would be a nice candidate for this scenario I thought...

chenbekor commented 9 years ago

I'm trying to help so please excuse me for asking you to provide more details :)

Let's try again, but this time please leave out the Camel-related logic and the Idempotent Repository term.

Try to describe in very simple words what you are looking for from Couchbase. Example:

I want to set a document in one thread and ensure that when I try to access the same doc from a different thread I will get "key already exists"

or maybe:

I want to set from one thread and read concurrently from another thread etc

This will help me a lot to understand what you are trying to achieve.

behrad commented 9 years ago

I treat Couchbase as a distributed in-memory hash table and put message ids there as keys. For each message I want to ensure that its key doesn't exist before I process it. If it exists, I cancel that message :)
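
A minimal sketch of that check, assuming a single atomic "add if absent" operation is available: whoever adds the key first processes the message, and every later caller skips it. Doing `contains` and then `add` as two separate steps leaves a window in which two threads both see the key as missing. The in-memory `TrieMap` below only stands in for the bucket, and `DedupSketch`, `claim`, and `processOnce` are hypothetical names.

```scala
import scala.collection.concurrent.TrieMap

object DedupSketch {
  // In-memory stand-in for the bucket; an assumption for illustration only.
  private val seen = TrieMap.empty[String, Long]

  // Atomically claim a message id. Returns true only for the first caller,
  // mirroring an "add" that fails when the key already exists.
  def claim(messageId: String): Boolean =
    seen.putIfAbsent(messageId, System.currentTimeMillis()).isEmpty

  // Run `handler` only when this call won the claim; duplicates are skipped.
  def processOnce(messageId: String)(handler: => Unit): Boolean = {
    val first = claim(messageId)
    if (first) handler
    first
  }
}
```

With this shape the separate existence check is not needed on the hot path; the result of the add is the answer.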

Thank you for your attention and time @chenbekor

behrad commented 9 years ago

I think I can move these checks to some other place where it is possible to bulkGet ids to filter out the existing ones and bulkSet to store the new ones. That would be more performant. However, I don't know how to translate this into reactivecouchbase or Scala:

public List<JsonDocument> bulkGet(final Collection<String> ids) {
    return Observable
        .from(ids)
        .flatMap(new Func1<String, Observable<JsonDocument>>() {
            @Override
            public Observable<JsonDocument> call(String id) {
                return bucket.async().get(id);
            }
        })
        .toList()
        .toBlocking()
        .single();
}
behrad commented 9 years ago

Maybe off topic: can you also provide a code snippet showing how to use the stream API in reactivecouchbase? Could it help me here?

mathieuancelin commented 9 years ago

As you seem to need blocking operations (because of Camel), streams will not help you there.

So I guess your first instinct was right, only the timeout was wrong:

object Repo extends Service[String] {

  import org.reactivecouchbase.CouchbaseRWImplicits.jsValueToDocumentWriter
  import org.reactivecouchbase.CouchbaseRWImplicits.documentAsJsValuetReader

  // wait without a limit; Duration(Long.MaxValue, MILLISECONDS) is outside the FiniteDuration range and throws
  private[this] val maxDuration = Duration.Inf
  private[this] val driver = ReactiveCouchbaseDriver()
  private[this] val bucket = driver.bucket("default")

  override def add(key: String): Boolean = {
    Await.result(bucket.add[JsNumber](key, JsNumber(System.currentTimeMillis())).map(_.isSuccess), maxDuration)
  }
  override def contains(key: String): Boolean = {
    Await.result(bucket.get[JsNumber](key).map(_.isDefined), maxDuration)
  }
  override def remove(key: String): Boolean = {
    Await.result(bucket.delete(key).map(_.isSuccess), maxDuration)
  }
  override def confirm(key: String): Boolean = ???  // I don't understand the meaning of that method
}

And the Scala way to write your previous piece of code is something like:

def bulkGet(ids: List[String]): List[JsObject] = {
  import org.reactivecouchbase.CouchbaseRWImplicits.documentAsJsObjectReader
  Await.result(   // block until future is completed
    Future.sequence(  // transform a List[Future[M]] to Future[List[M]]
      ids.map(id => bucket.get[JsObject](id)) // for each id fetch the document in couchbase
    ).map { optDocsList =>  // map over the completed list, not over each individual future
      optDocsList.collect {   // as a document may not be present for an id, filter all None options
        case Some(doc) => doc
      }
    },
    Duration.Inf    // wait forever
  )
}
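
A standard-library sketch of the same bulk lookup that runs without a cluster and leaves the blocking decision to the caller. `fetch` is a hypothetical stand-in for a per-key lookup such as `bucket.get`, and `BulkGetSketch` is an illustrative name, not library API.

```scala
import scala.concurrent.{ExecutionContext, Future}

object BulkGetSketch {
  // Fetch every id concurrently and keep only the documents that exist.
  // `fetch` is a hypothetical per-key lookup returning Future[Option[A]].
  def bulkGet[A](ids: List[String])(fetch: String => Future[Option[A]])
                (implicit ec: ExecutionContext): Future[List[A]] =
    Future.traverse(ids)(fetch).map(_.flatten)  // drop the None results
}
```

Returning a `Future[List[A]]` means a synchronous caller can still wrap the call in `Await.result`, while an asynchronous caller can keep composing.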
chenbekor commented 9 years ago

I think that Mathieu's code is always really educational. Again, Mathieu, thank you for all that you are doing here.

@behrad here is my opinion on a scalable solution that answers your requirement.

What I would do here is wrap the couch related functionality in a separate micro service. I would expose the service via a REST API (or more performant protocols only if a bottleneck is identified)

Next, I would put a load balancer in front of this micro service. If you are running on AWS I would recommend using ELB and Auto Scaling Group. We see great results with this approach.

Now - your micro service can scale in / out to handle thousands of concurrent requests.

To be more pragmatic I would recommend play framework which has great integration with reactive couchbase thanks to Mathieu. This will allow you to utilize the async API and the non-blocking nature of Play while retaining full synchronous operation from a REST API standpoint.

So internally - the service is async/non blocking but externally you provide a synchronous API which is exactly what you need for your use case. You can now call the REST API from your other parts (I guess the Camel plugin) and block on the micro service callback.

Needless to say, Play Framework can be replaced by Spray or Finagle or what have you.

Couchbase is rarely a bottleneck from our experience.

behrad commented 9 years ago

Thank you both. We are using Spray in our REST server micro service, then Akka clustering to connect it to our other micro services, one of which tries to ensure exactly-once message processing/delivery. I think I should move this logic from the Camel part (which is synchronous) to another service where we can use async bulk get/set (for which I requested that snippet). This way I can use what @mathieuancelin wrote for me, or even stream get/set, huh? I just couldn't find any code sample showing usage of those (I'm not that familiar with Play's Iteratee API).

mathieuancelin commented 9 years ago

Can you explain to me exactly what you need to do with the stream API? I'll try to write something :-)

behrad commented 9 years ago

Can I use that to do bulkGet and bulkSet? Something like the code snippet you wrote before :)

There is no documentation on those methods :)

mathieuancelin commented 9 years ago

A little example

implicit val personReader: Reads[Person] = ???
implicit val personWriter: Writes[Person] = ???

bucket.fetchValues[JsObject](ids).enumerate.apply(Iteratee.foreach { doc =>
  println(Json.prettyPrint(doc))  // for each doc in the stream, print it
})

val enumerator = bucket.fetchValues[JsObject](ids).enumerate  // stream the docs for the given keys from couchbase
enumerator.through(Enumeratee.map { doc =>
  personReader.reads(doc)  // for each doc in the stream, parse it
}).through(Enumeratee.collect[JsResult[Person]] {
  case JsSuccess(person, _) => person // for each parse result, filter only success
}).apply(Iteratee.foreach { person =>
  println(person) // for each resulting person, print it
})

// same example with method aliases
val docEnumerator = bucket.fetchValues[JsObject](ids).enumerate
val personEnumerator = docEnumerator &> Enumeratee.map { doc =>
  personReader.reads(doc)
} &> Enumeratee.collect[JsResult[Person]] {
  case JsSuccess(person, _) => person
}
personEnumerator(Iteratee.foreach { person =>
  println(person)
})

// bulk set
val persons = Seq(Person(UUID.randomUUID().toString, "John", "Doe", 42), Person(UUID.randomUUID().toString, "Jane", "Doe", 42))
val personsEnumerator = Enumerator.enumerate(persons).map(p => (p._id, p))  // create a stream of data
bucket.setStream(personsEnumerator).map {      // and stream it to couchbase
  case list => list.foreach(op => println(op.isSuccess))
}
behrad commented 9 years ago

@mathieuancelin thank you for these nice educational examples

behrad commented 9 years ago

@mathieuancelin @chenbekor Can you please tell me how exactly this Couchbase sample can be translated to the ReactiveCouchbase API?

Observable
    .defer(() -> bucket.get("id"))
    .map(document -> {
        document.content().put("modified", new Date().getTime());
        return document;
    })
    .flatMap(bucket::replace)
    .retryWhen(attempts ->
        attempts.flatMap(n -> {
            if (!(n.getThrowable() instanceof CASMismatchException)) {
                return Observable.error(n.getThrowable());
            }
            return Observable.timer(1, TimeUnit.SECONDS);
        })
    )
    .subscribe();

I want to do a get, replace, and retry until it succeeds. Using updateAtomic under pressure complains about LOCK_ERROR and my app crashes.

chenbekor commented 9 years ago

Well, you can review a similar issue that I opened here: https://github.com/ReactiveCouchbase/ReactiveCouchbase-play/issues/40

If the doc exists and you only need to take care of updates, you will find a good example in the above link. If you also need to take care of first-time insertion, I don't think there is currently a way to handle this in reactivecouchbase, as you can read in the same link. I hope @mathieuancelin will get a chance to look at it sometime. Since I needed to handle both updates and first-time creation under high contention, I used the native API via the CASMutator:

  private def mergeEvent(event_id: String, event: JsObject) = {

    //since we are expecting multiple updates on the same document
    //we will use CASMutator in order to handle correctly retries on contention
    val repair = new CASMutation[JsObject]() {
      //CAS Mutator will call getNewValue with existing doc
      //this method should return the updated doc
      def getNewValue(existing: JsObject): JsObject = {
        existing.deepMerge(event)
      }
    }

    // async
    val mutator = new CASMutator[JsObject](bucket.couchbaseClient, jsonTranscoder)
    mutator.cas(event_id, event, 60 * 60 * 24 * 1, repair)

  }

where jsonTranscoder is some variable : val jsonTranscoder = new JsonTranscoder

and JsonTranscoder code is:

class JsonTranscoder extends Transcoder[JsObject] with SpyTranscoder {
  def encode(item: JsObject): CachedData =
    cachedData(item.toString)
  def decode(data: CachedData): JsObject =
    Json.parse(cachedString(data)).as[JsObject]
}
/**
 * Basic Spy Memcached transcoder with string transcoding
 */
trait SpyTranscoder {
  def asyncDecode(data: CachedData) = false
  def getMaxSize() = CachedData.MAX_SIZE
  val charset = "UTF-8"

  def cachedData(content: String): CachedData =
    new CachedData(0, content.getBytes(charset), getMaxSize())
  def cachedString(data: CachedData): String =
    new String(data.getData(), charset)
}

@behrad if you do get to use the above example code - I would appreciate a code review and suggestions on how to improve ;)

Final question: what API are you using in the example that you gave? Is that the RxJava-based API of the new Java SDK? Are you aware that reactivecouchbase is using the old generation of the Java SDK?

behrad commented 9 years ago

Exciting answer @chenbekor. So you are talking about using the memcached mutator API for this, not Couchbase's!?

what API are you using in the above example that you gave?

Actually, I picked it up from the new Java SDK 2.1 documentation here: http://docs.couchbase.com/developer/java-2.1/documents-updating.html

are you aware that reactivecouchbase is using the old generation of the java sdk?

No! And that's bad news. I hope reactivecouchbase can be refactored onto the new Java SDK, which sounds more performant. I can also help if @mathieuancelin doesn't have enough time for that...

I'll try to drop your code in within a few days. Should I copy/paste JsonTranscoder as is? Aren't there any defaults available?

mathieuancelin commented 9 years ago

Hi,

I think something like this should work:

val driver = ReactiveCouchbaseDriver()
val bucket = driver.bucket("default")
val scheduler = driver.scheduler()

bucket.get[JsObject]("id").collect {
  case Some(document) => document   // reactivecouchbase returns a missing document as None
}.map { document =>
  document ++ Json.obj("modified" -> System.currentTimeMillis())
}.flatMap { document =>
  val result = Promise[OpResult]()
  def replace(delay: FiniteDuration): Unit = scheduler.scheduleOnce(delay) {
    bucket.replace("id", document).onComplete {
      case Success(opResult) => {
        if (opResult.isSuccess)
          result.trySuccess(opResult)
        else
          replace(1000 milli)
      }
      case Failure(error) => error match {
        case _: WhateverExceptionHere => replace(1000 milli)
        case e => result.tryFailure(e)
      }
    }
  }
  replace(0 milli)
  result.future
}
mathieuancelin commented 9 years ago

Also, I'm not sure replace throws exceptions when failing, so you can consider something simpler like:

val driver = ReactiveCouchbaseDriver()
val bucket = driver.bucket("default")
val scheduler = driver.scheduler()

bucket.get[JsObject]("id").collect {
  case Some(document) => document   // reactivecouchbase returns a missing document as None
}.map { document =>
  document ++ Json.obj("modified" -> System.currentTimeMillis())
}.flatMap { document =>
  val result = Promise[OpResult]()
  def replace(delay: FiniteDuration): Unit = scheduler.scheduleOnce(delay) {
    bucket.replace("id", document).onComplete {
      case Success(opResult) if opResult.isSuccess => result.trySuccess(opResult)
      case Success(opResult) if opResult.isFailure => replace(1000 milli)
      case Failure(error) => result.tryFailure(error)
    }
  }
  replace(0 milli)
  result.future
}
behrad commented 9 years ago

Thank you @mathieuancelin, I will try this before the CASMutator and let you know the results.

mathieuancelin commented 9 years ago

Ok, I'm curious to know if it's enough for your use case

chenbekor commented 9 years ago

@mathieuancelin can you please explain what your code does? What is this scheduler thing? How many retries does it do? It seems to me that the CASMutator is more robust in terms of high-contention retries.

Again, thank you for helping us!

mathieuancelin commented 9 years ago

See the comments inline

val driver = ReactiveCouchbaseDriver()
val bucket = driver.bucket("default")
val scheduler = driver.scheduler() // Akka scheduler to handle a retry every second
val maxRetry = 10 // max number of retries

bucket.get[JsObject]("id").collect { // fetch document for key "id" and map+filter if doc was found
  case Some(document) => document   // reactivecouchbase returns a missing document as None
}.map { document =>  // map the resulting document
  document ++ Json.obj("modified" -> System.currentTimeMillis())  // change value of field modified
}.flatMap { document =>  // flatMap to handle the replaceWithRetry as a future
  val result = Promise[OpResult]() // the result of the replaceWithRetry
  def replace(delay: FiniteDuration, count: Int): Unit = scheduler.scheduleOnce(delay) {  // here the replace function scheduled with the defined delay
    bucket.replace("id", document).onComplete {     // perform a replace in the couchbase bucket
      case Success(opResult) if opResult.isSuccess => result.trySuccess(opResult)  // if no exception and successful, then complete the promise
      case Success(opResult) if opResult.isFailure && count < maxRetry => replace(1000 milli, count + 1) // if no exception and not successful, then retry in 1 second
      case Success(opResult) if opResult.isFailure && count >= maxRetry => result.tryFailure(new RuntimeException("Too many retry ..."))  // if no exception and not successful and no more retries, then fail the promise
      case Failure(error) => result.tryFailure(error) // if exception, then fail the promise
    }
  }
  replace(0 milli, 0) // try to replace now
  result.future  // return the result as a future
}
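
A minimal sketch of a related variant, assuming a compare-and-swap style write is available: the snippet above reads the document once and re-sends the same value on every retry, whereas an optimistic update usually re-reads before each attempt so that a retry builds on the latest stored value. `Cell` is a hypothetical in-memory stand-in for one stored document, and `replaceIfUnchanged` is an illustrative name, not a ReactiveCouchbase call. The delay between attempts is left out to keep the sketch short.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContext, Future}

object CasRetrySketch {
  // In-memory stand-in for one stored document; an assumption for illustration.
  final class Cell[A <: AnyRef](initial: A) {
    private val ref = new AtomicReference[A](initial)
    def get: Future[A] = Future.successful(ref.get())
    // Succeeds only if the stored value is still the instance that was read.
    def replaceIfUnchanged(expected: A, updated: A): Future[Boolean] =
      Future.successful(ref.compareAndSet(expected, updated))
  }

  // Read, transform, and write back; on a lost race, read again and retry.
  def update[A <: AnyRef](cell: Cell[A], maxRetry: Int)(f: A => A)
                         (implicit ec: ExecutionContext): Future[A] =
    cell.get.flatMap { current =>
      val updated = f(current)
      cell.replaceIfUnchanged(current, updated).flatMap {
        case true                  => Future.successful(updated)
        case false if maxRetry > 0 => update(cell, maxRetry - 1)(f)
        case false                 => Future.failed(new RuntimeException("Too many retries"))
      }
    }
}
```

Because every attempt starts from a fresh read, concurrent updaters cannot silently overwrite each other's changes; a writer that loses the race simply reapplies its change to the newer value.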