getindata / flink-http-connector

Http Connector for Apache Flink. Provides sources and sinks for Datastream , Table and SQL APIs.
Apache License 2.0
136 stars 39 forks source link

how to POST array values? #89

Closed p-eye closed 2 months ago

p-eye commented 2 months ago

Hello, I want to send a POST request with data containing a jsonObject Array. Here is an example of the data. The depth of the object is dynamically changeable

{
  "id":"myid1",
  "events":[{"key":1, "values": {"value1":1, "value2": {}}}, {"key": 2, "values": {}}]
}

For this, I mapped the row to the AppLog class and created an httpSink.

public class AppLog {

    public String id;
    public JsonNode events;

    public AppLog(String id, JsonNode events) {
        this.client = client;
        this.events = events;
    }

    public AppLog() {

    }

    public byte[] getBytes() {
        ObjectMapper mapper = new ObjectMapper();
        String jsonString = null;
        try {
            jsonString = this.toString();
            return jsonString.getBytes("UTF-8");
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    }

    @Override
    public String toString() {
        return "{\"client\":\"" + client + "\",\"events\":" + events + "}";
    }
}

...

Properties properties = new Properties();
properties.setProperty("gid.connector.http.sink.request-callback", "slf4j-logger");
properties.setProperty("gid.connector.http.sink.writer.request.mode", "single");

HttpSink httpSink = HttpSink.<AppLog>builder()
        .setEndpointUrl("myURL")
        .setElementConverter(
                (s, _context) -> new HttpSinkRequestEntry("POST", s.getBytes()))
        .setProperties(properties)
        .build();

This is my problem. When I send a POST request above httpSink, the request type maybe not the same as the receiving side. this is an Postman case, the "x-www-form-urlencoded" format sends correct requests, but sending it in "form-data" format causes a mismatch with the receiving side. This "form-data of Postman" problem is the same as the one with the flink-http-connector.

So, I tried some cases in the flink-http-connector to post with application/x-www-form-urlencoded but couldn't solve it.

properties.setProperty("gid.connector.http.source.lookup.header.Content-Type", "application/json");
properties.setProperty("gid.connector.http.source.lookup.header.Content-Type", "application/x-www-form-urlencoded");

Is there are any options to solve?

thank you

p-eye commented 2 months ago

i solved it. To request with "application/x-www-form-urlencoded", map objectNode to String. thank you

        DataStream<String> processedStream = kafkaStream.map(new MapFunction<ObjectNode, String>() {
            @Override
            public String map(ObjectNode objectNode) throws Exception {
                JsonNode valueNode = objectNode.path("value");
                StringBuilder result = new StringBuilder();
                boolean first = true;
                Iterator<Map.Entry<String, JsonNode>> fields = valueNode.fields();
                while (fields.hasNext()) {
                    if (first)
                        first = false;
                    else
                        result.append("&");
                    Map.Entry<String, JsonNode> entry = fields.next();
                    result.append(URLEncoder.encode(entry.getKey(), "UTF-8"));
                    result.append("=");
                    result.append(URLEncoder.encode(entry.getValue().asText(), "UTF-8"));
                }
                return result.toString();
            }
        });