paypal / squbs

Akka Streams & Akka HTTP for Large-Scale Production Deployments
http://paypal.github.io/squbs
Apache License 2.0
1.43k stars 237 forks source link

Scala 2.13 Not able to work with custom classes #744

Closed TRReeve closed 3 years ago

TRReeve commented 3 years ago

Hi there. Am just trying to Run squbs with a custom case class. I'm not able to get it to behave in the expected manner of I run the stream, push my events to the buffer. And then read back out of the buffer.

I've tried a few different versions of the following code some using the object and some not using it some using the commit stages and some not. From what I've seen it seems like something is getting written but I can't seem to read out of the other side even adding in the async partition. I've been able to reproduce it on both a windows computer and a Mac computer so I'm not sure it's anything there.

Scala version 2.13.3 Akka Stream version 2.6.13


package open.hft.exp

import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import net.openhft.chronicle.wire.{WireIn, WireOut}
import org.squbs.pattern.stream.{PersistentBufferAtLeastOnce, QueueSerializer}

import java.io.File
import java.nio.file.Paths
import scala.util.{Failure, Success}

case class MyCustomClass(name: String, id: Int)

class CustomSerializer extends QueueSerializer[MyCustomClass] {

  override def writeElement(element: MyCustomClass, wire: WireOut): Unit = {
    wire.writeText(element.name)
    wire.write().int32(element.id)
  }

  override def readElement(wire: WireIn): Option[MyCustomClass] =
    for {
      name <- Option(wire.readText())
      id <- Option(wire.read().int32())
    } yield MyCustomClass(name, id)

}
object App {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("testsystem")
    implicit val mat = Materializer.matFromSystem(system)
    implicit val ec = system.dispatcher
    val tmpDir = Paths.get(System.getProperty("java.io.tmpdir"))
    val name = Paths.get(tmpDir.toString, "cqexperiment")
    println(tmpDir)
    println(name)

    val data = Range(0, 100).map(MyCustomClass("hello", _))

    implicit val serializer: QueueSerializer[MyCustomClass] = new CustomSerializer

    val source = Source(data)
    val buffer = new PersistentBufferAtLeastOnce[MyCustomClass](new File(name.toUri))
    val commit = buffer.commit[MyCustomClass]

    val runstream = source
      .via(buffer.async)
      .via(commit)
      .map(record => println(record.entry))
      .runWith(Sink.last)

    runstream.onComplete {
      case Failure(exception) => println(exception)
    case Success(value) => println(value)
    }

  }
}
TRReeve commented 3 years ago

Problem was solved in the end. Combination of wrong folder in this test code and a serialisation failure to do with using Map[String, String] in the actual code.