Closed kristoffSC closed 8 months ago
@kristoffSC we are interested in getting this issue resolved - please could you assign it to me and I will investigate
@davidradl I'm very happy to see someone who would like to fix this issue and help with the connector :)
Please let me add some more info and context here.
The first issue can be fixed very easily. Add a `ClassLoader` argument to the `HttpLookupTableSource` constructor. Use this class loader in line 65 instead of `Thread.currentThread().getContextClassLoader()`. Then in `HttpLookupTableSourceFactory#createDynamicTableSource`, ~line 74, pass `context.getClassLoader()` to the `HttpLookupTableSource` constructor.
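Sketched with stand-in classes (this is not the actual connector code; the toy `HttpLookupTableSource` below only illustrates the injection pattern), the fix looks like this:

```java
// Toy sketch of the fix: inject the ClassLoader at construction time
// instead of calling Thread.currentThread().getContextClassLoader() later.
// The class name mirrors the connector, but the body is a placeholder.
class HttpLookupTableSource {
    private final ClassLoader classLoader;

    HttpLookupTableSource(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    // Factory discovery should use the injected loader, not the thread's.
    ClassLoader classLoaderForDiscovery() {
        return classLoader;
    }
}

public class ClassLoaderFixSketch {
    public static void main(String[] args) {
        // In the real HttpLookupTableSourceFactory#createDynamicTableSource,
        // this argument would be context.getClassLoader() from the factory Context.
        ClassLoader fromContext = ClassLoaderFixSketch.class.getClassLoader();
        HttpLookupTableSource source = new HttpLookupTableSource(fromContext);
        System.out.println(source.classLoaderForDiscovery() == fromContext);
    }
}
```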
The second issue might be more complicated and may require bigger changes - I'm open to every constructive suggestion though. We use our custom `QueryFormatAwareConfiguration` and `PrefixedConfigOption` because one of our design decisions was that users should be able to add their own custom formats, query creators and handlers, and use them in both the Streaming and Table APIs. We make heavy use of Flink's factory and factory discovery mechanism, which is based on Java SPI.
The common pattern in Flink connectors is that options for a format are prefixed with the format name. We leverage this and apply it to all custom components (formats, handlers, query creators). A user can create their own component with its own properties and add those properties to the table DDL definition and/or the Streaming API definition. As long as a property is prefixed with the custom component's identifier, our implementation will fish it out and apply it to the component factory.
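As an illustration of that "fishing out" step, here is a minimal sketch, assuming a plain `Map` of table options and a hypothetical `optionsFor` helper (neither is the connector's actual API):

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixFilterSketch {
    // Return the options whose keys start with "<componentId>.", with the
    // prefix stripped, ready to be passed to that component's factory.
    static Map<String, String> optionsFor(String componentId, Map<String, String> all) {
        String prefix = componentId + ".";
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : all.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical component id and properties, only for illustration.
        Map<String, String> ddlOptions = new HashMap<>();
        ddlOptions.put("connector", "rest-lookup");
        ddlOptions.put("my-query-creator.retries", "3");
        ddlOptions.put("my-query-creator.timeout", "10s");

        Map<String, String> creatorOptions = optionsFor("my-query-creator", ddlOptions);
        System.out.println(creatorOptions.size());          // the two prefixed options
        System.out.println(creatorOptions.get("retries"));  // prefix already stripped
    }
}
```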
However, using custom factories plus custom classes with Flink's factory discovery mechanism now requires using the proper class loader. The second problem might be caused by the fact that `GenericJsonQueryCreatorFactory` still uses `Thread.currentThread().getContextClassLoader()`. Using the proper class loader, as in the first issue, might not be that easy here.
@kristoffSC thanks for all the detail - I will look into this.
I am not sure how much, if any, of this sort of customisation we will need. I wonder what you think of running this in a mode without custom factories and classes; that would mean we would not need the class loaders and could hard-code the factory we need, either via an explicit constructor or by loading it using reflection; we could use one of the factories your connector already supplies.
Checking my assumptions:
@kristoffSC I have created some code https://github.com/getindata/flink-http-connector/pull/68 as a draft so you can see it.
I attempted to test this using the sql-client:
```sql
CREATE TABLE Customers (customerId STRING, customerName STRING, balance DOUBLE)
WITH (
  'connector' = 'rest-lookup',
  'url' = 'https://localhost:8091/api',
  'gid.connector.http.source.lookup.query-creator' = 'generic-json-query',
  'asyncPolling' = 'true',
  'lookup-method' = 'POST',
  'format' = 'json',
  'gid.connector.http.security.cert.server' = 'certs/server.cert');
```

```sql
CREATE TABLE Orders (id STRING, customerId STRING, order_time TIMESTAMP(3),
  proc_time AS PROCTIME(),
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND)
WITH (
  'connector' = 'datagen',
  'number-of-rows' = '2',
  'fields.id.kind' = 'sequence',
  'fields.id.start' = '1',
  'fields.id.end' = '2',
  'fields.customerId.kind' = 'sequence',
  'fields.customerId.start' = '1',
  'fields.customerId.end' = '2',
  'fields.order_time.kind' = 'random',
  'fields.order_time.max-past' = '1 DAYS');
```

```sql
SELECT o.id, o.order_time, o.customerId, c.customerName, c.balance
FROM Orders AS o
JOIN Customers2 FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customerId = c.customerId;
```

[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'com.getindata.connectors.http.LookupQueryCreatorFactory' in the classpath.
I was hoping that by specifying 'gid.connector.http.source.lookup.query-creator' = 'generic-json-query', this would tell the code which factory to use. I think I need to do more than "Then in HttpLookupTableSourceFactory#createDynamicTableSource ~line 74, pass context.getClassLoader() to HttpLookupTableSource constructor." I also tried this with the generic-get-query and it failed with the same error.
Any thoughts? I am running the latest Flink.
Note I can run a DataStream java app successfully using the same create tables and lookup join (without the need to specify a gid.connector.http.source.lookup.query-creator).
I have debugged the error in the source, and it is:
Method threw 'org.apache.flink.table.api.ValidationException' exception. Cannot evaluate org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLookupJoin.toString()
Here is the stack trace:
Hi @davidradl, big thanks for taking a look at this.
Your assumptions about why we used class loaders are not quite correct.
The mechanism we used is actually very common in the Flink ecosystem. If you dig into the Flink code you will see that SQL dialects, catalogs, dynamic tables and formats (for example the JSON format for Kafka or SQL) use the same factory mechanism.
This is described [here](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/#transform-table-connectorformat-resources)
and here. At its root this mechanism uses reflection. You can look at it as a type of dependency/implementation injection.
What we did is just use this mechanism in one extra place, namely query creators for SQL lookup. We would like to keep it open for customization. In the end, users tend to know exactly what format/query they need in their system - it all depends on the HTTP endpoint, and this is totally client dependent, with some exceptions like Elasticsearch, which we do support. Although I now see it's not documented...
> I was hoping that by specifying the 'gid.connector.http.source.lookup.query-creator' = 'generic-json-query', this would tell the code which factory to use.
You are correct. The creators you mentioned are the defaults for GET and POST/PUT requests. If not specified by hand, they will be loaded using the very same factory mechanism - check out here. You can put a breakpoint at this line and run one of our tests, for example this one.
The problem is with the SQL client though... TBH I still consider it a sandbox tool, maybe I'm wrong... but I have always had problems with the SQL client, and not only with this connector.
Anyway, regarding the error you have: the solution I proposed will fix the first problem as described in the ticket.
The reason you still don't see that class on the classpath is most likely that you need to run the SQL client with the `-j` option and the path to the connector jar, or with `-l` and the path to a folder that contains the connector jar. I remember at some point also adding the connector jar to Flink's lib folder, but I'm not 100% sure about that. I know I really struggled with the SQL Client...
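For reference, the invocations would look roughly like this (paths and jar name are illustrative):

```shell
# Start the SQL client with the connector jar on the classpath via -j,
# or point -l at a directory containing the jar. Paths are examples only.
./bin/sql-client.sh -j /path/to/flink-http-connector.jar

# or:
./bin/sql-client.sh -l /path/to/connector-jars/
```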
@kristoffSC Thanks - I have got past the class loader issue. Now I see the prefix issue. Looking into that.
@kristoffSC
A summary on where I am:
Ways forward:
I have had a play with reflection; I can make `getClazz` and `isList` accessible, but I am not sure how to deal with the protected parent constructor, since the child constructor needs to issue `this` or `super` first.
As a next action I am going to look into `addAll`, as it might mean we could do the prefixing in `QueryFormatAwareConfiguration` without the need for `PrefixedConfigOption`.
WDYT?
I will reply to your post above a little later, but just a quick comment on yours:
> I assume you have another scenario that is not the sql-client that you get this issue with.
Actually no :) As far as we know, this issue happens only with the SQL client. The Table API and Streaming API work fine.
We have a production system that uses this connector with custom formats and query creators and uses SQL (not SQL client).
@kristoffSC I think I have fixed it https://github.com/getindata/flink-http-connector/pull/68. Are there any side effects to this approach you can see?
I have changed `PrefixedConfigOption` to no longer subclass `ConfigOption`. It now holds the original `ConfigOption` as a member variable exposed with a getter. I moved the class into the standard package, as it no longer needed to be in the flink package.
@davidradl the PR looks really good but unfortunately it will not work :(
I added a test to the main branch, `GenericJsonQueryCreatorFactoryTest::shouldPassPropertiesToQueryCreatorFormat`, which verifies that properties are passed to the format used by the query creator. It will fail on your branch.
Here is why.
When we were extending Flink's original `ConfigOption`, we overrode the `key()` method so it would return the prefix plus the original key. In your implementation this is not happening: the original `key()` method is called, and it returns the property name as defined in the DDL/properties.
Now, why is this a bad thing? It seems OK, right?
Well, formats (for example Flink's JSON format) have their own properties, for example `fail-on-missing-field`. They are table/connector agnostic. When you use them in a table DDL, however, you use something like `json.fail-on-missing-field`. The pattern here is `format-name.property-name`. When the DDL is parsed and validated by the table factory, it knows which properties are for the table and which are for the format - based on this prefix.
We use the same mechanism here for query creators, so they can use already existing formats like "json" that have their own properties. So in the DDL (btw, one DDL can represent both a Sink and a Lookup table) we need a way to detect which properties are for the query creator's format and which are for the DDL. For this we use the prefix `lookup-request.format.format.`
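The subclassing trick can be sketched with simplified stand-in classes (these are not Flink's real `ConfigOption`, whose constructor and helpers are package-private - which is exactly the problem being discussed):

```java
// Simplified stand-in for Flink's ConfigOption, just enough to show the idea.
class ConfigOptionSketch {
    private final String key;
    ConfigOptionSketch(String key) { this.key = key; }
    public String key() { return key; }
}

// The subclass approach: key() returns prefix + original key, so a format
// asking for "fail-on-missing-field" actually reads the prefixed property
// name from the table options.
class PrefixedConfigOptionSketch extends ConfigOptionSketch {
    private final String prefix;
    PrefixedConfigOptionSketch(String prefix, String key) {
        super(key);
        this.prefix = prefix;
    }
    @Override
    public String key() { return prefix + super.key(); }
}

public class PrefixedKeySketch {
    public static void main(String[] args) {
        ConfigOptionSketch option = new PrefixedConfigOptionSketch(
            "lookup-request.format.json.", "fail-on-missing-field");
        System.out.println(option.key());
    }
}
```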
P.S.
When you pull master into your branch, you will see that the new test does not compile on your branch. This is because you added `DynamicTableContext` to the signature of `createLookupQueryCreator`. To fix this, just add a class field:

```java
private DefaultDynamicTableContext tableContext;
```

In the `setUp()` method, at the end add:

```java
ResolvedSchema resolvedSchema =
    ResolvedSchema.of(Column.physical("key1", DataTypes.STRING()));
this.tableContext = new DefaultDynamicTableContext(
    ObjectIdentifier.of("default", "default", "test"),
    new ResolvedCatalogTable(
        CatalogTable.of(
            Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
            null,
            Collections.emptyList(),
            Collections.emptyMap()),
        resolvedSchema),
    Collections.emptyMap(),
    config,
    Thread.currentThread().getContextClassLoader(),
    false
);
```

and later use `this.tableContext` in the `createLookupQueryCreator` call.
@kristoffSC I am not sure if you approve, but we can use reflection to update the private `key` variable of `ConfigOption`. I have put in a fix that does this, and your new test passes. I suggest this is good enough for now, unless we can change Flink to make this easier. I guess this could fail if Java security is being used. Also, do you want anything more sophisticated for the exception?
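On a toy class, the reflection approach looks like this (a sketch only; Flink's real `ConfigOption` field may be final and need extra handling, and as noted this can fail under a security manager):

```java
import java.lang.reflect.Field;

public class ReflectionKeySketch {
    // Toy class standing in for ConfigOption: the field we want to rewrite
    // is private and has no setter.
    static class Option {
        private String key;
        Option(String key) { this.key = key; }
        String key() { return key; }
    }

    public static void main(String[] args) throws Exception {
        Option option = new Option("fail-on-missing-field");

        // Reach into the private field and prepend the prefix.
        Field keyField = Option.class.getDeclaredField("key");
        keyField.setAccessible(true); // may throw under a security manager
        keyField.set(option, "json." + keyField.get(option));

        System.out.println(option.key());
    }
}
```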
Left comments on PR. I'm ok with the reflection.
BTW regarding:
> Engage with the Flink community to see if this prefix config approach can be used with Flink. Change Flink's ConfigOption to make the constructor, isList and getClazz methods public - like the other methods in ConfigOption. Or maybe a Flink change to be aware of a prefix that we could use in this connector. I assume if both the ConfigOption and the PrefixedConfigOption were loaded with the same class loader, then the protected methods would be accessible, as they are in the same package.

You think it is possible? :) I don't think we have a strong case here. It seems no one has needed it yet :)
Also, I have explored your other proposition about `config.addAll(other, prefix)`. This will not work. The properties with the prefix are already in the Configuration. What we need is to add the prefix, in flight, to the format's required/optional properties defined in the `SerializationFormatFactory` - that is, to those returned by the `Factory::requiredOptions` and `Factory::optionalOptions` methods.
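That "in flight" prefixing can be sketched like this (toy types; in the real connector the wrapped objects are Flink `ConfigOption`s coming from the format factory's `requiredOptions()`/`optionalOptions()`):

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class InFlightPrefixSketch {
    // Stand-in for the option keys a format factory declares via
    // requiredOptions()/optionalOptions(). Keys are examples from Flink's
    // JSON format.
    static Set<String> formatFactoryOptionKeys() {
        return new LinkedHashSet<>(List.of("fail-on-missing-field", "ignore-parse-errors"));
    }

    // Add the prefix "in flight" to the factory-declared keys, so validation
    // looks up the prefixed names that actually appear in the DDL.
    static Set<String> prefixed(String prefix, Set<String> keys) {
        return keys.stream()
            .map(k -> prefix + k)
            .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    public static void main(String[] args) {
        for (String key : prefixed("lookup-request.format.json.", formatFactoryOptionKeys())) {
            System.out.println(key);
        }
    }
}
```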
Hi @kristoffSC, I have made changes in the spirit of your feedback. I have kept the code in its own class, as I could not get the generics working as a method in the parent class. I am not sure if there is more detail you need in the comments or docs. If you have an example you want to add, I can put that in.
@davidradl do you need a new connector release with this fix?
@kristoffSC One thing: I reverted the change mentioning Flink 1.16 back to 1.15. I am not sure if the current code will work on 1.15, as the changes we made are for 1.16. 1.15 is not downloadable - so I assume it is no longer supported by Flink. I have raised issue 69 so this can be assessed separately.
> @davidradl do you need a new connector release with this fix?
Yes please.
> @kristoffSC One thing: I reverted the change mentioning Flink 1.16 back to 1.15. I am not sure if the current code will work on 1.15, as the changes we made are for 1.16. 1.15 is not downloadable - so I assume it is no longer supported by Flink. I have raised issue 69 so this can be assessed separately.
Hi, I think it may work. The changes we made are "transparent" to Flink 1.15. The reason the previous code was not working on 1.16 when deploying via the SQL client is internal Flink changes.
We can start looking at https://github.com/getindata/flink-http-connector/issues/69 next week - I'm off till the end of this week. Sometime around this weekend/next week I will release a new version of the connector.
@kristoffSC I see the PR is failing to build in the "Add coverage" section. It says: Error: HttpError: Resource not accessible by integration. Is this related to the code I added?
There are several issues. When running the HTTP connector from the Flink 1.16 SQL client, an exception is thrown:

which is caused by using `Thread.currentThread().getContextClassLoader()` in `HttpLookupTableSource::getLookupRuntimeProvider`. This is caused by the change made in https://issues.apache.org/jira/browse/FLINK-15635; the solution is to use `DynamicTableFactory.Context#getClassLoader` instead.

When the first issue is fixed, the next one occurs:

This one is caused by the fact that we extend Flink's `ConfigOption` with `PrefixedConfigOption` and call `ConfigOption`'s package-protected method `getClazz()` in the constructor. This started causing issues on Flink 1.16 when submitting a job from the SQL client, due to class loader changes.