wanglingsong / JsonSurfer

A streaming JsonPath processor in Java
MIT License
294 stars 55 forks source link

Add Support For Knowing Parsing Completed #45

Closed ankurpathak closed 5 years ago

ankurpathak commented 5 years ago

Right now on trying to convert this Api to Reactive Streams we don't have any way to know that parsing completed, hence can con signal onComplete event in Reactive Streams. So please add support for same. If I am missing something then correct me.

wanglingsong commented 5 years ago

The parsing complete when the surfing API exit.

        JsonSurfer surfer = JsonSurferGson.INSTANCE;
        surfer.configBuilder()
                .bind("$.store.book[*]", new JsonPathListener() {
                    @Override
                    public void onValue(Object value, ParsingContext context) {
                        System.out.println(value);
                    }
                })
                .buildAndSurf(sample);
        // Parsing complete here
ankurpathak commented 5 years ago

Thanks, for helping. Can, I use this snippet of Spring Reactor Flux with Spring Reactor WebFlux Controllers, Will it be non blocking?? public static Flux jsonPath(String json, String jsonPath) { JsonSurfer surfer = JsonSurferJackson.INSTANCE; return Flux.create(sink -> { surfer.configBuilder().bind(jsonPath, (value, context) -> { sink.next(value); }).buildAndSurf(json); sink.complete(); }); }

wanglingsong commented 5 years ago

I think so. Just try it and see what happen

wanglingsong commented 5 years ago

Hi, @ankurpathak . Does it work?

ankurpathak commented 5 years ago

Yes Its working with both Mono and Flux, Here is a code I am using.

//Mono

import org.jsfr.json.*;
import org.jsfr.json.provider.JacksonProvider;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.MediaType;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Collections;

import static java.util.Collections.emptyMap;
import static org.springframework.core.ResolvableType.forClass;

public class MonoUtil {

    private MonoUtil() {
    }

    public static <T> Flux<DataBuffer> toDataBuffer(DataBufferFactory bufferFactory, T t, Class<T> type, Jackson2JsonEncoder encoder) {
        ResolvableType elementType = ResolvableType.forClass(type);
        return encoder.encode(Mono.just(t), bufferFactory, elementType, MediaType.APPLICATION_JSON, emptyMap());
    }

    public static <T> Mono<T> fromDataBuffer(Flux<DataBuffer> body, Class<T> type, Jackson2JsonDecoder decoder) {
        ResolvableType elementType = forClass(type);
        return decoder.decodeToMono(body, elementType, MediaType.APPLICATION_JSON, Collections.emptyMap()).cast(type);
    }

    public static <T> Mono jsonPath(String json, String jsonPath) {
        JsonSurfer surfer = JsonSurferJackson.INSTANCE;
        return Mono.create(sink -> {
            surfer.configBuilder().bind(jsonPath, (value, context) -> sink.success(value)).buildAndSurf(json);
        });
    }

}
//Flux
import org.jsfr.json.JsonSurfer;
import org.jsfr.json.JsonSurferJackson;
import org.jsfr.json.ParsingContext;
import org.jsfr.json.TypedJsonPathListener;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.MediaType;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Collection;
import java.util.Collections;

import static java.util.Collections.emptyMap;
import static org.springframework.core.ResolvableType.forClass;

public class FluxUtil {

    private FluxUtil(){}

    public static <T> Flux<DataBuffer> toDataBuffer(DataBufferFactory bufferFactory, Collection<T> collection, Class<T> type, Jackson2JsonEncoder encoder){
        ResolvableType elementType = ResolvableType.forClass(type);
        return encoder.encode(Flux.fromIterable(collection), bufferFactory, elementType, MediaType.APPLICATION_JSON, emptyMap());
    }

    public static <T> Flux<T> fromDataBuffer(Flux<DataBuffer> body, Class<T> type, Jackson2JsonDecoder decoder){
        ResolvableType elementType = forClass(type);
        return decoder.decode(body, elementType, MediaType.APPLICATION_JSON, Collections.emptyMap()).cast(type);
    }
    public static <T> Flux<T> jsonPath(String json, Class<T> type, String jsonPath) {
        JsonSurfer surfer = JsonSurferJackson.INSTANCE;
        return Flux.create(sink -> {
            surfer.configBuilder().bind(jsonPath, type, (t,context) -> sink.next(t)).buildAndSurf(json);
            sink.complete();
        });
    }
}
ankurpathak commented 5 years ago

Can't we have inbuild support for Reactive Streams(RxJava and Project Reactor)??

wanglingsong commented 5 years ago

I don't think it's good idea to include extra dependency to support it in JsonSurfer, because not everyone uses JsonSurer with Reactive Streams and It looks like not too difficult to create Reactive Streams with JsonSurfer according to your code snippet. However, I can add one more example in ReadMe to show your use case. Anyway, Thanks for your suggestion.

ankurpathak commented 5 years ago

Ya sure, Adding the example is also a good idea.