ok2c / httpcomponents-jackson

JSON message asynchronous producers and consumers for Apache HttpComponents 5.0 based on Jackson JSON library
https://ok2c.github.io/httpcomponents-jackson
Apache License 2.0

When parsing tokens, JsonTokenConsumer would be easier to use than JsonTokenEventHandler #7

Closed: scr-oath closed this issue 2 years ago

scr-oath commented 2 years ago

I'm looking at consuming an HTTP response that can be quite large, and I wish to process it asynchronously.

It seems like the JsonPointer might be useful but even that is a bit clunky - something more like JSONPath would be better still if there were some facility to pass that…

Ultimately, what has me stuck is that the JsonResponseStreamConsumer and JsonTokenEntityConsumer classes are package-private and/or final, so they can't be used or extended. If a variant of JsonResponseConsumers.create were provided that took a JsonTokenConsumer instead of the JsonTokenEventHandler, then one would at least have access to the JsonParser as well as the current token, and could whip up something like:

    val pattern = """/documents/\d+/id""".r

    // ...

    (tokenId, jsonParser) => {
      // Match only string tokens whose JSON Pointer looks like /documents/<index>/id
      if (tokenId == JsonTokenId.ID_STRING &&
          pattern.matches(jsonParser.getParsingContext.pathAsPointer().toString)) {
        // Do something
      }
    }

This is for JSON like the following, where the entire response is streamed and large, and documents is the largest and most important piece. I want to pull the ids out of a "visit" or query (to Vespa, FWIW) where stream=true is specified.

{
  ...
  "documents": [
    {"id": "someid", ...},
    ...
  ]
}
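To make the filter concrete, here is a minimal JDK-only Java sketch of the same regex applied to pointer strings. It assumes that pathAsPointer() renders array elements with their index (for example /documents/0/id). The class name PointerFilter and the sample pointers are made up for illustration, and no Jackson types are involved.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper, not part of httpcomponents-jackson or Jackson.
class PointerFilter {
    // Same pattern as in the snippet above: an "id" field directly under an
    // element of the top-level "documents" array. Requires a full match.
    static final Predicate<String> IS_DOCUMENT_ID =
            Pattern.compile("/documents/\\d+/id").asMatchPredicate();

    // Keeps only the pointers that address a document id.
    static List<String> matching(List<String> pointers) {
        return pointers.stream().filter(IS_DOCUMENT_ID).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed pointer strings for the response shape shown above.
        List<String> pointers = List.of(
                "/documents",
                "/documents/0",
                "/documents/0/id",
                "/documents/0/fields/id",
                "/documents/1/id");
        System.out.println(matching(pointers));
    }
}
```

Because the predicate requires a full match, a nested path such as /documents/0/fields/id is rejected even though it ends in /id.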

Ultimately, I plan on using this from within a Spark program. That may be interesting because I want to convert from one iterator to another via the streaming response.

I was thinking of using RxJava Single.create to make an "emitter" object that could turn this inside out again and transform async callbacks into an iterator. The details may be TMI, but they lead to this: I'd like to be able to check the response first (because errors may not be JSON) and continue with entity consumption only on a 200 response. Therefore, in terms of the objects in play, the JsonConsumer<HttpResponse> may need to influence the creation of the JsonTokenConsumer (or indeed be the same object implementing both interfaces).
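As a rough illustration of the "inside out" idea, the sketch below bridges push-style callbacks to a pull-style Iterator. It does not use RxJava: it swaps in a plain java.util.concurrent.LinkedBlockingQueue, and the class CallbackIterator with its emit, complete and fail methods is hypothetical. The queue is unbounded here, so there is no backpressure; that is an assumption made to keep the producer side non-blocking.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical bridge: async callbacks push items in, a consumer pulls them out.
final class CallbackIterator<T> implements Iterator<T> {
    private static final Object END = new Object();          // end-of-stream marker

    private static final class Failure {                     // wraps an error from the producer
        final Throwable cause;
        Failure(Throwable cause) { this.cause = cause; }
    }

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private Object buffered;                                  // item taken but not yet returned

    // Producer side: call these from the async callbacks. None of them block.
    void emit(T item) { queue.add(item); }
    void complete() { queue.add(END); }
    void fail(Throwable cause) { queue.add(new Failure(cause)); }

    // Consumer side: blocks until an item, the end marker, or a failure arrives.
    @Override
    public boolean hasNext() {
        if (buffered == null) {
            try {
                buffered = queue.take();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the next item", ex);
            }
        }
        if (buffered instanceof Failure) {
            throw new IllegalStateException("Stream failed", ((Failure) buffered).cause);
        }
        return buffered != END;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T item = (T) buffered;
        buffered = null;
        return item;
    }

    public static void main(String[] args) {
        CallbackIterator<String> ids = new CallbackIterator<>();
        // Stand-in for the I/O thread that would deliver token callbacks.
        Thread producer = new Thread(() -> {
            ids.emit("id-1");
            ids.emit("id-2");
            ids.complete();
        });
        producer.start();
        while (ids.hasNext()) {
            System.out.println(ids.next());
        }
    }
}
```

A real version would likely need a bounded buffer or some capacity signalling so that a slow consumer cannot cause unbounded memory growth on a very large response.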

Just imagining where hooks or decisions may need to be made, I think there are three conceptual places:

  1. The HttpResponse is delivered with statusCode and ContentType, to determine whether to continue or bail and what processor is needed (whether Json or http/xml/text, whatever).
  2. Each token is delivered. Again, I would really like to have access to at least the token or tokenId and the JsonParser or JsonPointer, to know where I am in the tree. Being able to filter with TokenFilter or JSONPath would be helpful here, but I'm not blocked on that if JsonTokenConsumer could be used in the JsonResponseStreamConsumer.
  3. The entity completes (and the FutureCallback is notified).
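The three places above could be written down as one small callback interface. This is only a sketch under assumptions: JsonStreamHooks, DocumentIdCollector and all method names are hypothetical and not part of httpcomponents-jackson, the token location is passed as a plain pointer string rather than a JsonParser, and STRING_TOKEN is a stand-in constant where real code would use Jackson's JsonTokenId.ID_STRING.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;

// Hypothetical interface covering the three decision points.
interface JsonStreamHooks {
    // 1. Response head arrived: return true to go on with JSON processing.
    boolean onResponseHead(int statusCode, String contentType);

    // 2. A token arrived, together with its location in the tree.
    void onToken(int tokenId, String pointer, String text);

    // 3. The entity has been fully consumed.
    void onComplete();
}

// Example implementation that collects document ids from a 200 JSON response.
final class DocumentIdCollector implements JsonStreamHooks {
    static final int STRING_TOKEN = 6; // stand-in for JsonTokenId.ID_STRING
    private static final Pattern ID_POINTER = Pattern.compile("/documents/\\d+/id");

    final List<String> ids = new ArrayList<>();
    boolean completed;

    @Override
    public boolean onResponseHead(int statusCode, String contentType) {
        // Error responses may not be JSON, so require both 200 and a JSON content type.
        return statusCode == 200
                && contentType != null
                && contentType.toLowerCase(Locale.ROOT).startsWith("application/json");
    }

    @Override
    public void onToken(int tokenId, String pointer, String text) {
        if (tokenId == STRING_TOKEN && ID_POINTER.matcher(pointer).matches()) {
            ids.add(text);
        }
    }

    @Override
    public void onComplete() {
        completed = true;
    }

    public static void main(String[] args) {
        DocumentIdCollector collector = new DocumentIdCollector();
        // Simulated event sequence; a real consumer would drive these calls.
        if (collector.onResponseHead(200, "application/json; charset=UTF-8")) {
            collector.onToken(STRING_TOKEN, "/documents/0/id", "someid");
            collector.onToken(STRING_TOKEN, "/documents/0/title", "ignored");
            collector.onComplete();
        }
        System.out.println(collector.ids);
    }
}
```

Having one object implement all three hooks matches the idea that the response check may need to influence how tokens are consumed.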

This is my first attempt to put the things I encountered just today into words, and I'd be happy to either discuss or revise as needed to arrive at a good recommendation for a fix.

scr-oath commented 2 years ago

I think… just making JsonResponseStreamConsumer and AbstractJsonEntityConsumer public might unstick everything. With those classes public, one could replace

    public static AsyncRequestConsumer<Void> create(JsonFactory jsonFactory,
                                                    JsonConsumer<HttpRequest> messageConsumer,
                                                    JsonTokenEventHandler eventHandler) {
        return new JsonRequestStreamConsumer<>(
                () -> new JsonTokenEntityConsumer(jsonFactory, eventHandler),
                messageConsumer);
    }

with

    new JsonRequestStreamConsumer<>(
            () -> new AbstractJsonEntityConsumer<>() {
                @Override
                JsonTokenConsumer createJsonTokenConsumer(Consumer<T> resultConsumer) {
                    // Full access to the token id and the JsonParser here
                    return (tokenId, jsonParser) -> ...
                }
            },
            messageConsumer);

I see that JsonStreamConsumer has some logic to switch in a NoopConsumer when the content is not JSON, and that this is called from the final class JsonRequestStreamConsumer's override of HC5's AsyncRequestConsumer consumeRequest method. But if JsonRequestStreamConsumer could be made non-final, then it too could be overridden to call something else (or consumeMessage on a delegate) on a non-200 response…

ok2c commented 2 years ago

@scr-oath Would it also be possible to add more methods to JsonResponseConsumers instead of opening up JsonResponseStreamConsumer?

scr-oath commented 2 years ago

Yeah, I think I figured it out - please check out #8