getindata / flink-http-connector

Http Connector for Apache Flink. Provides sources and sinks for DataStream, Table and SQL APIs.
Apache License 2.0

Implement Cache in Lookup Http Source #63

Open kristoffSC opened 11 months ago

kristoffSC commented 11 months ago

Currently the HTTP lookup source forwards every lookup query to the external system. This can easily become a bottleneck for the processing pipeline.

A remedy for this would be to implement a cache, similar to how it is done in the JDBC connector's lookup source.

The JDBC connector's implementation uses the enhanced lookup source interfaces introduced in Flink 1.16, which provide a cache abstraction [1] [2].

This ticket is about adding the same enhancement, based on [1] and [2], to the HTTP lookup source. Verify whether it is still possible to run this connector on Flink 1.15 without the cache; if not, drop 1.15 support.

[1] FLIP-221: Abstraction for lookup source cache and metric, https://cwiki.apache.org/confluence/display/FLINK/FLIP-221%3A+Abstraction+for+lookup+source+cache+and+metric
[2] Support Partial and Full Caching in Lookup Table Source
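To make the idea of a partial lookup cache concrete, here is a minimal, dependency-free Java sketch of what such a cache does: rows fetched for a lookup key are kept in a bounded LRU map, so a repeated key skips the external HTTP call. `PartialLookupCache` and `getOrLoad` are hypothetical names used only for illustration; this is not Flink's `DefaultLookupCache`, which additionally handles expiry settings and metrics as described in FLIP-221.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical illustration of a partial (LRU) lookup cache.
 * Not Flink's DefaultLookupCache; it only mirrors the idea.
 */
class PartialLookupCache<K, V> {
    private final Map<K, Collection<V>> entries;

    PartialLookupCache(int maxEntries) {
        // accessOrder = true: iteration runs from least to most recently accessed,
        // so removeEldestEntry evicts the least recently used key.
        this.entries = new LinkedHashMap<K, Collection<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Collection<V>> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns cached rows, or calls the loader (e.g. an HTTP request) and caches its result. */
    Collection<V> getOrLoad(K key, Function<K, Collection<V>> loader) {
        Collection<V> cached = entries.get(key);
        if (cached != null) {
            return cached; // cache hit: no call to the external system
        }
        Collection<V> loaded = loader.apply(key);
        entries.put(key, loaded);
        return loaded;
    }

    int size() {
        return entries.size();
    }
}
```

An empty result is cached like any other, which corresponds to caching missing keys; whether that is desirable for a given HTTP endpoint is a design decision.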

davidradl commented 5 months ago

@kristoffSC we are interested in having caching. I think the change to the HTTP client code base would be quite minimal. I was thinking of an approach very similar to the JDBC connector:

1) pass the cache into the HttpLookupTableSource constructor;
2) then, in getLookupRuntimeProvider, replace

    if (lookupConfig.isUseAsync()) {
        log.info("Using Async version of HttpLookupTable.");
        return AsyncTableFunctionProvider.of(
            new AsyncHttpTableLookupFunction(dataLookupFunction));
    } else {
        log.info("Using blocking version of HttpLookupTable.");
        return TableFunctionProvider.of(dataLookupFunction);
    }

with something like:

    if (lookupConfig.isUseAsync()) {
        log.info("Using Async version of HttpLookupTable.");
        if (cache != null) {
            // proposed new constructor that also accepts the cache
            return AsyncTableFunctionProvider.of(
                new AsyncHttpTableLookupFunction(dataLookupFunction, cache));
        } else {
            return AsyncTableFunctionProvider.of(
                new AsyncHttpTableLookupFunction(dataLookupFunction));
        }
    } else {
        log.info("Using blocking version of HttpLookupTable.");
        if (cache != null) {
            // let Flink's partial-caching provider wrap the lookup function
            return PartialCachingLookupProvider.of(dataLookupFunction, cache);
        } else {
            return TableFunctionProvider.of(dataLookupFunction);
        }
    }

And change AsyncHttpTableLookupFunction to accept a cache.
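A rough sketch of what "accepting a cache" could mean on the async path, assuming a simplified setting without Flink types: a hit completes immediately, while a miss issues the asynchronous HTTP call and stores the result once it arrives. `CachingAsyncLookup` and the `httpCall` function are hypothetical stand-ins, not the connector's actual `AsyncHttpTableLookupFunction`.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical async lookup that consults a cache before calling the HTTP endpoint. */
final class CachingAsyncLookup<K, V> {
    // Stand-in for the non-blocking HTTP request made by the lookup function.
    private final Function<K, CompletableFuture<Collection<V>>> httpCall;
    // Unbounded for brevity; a real cache needs size and/or TTL limits.
    private final Map<K, Collection<V>> cache = new ConcurrentHashMap<>();

    CachingAsyncLookup(Function<K, CompletableFuture<Collection<V>>> httpCall) {
        this.httpCall = httpCall;
    }

    CompletableFuture<Collection<V>> lookup(K key) {
        Collection<V> cached = cache.get(key);
        if (cached != null) {
            return CompletableFuture.completedFuture(cached); // hit: no HTTP call
        }
        // Only successful responses are stored, so a failed call is retried next time.
        return httpCall.apply(key).thenApply(rows -> {
            cache.put(key, rows);
            return rows;
        });
    }
}
```

Two concurrent misses for the same key will both reach the endpoint here; deduplicating in-flight requests would mean caching the futures themselves, at the cost of having to evict failed ones.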

What do you think? I am happy to code this up with some tests, but wanted to check that this sounds reasonable. I am not sure we would need full caching.

kristoffSC commented 5 months ago

Hi @davidradl I'm very glad to hear from you :)

This is great news, go ahead with the implementation. The high-level view you presented sounds good. I wonder whether we should make the cache the default or not. Maybe not, since (I hope) there are already some systems using the connector that may not expect a cache there ;)
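One way to keep the cache opt-in is to read it from the table options and default to no caching. A minimal sketch, assuming option keys modelled on Flink's `LookupOptions` (`lookup.cache`, `lookup.partial-cache.max-rows`); the exact keys should be checked against the Flink version in use. `LookupCacheConfig` and `LookupCacheMode` are hypothetical, and FULL caching is rejected because the thread suggests it may not be needed.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical cache modes, named after Flink's lookup cache types. */
enum LookupCacheMode { NONE, PARTIAL, FULL }

/** Hypothetical helper: caching stays off unless the table explicitly opts in. */
final class LookupCacheConfig {
    // Assumed option keys, modelled on Flink's LookupOptions.
    static final String CACHE_KEY = "lookup.cache";
    static final String MAX_ROWS_KEY = "lookup.partial-cache.max-rows";

    final LookupCacheMode mode;
    final long maxRows; // 0 when caching is disabled

    private LookupCacheConfig(LookupCacheMode mode, long maxRows) {
        this.mode = mode;
        this.maxRows = maxRows;
    }

    static LookupCacheConfig fromOptions(Map<String, String> options) {
        // Default NONE: existing pipelines keep today's uncached behaviour.
        LookupCacheMode mode = LookupCacheMode.valueOf(
                options.getOrDefault(CACHE_KEY, "NONE").toUpperCase(Locale.ROOT));
        switch (mode) {
            case NONE:
                return new LookupCacheConfig(mode, 0L);
            case PARTIAL:
                String maxRows = options.get(MAX_ROWS_KEY);
                if (maxRows == null) {
                    throw new IllegalArgumentException(MAX_ROWS_KEY + " is required for PARTIAL caching");
                }
                return new LookupCacheConfig(mode, Long.parseLong(maxRows));
            default:
                throw new IllegalArgumentException("Unsupported cache mode: " + mode);
        }
    }
}
```

Requiring an explicit size for PARTIAL avoids silently picking a bound that may not suit the endpoint.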

Also, I think this is the time to drop 1.15 support. In your PR you can remove the 1.15 build and move to 1.16, 1.17 and 1.18.

Actually, there is a problem with some tests on 1.17 (see https://github.com/getindata/flink-http-connector/issues/64) and I think we should tackle it first. It looks like a dependency issue rather than a real code/implementation issue, but maybe I'm wrong.

I may have some time to work on issue 64 (I'm not promising anything), but if you want, you can start on it as well.

Cheers.

davidradl commented 5 months ago

@kristoffSC sounds good. I am glad to be able to do more with this connector. I have used it with Flink 1.18 for our use cases in the past.

It is likely going to be a couple of weeks before I can work on this, as I need to clear other commitments first.

davidradl commented 4 months ago

@kristoffSC I have not yet got to it. But still intend to!

davidradl commented 4 months ago

@kristoffSC I have had a look at this, and it is not as simple as I had thought. It seems that HttpClientLookupFunction does not extend LookupFunction. LookupFunction has its own eval, and HttpClientLookupFunction has its own eval. PartialCachingLookupProvider works on LookupFunctions, so it cannot be used as is.

It seems to me that, ideally, we want to use the Flink-provided CachingLookupFunction and construct it with a delegate that overrides the lookup method (https://github.com/apache/flink/blob/46cbf22147d783fb68f77fad95161dc5ef036c96/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L46). In this way we pick up caching from the framework.
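The delegate idea can be illustrated without Flink on the classpath. As we read the Flink source, `LookupFunction#eval(Object...)` is final and forwards to the abstract `lookup(RowData)`, and `CachingLookupFunction` wraps a delegate `LookupFunction` with a `LookupCache`. The sketch below mimics that shape with simplified, hypothetical stand-ins (`SimpleLookupFunction`, `HttpDelegateLookupFunction`, `CachingLookupWrapper`); keys are a `List<Object>` instead of `RowData`, a `Consumer` replaces Flink's `Collector`, and a plain `HashMap` replaces the real cache.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Simplified stand-in for Flink's LookupFunction: eval is final and calls lookup(). */
abstract class SimpleLookupFunction {
    // Stand-in for Flink's Collector: receives every row emitted by eval().
    protected Consumer<String> collector = row -> { };

    void setCollector(Consumer<String> collector) {
        this.collector = collector;
    }

    /** Subclasses implement only the lookup (Flink: lookup(RowData keyRow)). */
    abstract Collection<String> lookup(List<Object> keyRow);

    /** Final, so subclasses cannot bring their own eval. */
    final void eval(Object... keys) {
        Collection<String> rows = lookup(Arrays.asList(keys));
        if (rows != null) {
            rows.forEach(collector);
        }
    }
}

/** Hypothetical HTTP delegate: implements lookup() and has no custom eval. */
final class HttpDelegateLookupFunction extends SimpleLookupFunction {
    int httpCalls = 0;

    @Override
    Collection<String> lookup(List<Object> keyRow) {
        httpCalls++; // a real delegate would send the HTTP request here
        return List.of("response-for-" + keyRow);
    }
}

/** Mirrors the idea of CachingLookupFunction: check the cache first, delegate on a miss. */
final class CachingLookupWrapper extends SimpleLookupFunction {
    private final SimpleLookupFunction delegate;
    private final Map<List<Object>, Collection<String>> cache = new HashMap<>();

    CachingLookupWrapper(SimpleLookupFunction delegate) {
        this.delegate = delegate;
    }

    @Override
    Collection<String> lookup(List<Object> keyRow) {
        return cache.computeIfAbsent(keyRow, delegate::lookup);
    }
}
```

The HTTP delegate only implements `lookup`, so the caching wrapper can sit in front of it without either class needing a custom `eval`.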

Is there anything in the HTTP client design that would make this difficult? I am checking with you before I dive in!

WDYT?

davidradl commented 3 months ago

@kristoffSC any thoughts on this one? If not, I will attempt a refactor to use the Flink caching framework.