kestra-io / plugin-serdes

https://kestra.io/plugins/plugin-serdes/
Apache License 2.0

Improve plugin docs to make clear that JsonToIon requires JSONL format (with new-line) #118

Closed shrutimantri closed 4 months ago

shrutimantri commented 5 months ago

Expected Behavior

JsonToIon task should work seamlessly.

Actual Behaviour

JsonToIon task leads to an error. Here is the error stacktrace:

2024-06-10 09:21:50.020 com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: expected close marker for Array (start marker at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 1])
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 2]
2024-06-10 09:21:50.020 Unexpected end-of-input: expected close marker for Array (start marker at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 1])
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 2]
2024-06-10 09:21:50.020 reactor.core.Exceptions$ReactiveException: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: expected close marker for Array (start marker at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 1])
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 2]
    at reactor.core.Exceptions.propagate(Exceptions.java:410)
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:102)
    at reactor.core.publisher.Mono.block(Mono.java:1779)
    at io.kestra.plugin.serdes.json.JsonToIon.run(JsonToIon.java:85)
    at io.kestra.plugin.serdes.json.JsonToIon.run(JsonToIon.java:33)
    at io.kestra.core.runners.WorkerTaskThread.doRun(WorkerTaskThread.java:77)
    at io.kestra.core.runners.AbstractWorkerThread.run(AbstractWorkerThread.java:56)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:104)
        ... 5 more
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: expected close marker for Array (start marker at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 1])
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 2]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:699)
    at com.fasterxml.jackson.core.base.ParserBase._handleEOF(ParserBase.java:514)
    at com.fasterxml.jackson.core.base.ParserBase._eofAsNextChar(ParserBase.java:531)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2524)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:705)
    at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializerNR._deserializeNR(UntypedObjectDeserializerNR.java:281)
    at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializerNR.deserialize(UntypedObjectDeserializerNR.java:79)
    at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:342)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4899)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3846)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3814)
    at io.kestra.plugin.serdes.json.JsonToIon.lambda$nextRow$1(JsonToIon.java:118)
    at io.kestra.core.utils.Rethrow.lambda$throwConsumer$0(Rethrow.java:50)
    at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4568)
    at reactor.core.publisher.Mono.block(Mono.java:1778)
    ... 4 more

Findings: We are able to convert JSON files like this one, with one record per line, to ION without errors:

{"product_id":"1","product_name":"streamline turn-key systems","product_category":"Electronics","brand":"gomez"}
{"product_id":"2","product_name":"morph viral applications","product_category":"Household","brand":"wolfe"}
{"product_id":"3","product_name":"expedite front-end schemas","product_category":"Household","brand":"davis-martinez"}

We are NOT able to convert a file that contains the records as a single JSON array:

[
    {"product_id":"1","product_name":"streamline turn-key systems","product_category":"Electronics","brand":"gomez"},
    {"product_id":"2","product_name":"morph viral applications","product_category":"Household","brand":"wolfe"},
    {"product_id":"3","product_name":"expedite front-end schemas","product_category":"Household","brand":"davis-martinez"}
]

Steps To Reproduce

  1. Run the example flow given below.

Environment Information

Example flow

id: json_to_ion
namespace: dev

tasks:
  - id: http_download
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/json/app_events.json

  - id: to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: "{{ outputs.http_download.uri }}"

loicmathieu commented 5 months ago

@shrutimantri the format must be JSONL, which means one JSON object per line. We don't support an array of JSON objects.
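
For anyone hitting the same error, here is a minimal sketch of turning a JSON array file into JSONL before handing it to JsonToIon. It is not part of the plugin: the function names `array_to_jsonl` and `parse_jsonl` are hypothetical, and it assumes the whole file fits in memory. The second function mimics a line-by-line reader, which appears to be why the array file fails: its first line is just `[`, and that is not a complete JSON document on its own.

```python
import json


def array_to_jsonl(text: str) -> str:
    """Convert a JSON document holding an array of records into JSONL.

    Hypothetical helper for illustration; assumes the input fits in memory.
    """
    records = json.loads(text)
    if not isinstance(records, list):
        raise ValueError("expected a top-level JSON array")
    # json.dumps escapes newlines inside strings, so each record stays on one line.
    return "".join(json.dumps(r, separators=(",", ":")) + "\n" for r in records)


def parse_jsonl(text: str) -> list:
    """Parse JSONL the way a line-by-line reader would: one document per line."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# Sample data shortened from the records in this issue.
SAMPLE_ARRAY = """[
    {"product_id":"1","brand":"gomez"},
    {"product_id":"2","brand":"wolfe"}
]"""

if __name__ == "__main__":
    print(array_to_jsonl(SAMPLE_ARRAY), end="")
```

Running `parse_jsonl` directly on the array form raises a decode error on the first line, while the output of `array_to_jsonl` parses cleanly and yields the same records.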