gAmUssA / reactor-ksqldb

A wrapper around ksqlDB Java Client using Project Reactor https://projectreactor.io/
https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/java-client/
MIT License
0 stars 3 forks source link

enhancement : create stream from Pojo #7

Open survivant opened 3 years ago

survivant commented 3 years ago

@gAmUssA

I have java model objects (pojo) that I will send/receive in Kafka. We could create the steams form the pojo structure, it's dynamic...

ex :

@Data
@Builder
public class Orders {
    // KEY : UUID.toString()
    String orderId;
    String product;
    String orderTimestamp;

    // CREATED, SENT, RECEIVED, RETURNED 
    String status;
}

@Data
@Builder
public class Returns {
    // KEY : UUID.toString()
    String returnId;
    String orderId;
    String returnTimestamp;
}

instead of

    String ORDER_TOPIC = "orders";
    String RETURN_TOPIC = "returns";

    String CREATE_STREAM_ORDERS = String.format("CREATE STREAM %s (orderId VARCHAR KEY, product VARCHAR, orderTimestamp VARCHAR, status VARCHAR) "
                                  + " WITH (kafka_topic='%s', partitions=1, value_format='json');", ORDER_TOPIC, ORDER_TOPIC);

    String CREATE_STREAM_RETURNS = String.format("CREATE STREAM %s (returnId VARCHAR KEY, orderId VARCHAR, returnTimestamp VARCHAR) "
                                   + " WITH (kafka_topic='%s', partitions=1, value_format='json');", RETURN_TOPIC, RETURN_TOPIC);

    reactorClient
              .executeStatement(CREATE_STREAM_ORDERS)
              .then(reactorClient.listStreams())
              .block();

    reactorClient
              .executeStatement(CREATE_STREAM_RETURNS)
              .then(reactorClient.listStreams())
              .block();

I could have something like this


.createStream(stream name, pojo class, key to used in the pojo, properties... defaults could be : stream name, 1, json )

reactorClient
            .createStream(ORDER_TOPIC, Orders.class, "orderId", of("kafka_topic", ORDER_TOPIC,
                                                            "partitions", 1,
                                                            "value_format", "json"))
            .then(reactorClient.listStreams())
            .block();

    reactorClient
            .createStream(RETURN_TOPIC, Returns.class, "orderId", of("kafka_topic", RETURN_TOPIC,
                                                                    "partitions", 1,
                                                                    "value_format", "json"))
            .then(reactorClient.listStreams())
            .block();
survivant commented 3 years ago

// POJO should be also supported in ksqldb-client-api directly.

survivant commented 3 years ago
reactorClient
            .createStream(ORDER_TOPIC, Orders.class, "orderId", of("kafka_topic", ORDER_TOPIC,
                                                            "partitions", "1",
                                                            "value_format", "json"))
            .then(reactorClient.listStreams())
            .block();

    reactorClient
            .createStream(RETURN_TOPIC, Returns.class, "orderId", of("kafka_topic", RETURN_TOPIC,
                                                                    "partitions", "1",
                                                                    "value_format", "json"))
            .then(reactorClient.listStreams())
            .block();

    reactorClient.listTopics().block().stream().forEach(topicInfo -> System.out.println("topicInfo="+topicInfo));
    reactorClient.listStreams().block().stream().forEach(streamInfo -> System.out.println("streamInfo="+streamInfo));
    reactorClient.listTables().block().stream().forEach(tableInfo -> System.out.println("tableInfo="+tableInfo));
    reactorClient.listQueries().block().stream().forEach(queryInfo -> System.out.println("queryInfo="+queryInfo));

example of implementation (HACK)

public Mono<ExecuteStatementResult> createStream(String streamName, Class clazz, String key, Map<String,String> streamProperties) {
    // POJO should be also supported in ksqldb-client-api
    // HARDCODE the type to VARCHAR, I don't know how create a KStream .. PS.  need to think about deserialisation also.. for later

    String sql = generateStreamSQL(streamName, clazz, key, streamProperties);

    log.info("generateStreamSQL=" + sql);

    return fromFuture(() -> ksqlDbClient.executeStatement(sql, Collections.emptyMap()));
  }

  private String generateStreamSQL(String streamName, Class clazz, String key, Map<String,String> streamProperties){
    List<Field> allFields = getAllFields(clazz);

    String sqlParams = allFields.stream()
            .map(field -> {
                  String param = field.getName();
                  param+=" VARCHAR";
                  if(Objects.equals(field.getName(), key)){
                    param+=" KEY";
                  }
                  return param;
                  })
            .collect(Collectors.joining(", "));

    String sqlProperties = streamProperties.entrySet().stream()
            .map(entry -> entry.getKey() + "='" + entry.getValue() + "'")
            .collect(Collectors.joining(", "));

    return String.format("CREATE STREAM %s (%s) WITH (%s);", streamName, sqlParams, sqlProperties);
  }

  private List<Field> getAllFields(Class clazz) {
    if (clazz == null) {
      return Collections.emptyList();
    }

    List<Field> result = new ArrayList<>(getAllFields(clazz.getSuperclass()));
    List<Field> filteredFields = Arrays.stream(clazz.getDeclaredFields())
            .filter(f -> !Modifier.isTransient(f.getModifiers())) // we don't want transient fields
            .collect(Collectors.toList());
    result.addAll(filteredFields);
    return result;
  }

example of sql generated

2021-04-14 13:16:55 [INFO] (ReactorClient.java:153) - generateStreamSQL=CREATE STREAM orders (orderId VARCHAR KEY, product VARCHAR, orderTimestamp VARCHAR, status VARCHAR) WITH (kafka_topic='orders', partitions='1', value_format='json');
2021-04-14 13:17:00 [INFO] (ReactorClient.java:153) - generateStreamSQL=CREATE STREAM returns (returnId VARCHAR, orderId VARCHAR KEY, returnTimestamp VARCHAR) WITH (kafka_topic='returns', partitions='1', value_format='json');
topicInfo=TopicInfo{name='orders', partitions=1, replicasPerPartition=[1]}
topicInfo=TopicInfo{name='returns', partitions=1, replicasPerPartition=[1]}
streamInfo=StreamInfo{name='ORDERS', topicName='orders', keyFormat='KAFKA', valueFormat='JSON', windowed='false'}
streamInfo=StreamInfo{name='RETURNS', topicName='returns', keyFormat='KAFKA', valueFormat='JSON', windowed='false'}