akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka/current/
Other
1.26k stars 647 forks source link

CSV: how to skip bad rows, supervisionStrategy not working #1352

Open slmzig opened 5 years ago

slmzig commented 5 years ago

I`m using Alpakka for parsing csv files. version "com.lightbend.akka" %% "akka-stream-alpakka-csv" % 0.20 I have csv file with unclosed quote.

email
test@emample.com
"test@emample.com
test@emample.com
test@emample.com

I want to skip bad rows and go next, but my stream is falling.

I`m using supervisionStrategy Supervision.Resume, but it is not working.

The stream fail when find unclosed quote.

Is there any way to fix that?

my code:


implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()

def hdfsSource(csv: String): Source[ByteString, Future[IOResult]] =
  Source
    .single(csv)
    .map(ByteString.apply)
    .mapMaterializedValue(_ => Future.successful(IOResult(1, Success(Done))))

val csv = """email,country,name
            |"test,test,test
            |test,test,test
            |test,test,test
            |""".stripMargin

val source = hdfsSource(csv)

val decider: Supervision.Decider = {
  case _ ⇒ Supervision.Resume
}

val result = source
  .via(CsvParsing.lineScanner())
  .via(CsvToMap.toMapAsStrings())
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runForeach(println)
ennru commented 5 years ago

Please ask questions on the forum.

Cross-post of https://stackoverflow.com/questions/53543295/akka-stream-alpakka-csv-skip-exception-and-parse-next-rows

Currently CsvParsing.lineScanner() does not support supervision strategies. You can choose another symbol as a quote character for the line scanner CsvParsing.lineScanner(quoteChar = '\''). Then you will get the unclosed double quote as part of parsed results:

Map(email -> "test, country -> test, name -> test)
Map(email -> test, country -> test, name -> test)
Map(email -> test, country -> test, name -> test)
ennru commented 5 years ago

Supporting a supervision strategy for the CSV parser is non-trivial. It would need to understand how to get back to a stable state eg. a line end, just ignoring part of the input will not be so useful.