akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka/current/

Alpakka-file | Decompressing tar file with mapAsync with more than 1 parallelism set or with map is not working #2634

Open gokul-lb opened 3 years ago

gokul-lb commented 3 years ago

Versions used

Alpakka File (alpakka-file) version: 2.0.2 and the current snapshot version

Expected Behavior

When a tar file is streamed through Compression.gunzip() and Archive.tarReader(), a final stage using either mapAsync with parallelism greater than 1 or map should extract the files contained in the original tar file.

Actual Behavior

With the snapshot version of alpakka-file, extraction works as expected with mapAsync(1) {...} (parallelism 1), but it does not work with mapAsync(n) {...} for n > 1, nor with a plain map {...}.

Note: Nothing works with Alpakka-file 2.0.2; I assume that is a known defect.

Relevant logs

I am unable to attach the tar file here; I can share it through a different medium (internal Slack) if required.

Reproducible Test Case

    // Reads each path as a stream of bytes.
    val extractContents: Flow[Path, ByteString, NotUsed] =
      Flow[Path].flatMapConcat(path => FileIO.fromPath(path))

    // Decompresses the gzip stream and reads the tar entries.
    val untar = Flow[ByteString]
      .via(Compression.gunzip())
      .via(Archive.tarReader())
      .mapAsync(3) { case (metadata, source) =>
        // business logic
      }

ennru commented 3 years ago

Tar files must be read sequentially; I don't think this can be changed to work as you would like.
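
One way to stay within that constraint is sketched below: drain each entry's source with mapAsync(1), so that entries are read strictly one after another, and only then fan the work out in a second stage with higher parallelism. This is a minimal sketch, not the library's prescribed approach. It assumes a snapshot build in which tarReader works with mapAsync(1) as reported above, and it assumes every entry fits in memory. The names SequentialUntar and process are hypothetical; process stands in for the business logic.

```scala
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.alpakka.file.TarArchiveMetadata
import akka.stream.alpakka.file.scaladsl.Archive
import akka.stream.scaladsl.{Compression, Flow}
import akka.util.ByteString

import scala.concurrent.{ExecutionContext, Future}

object SequentialUntar {

  // Hypothetical stand-in for the business logic: returns the entry path and its size.
  def process(metadata: TarArchiveMetadata, content: ByteString)(
      implicit ec: ExecutionContext): Future[(String, Int)] =
    Future(metadata.filePath -> content.length)

  def untar(implicit mat: Materializer,
            ec: ExecutionContext): Flow[ByteString, (String, Int), NotUsed] =
    Flow[ByteString]
      .via(Compression.gunzip())
      .via(Archive.tarReader())
      // Parallelism 1: each entry's source is fully drained before the
      // next entry is requested from the tar reader.
      .mapAsync(1) { case (metadata, source) =>
        source.runFold(ByteString.empty)(_ ++ _).map(bytes => metadata -> bytes)
      }
      // The entry contents are now in memory, so this stage no longer
      // touches the tar stream and can run with parallelism greater than 1.
      .mapAsync(3) { case (metadata, bytes) => process(metadata, bytes) }
}
```

The flow can be attached after a byte source such as extractContents from the test case. For entries too large to buffer, the same first stage could instead write each entry to a temporary file and pass the path downstream.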