getindata / flink-http-connector

HTTP Connector for Apache Flink. Provides sources and sinks for the DataStream, Table and SQL APIs.
Apache License 2.0

setElementConverter for HttpSinkRequestEntry is deprecated; what alternative method or code should we use? #61

Closed someshwar1 closed 12 months ago

someshwar1 commented 1 year ago

@Deprecated
@PublicEvolving
public HttpSinkBuilder setElementConverter(
        ElementConverter<InputT, HttpSinkRequestEntry> elementConverter) {
    this.elementConverter = elementConverter;
    return this;
}

Code:

HttpSink.builder()
    .setEndpointUrl("http://example.com/myendpoint")
    .setElementConverter(
        (s, _context) -> new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8)))
    .setProperty("gid.connector.http.sink.header.X-Content-Type-Options", "nosniff")
    .build();

kristoffSC commented 1 year ago

Hi, thanks for asking. Please use setElementConverter(SchemaLifecycleAwareElementConverter) instead.

SchemaLifecycleAwareElementConverter extends the ElementConverter interface by adding an open method to the API.

The information you are looking for is in the Javadoc of the deprecated method.

I see that the README and the examples were not updated... sorry :)

I've created an issue for this: https://github.com/getindata/flink-http-connector/issues/62

someshwar1 commented 12 months ago

After updating, I am getting a compilation issue. I added the dependencies org.apache.flink.flink-java, org.apache.flink.flink-clients and org.apache.flink.flink-connector-base.

Can you please suggest a fix?

kristoffSC commented 12 months ago

SchemaLifecycleAwareElementConverter is not a functional interface like ElementConverter was, so you cannot use a lambda expression to represent it.

What you can do is implement the interface and pass an instance of it, or use an anonymous class like so:

private final SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry>
        ELEMENT_CONVERTER_V2 = new SchemaLifecycleAwareElementConverter<>() {
            @Override
            public void open(InitContext context) {
                //noOp
            }

            @Override
            public HttpSinkRequestEntry apply(String element, Context context) {
                return new HttpSinkRequestEntry("POST", element.getBytes(StandardCharsets.UTF_8));
            }
        };

   HttpSink.<String>builder().setElementConverter(ELEMENT_CONVERTER_V2)...

The open method is called when SinkWriter is created.
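
One plausible use of such a hook is to rebuild state that cannot be serialized with the converter itself. The following is a JDK-only sketch of that idea, not connector code: `LifecycleConverter`, `Utf8Converter` and `OpenLifecycleDemo` are hypothetical names, and the no-argument `open()` is a simplification of the real `open(InitContext)`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for a lifecycle-aware converter (assumption, not the connector API).
interface LifecycleConverter<I, O> extends Serializable {
    void open();

    O apply(I element);
}

class Utf8Converter implements LifecycleConverter<String, byte[]> {
    private static final long serialVersionUID = 1L;

    // Charset is not Serializable, so the field is transient and set in open().
    private transient Charset charset;

    @Override
    public void open() {
        charset = StandardCharsets.UTF_8;
    }

    @Override
    public byte[] apply(String element) {
        return element.getBytes(charset);
    }

    boolean isOpen() {
        return charset != null;
    }
}

class OpenLifecycleDemo {
    // Serializes and deserializes an object, similar to shipping it to a worker.
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Utf8Converter shipped = roundTrip(new Utf8Converter());
        System.out.println("opened after deserialization: " + shipped.isOpen());
        shipped.open(); // the runtime would call this once per writer
        System.out.println("bytes produced: " + shipped.apply("hello").length);
    }
}
```

In this sketch the transient field is empty after the round trip and only becomes usable once `open()` has run, which is why initialization belongs there rather than in the constructor.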

someshwar1 commented 12 months ago

Thanks @kristoffSC for the suggestion. I tried it out and am now getting a serialization issue:

Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not serialize object for key serializedUDF.

Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: Error in serialization.
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:326)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:160)
    at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1024)
    at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56)
    at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
    at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61)
    at org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:105)
    at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:82)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2197)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2084)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2058)

kristoffSC commented 12 months ago

Hi @someshwar1, sorry, but I'm not sure whether this is caused by the HTTP connector or by the API changes we made.

Please post a job example that reproduces this issue so we can narrow it down.

someshwar1 commented 12 months ago


Found the issue. Instead of implementing SchemaLifecycleAwareElementConverter in the same class, I created a new class and used it as shown below, which fixed the issue:

public class ElementConverterCustom
        implements SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry> {
    @Override
    public void open(InitContext context) {
        // no-op
    }

    @Override
    public HttpSinkRequestEntry apply(String element, Context context) {
        return new HttpSinkRequestEntry("GET", element.getBytes(StandardCharsets.UTF_8));
    }
}

var httpSink = HttpSink.<String>builder()
    .setEndpointUrl("http://example.com/myendpoint")
    .setElementConverter(new ElementConverterCustom())
    .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new)
    .setProperty(
        HttpConnectorConfigConstants.SINK_HEADER_PREFIX + "Content-Type",
        contentTypeHeader)
    .build();

kristoffSC commented 12 months ago

Glad that worked. Seems like a straightforward Java <-> Flink case :)

Regarding "instead implement SchemaLifecycleAwareElementConverter in same class":

This will work, but your "new inner class" must be declared static. If it's not static, it holds a reference to the "outer" class, which is not serializable.

An anonymous class assigned to a static field of the same class will also work.

Anyway, the options are:

  1. Use a static nested class, or an anonymous class held in a static field, if you wish to define SchemaLifecycleAwareElementConverter inside the same class.
  2. Implement SchemaLifecycleAwareElementConverter in a brand new class.

Ad 1: This will work:

public class MyClass {
 private static class EC implements SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry> {
  ....
 }
}

This will also work:

public class MyClass {

 private static final SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry>
        EC = new SchemaLifecycleAwareElementConverter<>() {
         ....
        };
}

And these will not work; they cause a serialization exception:

public class MyClass {
  private class EC implements SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry> {
  ....
 }
}

public class MyClass {

 private final SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry>
        EC = new SchemaLifecycleAwareElementConverter<>() {
         ....
        };
}
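
The four cases can be checked without Flink using plain Java serialization. This is only a sketch under stated assumptions: `Converter` and `SerializationDemo` are hypothetical names standing in for the converter interface and the job class, and the inner variants read a field of the enclosing instance so that the captured reference is explicit.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a serializable converter interface (not a connector class).
interface Converter extends Serializable {
    String apply(String element);
}

// The enclosing class is deliberately NOT Serializable, like a typical job class.
class SerializationDemo {
    private final String prefix = "POST ";

    // Static nested class: no reference to an enclosing SerializationDemo instance.
    static class StaticConverter implements Converter {
        @Override
        public String apply(String element) {
            return element;
        }
    }

    // Inner (non-static) class: holds a reference to the enclosing instance.
    class InnerConverter implements Converter {
        @Override
        public String apply(String element) {
            return prefix + element;
        }
    }

    // Anonymous class in a static field: there is no enclosing instance to capture.
    static final Converter STATIC_ANONYMOUS = new Converter() {
        @Override
        public String apply(String element) {
            return element;
        }
    };

    // Anonymous class in an instance field: captures the enclosing instance.
    final Converter instanceAnonymous = new Converter() {
        @Override
        public String apply(String element) {
            return prefix + element;
        }
    };

    // Returns true if Java serialization accepts the object, false otherwise.
    static boolean canSerialize(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        SerializationDemo outer = new SerializationDemo();
        System.out.println("static nested:      " + canSerialize(new StaticConverter()));
        System.out.println("static anonymous:   " + canSerialize(STATIC_ANONYMOUS));
        System.out.println("inner (non-static): " + canSerialize(outer.new InnerConverter()));
        System.out.println("instance anonymous: " + canSerialize(outer.instanceAnonymous));
    }
}
```

The two static variants serialize, while the two instance-bound variants fail because serializing them would also require serializing the non-serializable enclosing object.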