knutwalker / akka-stream-json

Apache License 2.0

Can you provide an example using the library with Akka-Stream? #1

Closed: wpoosanguansit closed this issue 8 years ago

wpoosanguansit commented 8 years ago

Hi,

I came across the library and would like to use it to parse a JSON stream that I will be reading from a file, but I am not really sure how to use the library yet.

When I tried to parse the stream using Circe, I got:

could not find implicit value for evidence parameter of type io.circe.Decoder[jawn.ast.JValue]

I looked at the Circe source and found the support file for spray-json, so I included the imports below to resolve the implicit issue:

import jawn.ast.JValue
import jawn.support.spray.Parser._
import spray.json.{ DefaultJsonProtocol, JsValue }
import io.circe._
import io.circe.generic.auto._
import io.circe.parser._
import io.circe.syntax._

But when I ran the flow:

FileIO.fromFile(someFile)
      .via(Framing.delimiter(ByteString(System.lineSeparator), maximumFrameLength = 512, allowTruncation = true))
      //.via(de.knutwalker.akka.stream.support.CirceStreamSupport.decode[JsValue])
      .map(s => { println(s); s.toString })
      .runWith(Sink.foreach(println))

With only the first via (the framing step), the content is printed; with the second via (the commented-out decode step) enabled, nothing is processed at all.

Would it be possible to have some sample code for such a scenario? Thanks for your help.

knutwalker commented 8 years ago

Hi,

there are a couple of different concepts at play here. First, the library uses jawn for parsing, and jawn uses a Facade to decide what it parses into. The result is usually the AST of some JSON library of your choosing, and how to proceed from there depends on that library.

For example, to parse into spray-json, you have to include the `"de.knutwalker" %% "akka-stream-json" % "3.0.0"` and `"org.spire-math" %% "jawn-spray" % "0.8.4"` libraries, and then the following should work:

import akka.stream.scaladsl.{ FileIO, Sink }
import de.knutwalker.akka.stream._
import jawn.support.spray.Parser._ // this is the spray facade
import spray.json._                // this is the spray ast

// assumes an implicit Materializer is in scope
FileIO.fromFile(someFile)
  .via(JsonStreamParser[JsValue])
  .runWith(Sink.foreach(println))

What you do from here is entirely up to spray-json.

The circe module of this library is just a shortcut for this, plus the additional step of decoding into your case class, but you are in no way required to use circe. Overall, much of the usage depends on which JSON backend you end up using, and the only thing this library actually provides is the JsonStreamParser.
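The error in the original question comes from asking for an `io.circe.Decoder` for an AST type (`jawn.ast.JValue`), for which none exists. A minimal sketch of the intended use of the circe shortcut, assuming the library's circe support module and circe-generic are on the classpath and that `CirceStreamSupport.decode[A]` requires an implicit `Decoder[A]` (as the error message suggests). `Guest` and `CirceDecodeExample` are hypothetical names:

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString
import de.knutwalker.akka.stream.support.CirceStreamSupport
import io.circe.generic.auto._ // derives a Decoder for Guest at compile time

// Hypothetical target type; adjust the fields to match your documents.
final case class Guest(name: String, count: Int)

object CirceDecodeExample {
  def run(): Seq[Guest] = {
    implicit val system = ActorSystem("circe-decode-example")
    implicit val materializer = ActorMaterializer()
    try {
      // Two whitespace-separated documents, as they might appear in a file.
      val bytes = ByteString("""{"name":"alice","count":1} {"name":"bob","count":2}""")
      val decoded = Source.single(bytes)
        .via(CirceStreamSupport.decode[Guest]) // a case class, not an AST type like JValue
        .runWith(Sink.seq)
      Await.result(decoded, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

If a document does not match the shape of `Guest`, decoding fails and the stream fails with it.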

Hope this helps a bit.

wpoosanguansit commented 8 years ago

Thank you, Paul. This really helps. It has got me started for now.


wpoosanguansit commented 8 years ago

I hope you don't mind if I ask a related question. I run the flow reading from a file; however, it looks like it can only run once before it is exhausted. How do I continually process files that arrive aperiodically? Thanks again for your help.

Here is the code I used to integrate this with a Scala file watcher:

    val fileMonitorActor = system.actorOf(MonitorActor(concurrency = 2))

    val action: Callback = { path =>
      println(s"Something was modified in a directory: ${path.toRealPath()}" +
        s" ${path.toAbsolutePath} ${path.getFileName()}")
      val filePath = path.toAbsolutePath
      FileIO.fromFile(new File(filePath.toString))
        .via(JsonStreamParser[JsValue])
        .runWith(Sink.foreach(println))

      Files.deleteIfExists(filePath)
    }

    val directory = Paths get "src/main/resources/"

    fileMonitorActor ! RegisterCallback(
      event = ENTRY_MODIFY,
      path = directory,
      callback = action)
  }

The JSON parsing runs only once.

knutwalker commented 8 years ago

The default JsonStreamParser[JsValue] should parse multiple JSON documents that appear in the file, though that depends on the async parser mode from jawn (https://github.com/non/jawn#parsing), ValueStream being the default. Do you want to continue parsing after you've added more files, or modified the existing file, outside of the application? If so, this is out of scope for both akka-stream-json and akka-stream in general. You'd have to implement these features yourself; FileIO reads the file as it is at that moment and does not react to changes.
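To illustrate the ValueStream behaviour, here is a small sketch (with the hypothetical name `ValueStreamExample`) that feeds three whitespace-separated documents through `JsonStreamParser[JsValue]` using the spray facade from the earlier example. The chunk boundaries deliberately cut through a document, since the bytes FileIO emits are not aligned with JSON values either:

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString
import de.knutwalker.akka.stream.JsonStreamParser
import jawn.support.spray.Parser._ // the spray facade
import spray.json.JsValue

object ValueStreamExample {
  def run(): Seq[JsValue] = {
    implicit val system = ActorSystem("value-stream-example")
    implicit val materializer = ActorMaterializer()
    try {
      // Three documents in one byte stream; the second one is split across
      // the first two chunks.
      val chunks = List("{\"a\":1}  {\"a", "\":2}", " [3, 4]").map(s => ByteString(s))
      val parsed = Source(chunks)
        .via(JsonStreamParser[JsValue]) // default mode: ValueStream
        .runWith(Sink.seq)
      Await.result(parsed, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Each complete value is emitted as soon as the parser has seen all of its bytes, regardless of how the input was chunked.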

wpoosanguansit commented 8 years ago

Thanks Paul. I do have the watcher listening for new files. The problem at the moment is that the processing seems to happen only once: when a new file comes in and I call FileIO.fromFile again, it just does not kick off the parsing process again. Could you provide more detail on what needs to be done to keep parsing the new files that get dropped into the folder? Also, do you have an example that uses the parser with a Flow[Path], reading the bytes from each path to feed the parser? Thanks again for your help.


knutwalker commented 8 years ago

It might be related to the deleteIfExists call. Since the flow runs asynchronously, it is likely that the file is removed before the flow has finished, maybe even before it has started. Maybe something like this could work:

import java.nio.file.Path
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ FileIO, Keep, Sink, Source }

val fileParser = Source.actorRef[Path](0, OverflowStrategy.dropNew) // use proper settings for your application
  .flatMapConcat(p => FileIO.fromFile(p.toFile))
  .via(JsonStreamParser[JsValue])
  .toMat(Sink.foreach(println))(Keep.left) // keep the ActorRef so paths can be sent to it
  .run()

val action: Callback = { path =>
  println(s"Something was modified in a directory: ${path.toRealPath()}" +
    s" ${path.toAbsolutePath} ${path.getFileName()}")
  fileParser ! path
}

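For the Flow[Path] question from the previous comment, one possible variant (a sketch, not an API of akka-stream-json; `PathParsing` and `pathsToJson` are hypothetical names) moves the parser inside `flatMapConcat`. Each file then gets its own parser, so a document can never span two files, whereas in the version above the bytes of all files are concatenated before parsing. It assumes the Akka 2.4-era `FileIO.fromFile` used throughout this thread:

```scala
import java.nio.file.Path
import akka.NotUsed
import akka.stream.scaladsl.{ FileIO, Flow }
import de.knutwalker.akka.stream.JsonStreamParser
import jawn.support.spray.Parser._ // the spray facade
import spray.json.JsValue

object PathParsing {
  // Each incoming path is read and parsed by its own inner stream; the inner
  // streams run one after another, so output order follows input order.
  val pathsToJson: Flow[Path, JsValue, NotUsed] =
    Flow[Path].flatMapConcat { path =>
      FileIO.fromFile(path.toFile).via(JsonStreamParser[JsValue])
    }
}
```

It can be placed between `Source.actorRef[Path](...)` and the sink. A malformed file still fails the whole stream, so error handling is a separate concern.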
wpoosanguansit commented 8 years ago

Thanks for your help. I will try that. However, the file has to be deleted or moved out of the directory. Could that be part of the flow?


knutwalker commented 8 years ago

I don't think you could add this to the final flow as you will not have any notion about which json belonged to which file, but maybe something like this will work:

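import java.nio.file.{ Files, Path }
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ FileIO, Sink, Source }
import system.dispatcher // ExecutionContext for parsingComplete.map

val fileParser = Source.actorRef[Path](0, OverflowStrategy.dropNew) // use proper settings for your application
  .mapAsync(1) { filePath =>
    val parsingComplete = FileIO.fromFile(filePath.toFile)
      .via(JsonStreamParser[JsValue])
      .runWith(Sink.foreach(println))
    // delete the file only after it has been fully parsed
    parsingComplete.map { _ =>
      Files.deleteIfExists(filePath)
    }
  }
  .to(Sink.ignore)
  .run()
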
wpoosanguansit commented 8 years ago

I took out the delete line because, as you mentioned, the flow is async and might not see the file. However, the behavior is still the same: it only processes once. I do see the println for the second file, but no parsing happens.

[info] Something was modified in a directory: /scala/reactive-kafka-scala/src/main/resources/.DS_Store /scala/reactive-kafka-scala/src/main/resources/.DS_Store .DS_Store
[info] Something was modified in a directory: /scala/reactive-kafka-scala/src/main/resources/guests copy.json /scala/reactive-kafka-scala/src/main/resources/guests copy.json guests copy.json

Right now it works if I just parse the file as below:

val f = scala.io.Source.fromFile(filePath.toString)
val myResult: Try[JsValue] = Parser.parseFromByteBuffer(ByteBuffer.wrap(f.toArray.map(_.toByte)))
myResult match {
  case Success(jsValue) => println(jsValue.asJsObject.fields("count"))
  case Failure(t) => println(t)
}

Does anything stand out as to why the flow is not being called the second time around?
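For comparison, a sketch of the same non-streaming parse that reads the file's bytes directly instead of converting `Char`s to `Byte`s (a conversion that corrupts non-ASCII content). It assumes jawn's `SupportParser.parseFromFile`, which `jawn.support.spray.Parser` inherits; `WholeFileParsing` and `countField` are hypothetical names. Unlike `JsonStreamParser`, this expects exactly one document per file:

```scala
import java.io.File
import scala.util.{ Failure, Success }
import jawn.support.spray.Parser // a jawn SupportParser[JsValue]
import spray.json.JsValue

object WholeFileParsing {
  // Parses the whole file as a single JSON document and returns its "count"
  // field, or None if the file is not valid JSON or the field is missing.
  def countField(file: File): Option[JsValue] =
    Parser.parseFromFile(file) match {
      case Success(json) => json.asJsObject.fields.get("count")
      case Failure(t) =>
        println(t)
        None
    }
}
```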


knutwalker commented 8 years ago

Do you send the .DS_Store file into the flow as well? It is probably not valid JSON, and if the stream parser encounters an error, it fails the stream. So the first file runs through the flow, then the .DS_Store file fails it, and from then on it stops doing anything.
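Following that diagnosis, a sketch of a more defensive version of the mapAsync flow above, under the same Akka 2.4-era API assumptions (`FileIO.fromFile`). It only passes files ending in `.json` to the parser and recovers each file's result, so a bad file is logged and skipped instead of stopping the whole pipeline. `RobustFileParser`, `isJsonFile`, `parseFile`, and the buffer size of 16 are hypothetical choices, not part of akka-stream-json:

```scala
import java.nio.file.Path
import scala.concurrent.{ ExecutionContext, Future }
import akka.Done
import akka.actor.ActorRef
import akka.stream.{ Materializer, OverflowStrategy }
import akka.stream.scaladsl.{ FileIO, Sink, Source }
import de.knutwalker.akka.stream.JsonStreamParser
import jawn.support.spray.Parser._ // the spray facade
import spray.json.JsValue

object RobustFileParser {
  // Only hand files that look like JSON to the parser (skips .DS_Store and similar).
  def isJsonFile(p: Path): Boolean = p.getFileName.toString.endsWith(".json")

  // Parses one file in its own stream. A parse or I/O error fails only this
  // inner stream; it is logged and turned into Done so the outer stream survives.
  def parseFile(p: Path)(implicit mat: Materializer, ec: ExecutionContext): Future[Done] =
    FileIO.fromFile(p.toFile)
      .via(JsonStreamParser[JsValue])
      .runWith(Sink.foreach(println))
      .recover { case e: Exception =>
        println(s"Skipping $p: ${e.getMessage}")
        Done
      }

  // Returns the ActorRef that the file watcher callback can send paths to.
  def start()(implicit mat: Materializer, ec: ExecutionContext): ActorRef =
    Source.actorRef[Path](16, OverflowStrategy.dropNew) // buffer size chosen arbitrarily
      .filter(isJsonFile)
      .mapAsync(1)(p => parseFile(p))
      .to(Sink.ignore)
      .run()
}
```

Deleting or moving the file could be chained onto the `Future` returned by `parseFile`, as in the earlier mapAsync example.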

wpoosanguansit commented 8 years ago

Oh, let me test that out.


harsh86 commented 7 years ago

Hi,

I am trying to use your library with the Akka Streams Java API, together with Jackson-based JSON. I am not very familiar with Scala. Can you help me with an example of using the library from Akka Streams Java? I am mostly confused about the facade for Jackson.

FileIO.fromFile(file)
  .via(JsonStreamParser.flow(??))
  .runWith(Sink.foreach(println))

knutwalker commented 7 years ago

@harsh86 This library only works together with Jawn, not with Jackson. Jawn doesn't target Java, so neither does this library. If you want to use Jackson together with Akka's Java API, have a look at Source Streaming and akka-http-json.