ok2c / httpcomponents-jackson

JSON message asynchronous producers and consumers for Apache HttpComponents 5.0 based on Jackson JSON library
https://ok2c.github.io/httpcomponents-jackson
Apache License 2.0
7 stars 6 forks source link

[Fixes #7] Add JsonTokenConsumer creation for consuming tokenId + parser #8

Closed scr-oath closed 2 years ago

scr-oath commented 2 years ago

[Fixes #7]

Rather than opening up permissions to classes as initially thought in #7, add another create method, and a second constructor to JsonTokenEntityConsumer that takes JsonTokenConsumer. Whether wrapping/adapting a JsonTokenEventHandler or using the JsonTokenConsumer directly, invoke the resultConsumer when there are no more tokens.

scr-oath commented 2 years ago

FWIW, I also did this against the mavenLocal snapshot:

      val pointerPattern = """/documents/\d+/id""".r
      val respConsumer = JsonResponseConsumers.create(jsonFactory, messageConsumer, (i: Int, jsonParser: JsonParser) => {
        if (i == JsonTokenId.ID_STRING
          && pointerPattern.matches(jsonParser.getParsingContext.pathAsPointer().toString)) {
          AsyncJsonResponsePtr2.logger.info("Got id {}", jsonParser.getValueAsString())
        }
      })

And it printed only the id values YAY!

scr-oath commented 2 years ago

Hmm… it really does seem useful to be able to supply the consumer depending on the HttpResponse and EntityDetails - those HC5 callbacks are abstracted away by the JsonConsumer - perhaps… if "opening up" the class permissions is undesirable… maybe something like this could be helpful… I'll play with it and possibly add that method too

    public static <T> AsyncResponseConsumer<T> create(
            Function<HttpResponse, AsyncEntityConsumer<T>> messageToConsumerSupplier) {
        AtomicReference<AsyncEntityConsumer<T>> consumerRef = new AtomicReference<>();
        JsonConsumer<HttpResponse> messageConsumer = resp -> consumerRef.set(messageToConsumerSupplier.apply(resp));
        Supplier<AsyncEntityConsumer<T>> consumerSupplier = consumerRef::get;
        return new JsonResponseStreamConsumer<>(consumerSupplier, messageConsumer);
scr-oath commented 2 years ago

Ok - I don't think I need that per se…

While pretty verbose, as a proof of concept, this creates a flowable that honors the "touchpoints" as I called them of the response, json token events that match the pointer I care about, as well as the done by using RxJava's Flowable.mergeWith - if any of the things fails, they all fail.

      val pointerPattern = """/documents/\d+/id""".r
      val idSubject = PublishSubject.create[String]().toSerialized
      val respSubject = AsyncSubject.create[HttpResponse]().toSerialized
      val respCB: JsonConsumer[HttpResponse] = { resp =>
        respSubject.onNext(resp)
        respSubject.onComplete()
      }
      val doneCB = new SubjectFutureCallback(AsyncSubject.create[Void]().toSerialized)
      val doneCompletable = doneCB.subject.ignoreElements()
        .doFinally { () =>
          AsyncJsonResponsePtr.logger.info("done")
        }
      val respConsumer = JsonResponseConsumers.create(jsonFactory, respCB, (i: Int, jsonParser: JsonParser) => {
        i match {
          case JsonTokenId.ID_STRING if pointerPattern.matches(jsonParser.getParsingContext.pathAsPointer().toString) =>
            val id = jsonParser.getValueAsString()
            idSubject.onNext(id)
          case JsonTokenId.ID_NO_TOKEN =>
            idSubject.onComplete()
          case _ => ()
        }
      })
      httpClient.execute(SimpleRequestProducer.create(req), respConsumer, doneCB)
      idSubject.toFlowable(BackpressureStrategy.BUFFER)
        .observeOn(Schedulers.io)
        .mergeWith(respSubject.singleElement().map({ resp =>
          if (resp.getCode != HttpStatus.SC_OK) {
            throw new HttpException(s"non-200 response: ${resp.getCode} ${resp.getReasonPhrase}")
          }
        }).ignoreElement())
        .concatWith(doneCompletable)
        .blockingForEach { id =>
          AsyncJsonResponsePtr2.logger.info("Got id {}", id)
        }
    }

and the SubjectFutureCallback

package moah.tinker.cake;

import io.reactivex.rxjava3.subjects.AsyncSubject;
import io.reactivex.rxjava3.subjects.Subject;
import org.apache.hc.core5.concurrent.FutureCallback;

import java.util.concurrent.CancellationException;

public class SubjectFutureCallback<T> implements FutureCallback<T> {
    public final Subject<T> subject;

    public SubjectFutureCallback() {
        this.subject = AsyncSubject.<T>create().toSerialized();
    }

    public SubjectFutureCallback(Subject<T> subject) {
        this.subject = subject;
    }

    @Override
    public void completed(T result) {
        if (result != null) {
            subject.onNext(result);
        }
        subject.onComplete();
    }

    @Override
    public void failed(Exception ex) {
        subject.onError(ex);
    }

    @Override
    public void cancelled() {
        subject.onError(new CancellationException());
    }
}
scr-oath commented 2 years ago

So anyway… it's good as is, modulo the conversation about imports - do let me know if you can accept it as-is or what your convention is for ordering imports; thanks.

ok2c commented 2 years ago

So anyway… it's good as is, modulo the conversation about imports - do let me know if you can accept it as-is or what your convention is for ordering imports; thanks.

@scr-oath The code scheme used by the project is pretty much in line with the one by the Apache HttpComponents.

It would be great if you could adjust your changes to conform to the code scheme. If not, I can merge the PR and tweak your commit afterwards.

<code_scheme name="HC" version="173">
  <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="99999" />
  <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="99999" />
  <option name="IMPORT_LAYOUT_TABLE">
    <value>
      <package name="" withSubpackages="true" static="true" />
      <emptyLine />
      <package name="java" withSubpackages="true" static="false" />
      <emptyLine />
      <package name="javax" withSubpackages="true" static="false" />
      <emptyLine />
      <package name="org" withSubpackages="true" static="false" />
      <emptyLine />
      <package name="com" withSubpackages="true" static="false" />
      <emptyLine />
      <package name="" withSubpackages="true" static="false" />
    </value>
  </option>
  <GroovyCodeStyleSettings>
    <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="9999" />
    <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="9999" />
  </GroovyCodeStyleSettings>
  <JavaCodeStyleSettings>
    <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="99999" />
    <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="99999" />
    <option name="IMPORT_LAYOUT_TABLE">
      <value>
        <package name="" withSubpackages="true" static="true" />
        <emptyLine />
        <package name="java" withSubpackages="true" static="false" />
        <emptyLine />
        <package name="javax" withSubpackages="true" static="false" />
        <emptyLine />
        <package name="org" withSubpackages="true" static="false" />
        <emptyLine />
        <package name="com" withSubpackages="true" static="false" />
        <emptyLine />
        <package name="" withSubpackages="true" static="false" />
      </value>
    </option>
  </JavaCodeStyleSettings>
  <JetCodeStyleSettings>
    <option name="PACKAGES_TO_USE_STAR_IMPORTS">
      <value />
    </option>
    <option name="NAME_COUNT_TO_USE_STAR_IMPORT" value="2147483647" />
    <option name="NAME_COUNT_TO_USE_STAR_IMPORT_FOR_MEMBERS" value="2147483647" />
  </JetCodeStyleSettings>
</code_scheme>
scr-oath commented 2 years ago

Thanks so much for the code_scheme doc - not sure what consumes that, but the info was there to add an HC setting in Intellij's editor style for Java.

I re-optimized imports and some things still remained moved around - such as…

import com.fasterxml.jackson.core.type.TypeReference;

Which was previously a loaner (by itself), and so moved down to the rest of the com.* in its lexicographical order.

ok2c commented 2 years ago

@scr-oath Many thanks for this contribution!