OpenHFT / Chronicle-Map

Replicate your Key Value Store across your network, with consistency, persistance and performance.
http://chronicle.software/products/chronicle-map/
Apache License 2.0
2.78k stars 470 forks source link

Boxed/atomic values in Scala #76

Closed keynmol closed 8 years ago

keynmol commented 8 years ago

Hello.

We're using ChronicleMap to support off-heap persistence in a large number of different stores, but hit a bit a of a problem with the most simple usecase.

First of all, here's the helper I wrote to make creation easier:

import java.io.File
import java.util.concurrent.atomic.AtomicLong

import com.madhukaraphatak.sizeof.SizeEstimator
import net.openhft.chronicle.map.{ChronicleMap, ChronicleMapBuilder}

import scala.reflect.ClassTag

object ChronicleHelper {

  def estimateSizes[Key, Value](data: Iterator[(Key, Value)], keyEstimator: AnyRef => Long = defaultEstimator, valueEstimator: AnyRef => Long = defaultEstimator): (Long, Long, Long) = {
    println("Estimating sizes...")

    val entries = new AtomicLong(1)
    val keySum = new AtomicLong(1)
    val valueSum = new AtomicLong(1)
    var i = 0

    val GroupSize = 5000

    data.grouped(GroupSize).foreach { chunk =>

      chunk.par.foreach { case (key, value) =>
        entries.incrementAndGet()
        keySum.addAndGet(keyEstimator(key.asInstanceOf[AnyRef]))
        valueSum.addAndGet(valueEstimator(value.asInstanceOf[AnyRef]))
      }

      i += 1

      println("Progress:" + i * GroupSize)
    }

    (entries.get(), keySum.get() / entries.get(), valueSum.get() / entries.get())
  }

  def defaultEstimator(v: AnyRef): Long = SizeEstimator.estimate(v)

  def createMap[Key: ClassTag, Value: ClassTag](data: => Iterator[(Key, Value)], file: File): ChronicleMap[Key, Value] = {
    val keyClass = implicitly[ClassTag[Key]].runtimeClass.asInstanceOf[Class[Key]]
    val valueClass = implicitly[ClassTag[Value]].runtimeClass.asInstanceOf[Class[Value]]

    val (entries, averageKeySize, averageValueSize) = estimateSizes(data)

    val builder = ChronicleMapBuilder.of(keyClass, valueClass)
      .entries(entries)
      .averageKeySize(averageKeySize)
      .averageValueSize(averageValueSize)
      .asInstanceOf[ChronicleMapBuilder[Key, Value]]

    val cmap = builder.createPersistedTo(file)

    val GroupSize = 5000

    println("Inserting data...")
    var i = 0
    data.grouped(GroupSize).foreach { chunk =>

      chunk.par.foreach { case (key, value) =>
        cmap.put(key, value)
      }

      i += 1

      println("Progress:" + i * GroupSize)
    }

    cmap
  }

  def empty[Key: ClassTag, Value: ClassTag]: ChronicleMap[Key, Value] = {
    val keyClass = implicitly[ClassTag[Key]].runtimeClass.asInstanceOf[Class[Key]]
    val valueClass = implicitly[ClassTag[Value]].runtimeClass.asInstanceOf[Class[Value]]

    ChronicleMapBuilder.of(keyClass, valueClass).create()
  }

  def loadMap[Key: ClassTag, Value: ClassTag](file: File): ChronicleMap[Key, Value] = {
    val keyClass = implicitly[ClassTag[Key]].runtimeClass.asInstanceOf[Class[Key]]
    val valueClass = implicitly[ClassTag[Value]].runtimeClass.asInstanceOf[Class[Value]]

    ChronicleMapBuilder.of(keyClass, valueClass).createPersistedTo(file)
  }
}

It uses https://github.com/phatak-dev/java-sizeof for object size estimation. Here's the kind of usage we want to support:

object TestChronicle {
  def main(args: Array[String]) {
    def dataIterator: Iterator[(String, Int)] = (1 to 5000).toIterator.zipWithIndex.map(x => x.copy(_1 = x._1.toString))

    ChronicleHelper.createMap[String, Int](dataIterator, new File("/tmp/test.map"))

  }
}

But it throws an exception:

[error] Exception in thread "main" java.lang.ClassCastException: Key must be a int but was a class java.lang.Integer
[error]     at net.openhft.chronicle.hash.impl.VanillaChronicleHash.checkKey(VanillaChronicleHash.java:661)
[error]     at net.openhft.chronicle.map.VanillaChronicleMap.queryContext(VanillaChronicleMap.java:281)
[error]     at net.openhft.chronicle.map.VanillaChronicleMap.put(VanillaChronicleMap.java:390)
...

I can see that it might have something to do with atomicity of Scala's Int as opposed to Java's Integer, but how do I bypass that?

keynmol commented 8 years ago

Forgot to mention, that I'm experimenting with 3.8.0:

libraryDependencies += "net.openhft" % "chronicle-map" % "3.8.0"
leventov commented 8 years ago

Another recommendation: