akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka/current/
Other
1.26k stars 645 forks source link

CsvFormatting.format() crashes #2717

Open MattiL opened 3 years ago

MattiL commented 3 years ago

Versions used

Alpakka version 3.0.2 Akka version: 2.12-2.4.20

Expected Behavior

Alpakka formats CSV or issues an error.

Actual Behavior

Alpakka silently crashes.

Relevant logs

2021-08-11 11:24:59.973 DEBUG [LocateClientServerActorSystem-akka.actor.default-dispatcher-6] c.p.r.a.m.RadioAuditDbActor [RadioAuditDbActor.java:367] received command: export terminals 2021-08-11 11:25:00.048 DEBUG [Thread-18] c.p.r.a.m.RadioAuditDbActor [RadioAuditDbActor.java:407] Success - exporting keys ByteString(-17, -69, -65) 2021-08-11 11:25:00.054 DEBUG [Thread-18] c.p.r.a.m.RadioAuditDbActor [RadioAuditDbActor.java:409] Success - exported keys 2021-08-11 11:25:00.072 DEBUG [LocateClientServerActorSystem-akka.actor.default-dispatcher-6] c.p.r.a.m.RadioAuditDbActor [RadioAuditDbActor.java:412] Found collection 2021-08-11 11:25:00.073 DEBUG [LocateClientServerActorSystem-akka.actor.default-dispatcher-6] c.p.r.a.m.RadioAuditDbActor [RadioAuditDbActor.java:417] Mapped to values 2021-08-11 11:25:00.073 DEBUG [LocateClientServerActorSystem-akka.actor.default-dispatcher-6] c.p.r.a.m.RadioAuditDbActor [RadioAuditDbActor.java:422] Mapped to String List 2021-08-11 11:25:02.224 DEBUG [SpringContextShutdownHook] c.p.r.s.ClientConnectionsService [ClientConnectionsService.java:85] removed client connection: com.portalify.radioaudit.auth.UserAuthentication@e8496900: Principal: admin; Credentials: [PROTECTED]; Authenticated: true; Details: [method, GET, request, https://10.0.112.139:8498/websocket, remote-address, 10.0.112.223]; Not granted any authorities StandardWebSocketSession[id=d4dbaf64-4026-c138-926f-51cde78a1d16, uri=wss://10.0.112.139:8498/websocket?token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MTYyODc1NjY5NSwiaWF0IjoxNjI4NjcwMjk1fQ.sG3b1Ci3gPeOyp9aEk1jNpFVqAMs6JO3RQVfelnK3L_6jC4zqCw6jBgPp0hZ0Hv6UwSoa4hcJSrdVE-0LJ20kQ] 2021-08-11 11:25:02.224 DEBUG [SpringContextShutdownHook] c.p.r.s.ClientConnectionsService [ClientConnectionsService.java:86] total connections (remove): 0, for token: 0, tokens: 0 2021-08-11 11:25:02.226 INFO [LocateClientServerActorSystem-akka.actor.default-dispatcher-3] c.p.r.a.WebsocketSubscriptionActor [WebsocketSubscriptionActor.java:52] protocol=websocket event=unsubscribe remote_address=/10.0.112.223:62096 id=d4dbaf64-4026-c138-926f-51cde78a1d16 2021-08-11 11:25:02.226 INFO [LocateClientServerActorSystem-akka.actor.default-dispatcher-3] c.p.r.a.WebsocketSubscriptionActor [WebsocketSubscriptionActor.java:83] protocol=websocket event=broadcast 2021-08-11 11:25:02.240 WARN [https-jsse-nio-8498-exec-2] c.p.r.c.JSONExceptionHandler [JSONExceptionHandler.java:43] Unhandled exception java.lang.InterruptedException: null

Reproducible Test Case

    final Source<Document, NotUsed> source =
            MongoSource.create(collection.find().sort(ascending("issi")))
                    .map(MongoDbUtils::fixId);

    try {
        source
                .map(x -> {
                    log.debug("Found source");
                    return x;
                })
                .take(1)
                .map(x -> {
                    log.debug("take 1");
                    return x;
                })
                .map(Document::keySet)
                .map(x -> {
                    log.debug("keySet");
                    return x;
                })
                .via(CsvFormatting.format(
                        ',',
                        '"',
                        '\\',
                        "\r\n",
                        CsvQuotingStyle.REQUIRED,
                        StandardCharsets.UTF_8,
                        Optional.of(ByteOrderMark.UTF_8)))
                .runWith(Sink.head(), materializer)
                .whenComplete((result, e) -> {
                    if (e == null) {
                        log.debug("Success - exporting keys {}", result);
                        exporter.export(result.utf8String());
                        log.debug("Success - exported keys");
                        source
                                .map(x -> {
                                    log.debug("Found collection");
                                    return x;
                                })
                                .map(Document::values)
                                .map(x -> {
                                    log.debug("Mapped to values");
                                    return x;
                                })
                                .map(values -> values.stream().map(Object::toString).collect(Collectors.toList()))
                                .map(x -> {
                                    log.debug("Mapped to String List");
                                    return x;
                                })
                                .via(CsvFormatting.format())
                                .map(s -> {
                                    log.debug("Formatted to CSV");
                                    return s;
                                })
                                .runWith(Sink.head(), materializer)
                                .whenComplete((values, error) -> {
                                    if (error == null) {
                                        log.debug("Success - exporting values {}", values);
                                        exporter.export(values.utf8String());
                                        exporter.endData();
                                        log.debug("Success - ended data");
                                    } else {
                                        log.error("Failed to export data", error);
                                    }
                                });
                    } else {
                        log.error("Failed to export data", e);
                    }
                });
    } catch (final Throwable t) {
        log.error("Export error", t);
    }
ennru commented 3 years ago

Did you identify what causes this error?

MattiL commented 3 years ago

Did you identify what causes this error?

No, I did not. Apache CSVPrinter produced the same crash, so it might not be specific to Alpakka.

ennru commented 3 years ago

Ok, I suggest this has nothing to do with the CSV. It looks like a problem in JSON handling/Mongo:

c.p.r.c.JSONExceptionHandler [JSONExceptionHandler.java:43]
Unhandled exception
java.lang.InterruptedException: null
MattiL commented 3 years ago

Also lineScanner crashed. openjdk version "1.8.0_292" OpenJDK Runtime Environment (build 1.8.0_292-b10) OpenJDK 64-Bit Server VM (build 25.292-b10, mixed mode)

ennru commented 3 years ago

If you can provide a test case without involving anything but Alpakka CSV there might be a chance to trace it down.