streamnative / pulsar-flink

Elastic data processing with Apache Pulsar and Apache Flink
Apache License 2.0
278 stars, 120 forks

[BUG] DOC-BUG properties.pulsar.reader config format should use "-" instead of dots or camel case #414

Open: Aaronzk opened this issue 3 years ago

Aaronzk commented 3 years ago

Describe the bug

When using SQL to read from Pulsar, following the README.md, I configured:

  'properties.pulsar.reader.readerName'='Flink-reader',
  'properties.pulsar.reader.subscriptionRolePrefix'='flink-pulsar-',

I got an exception like this:

Caused by: java.lang.RuntimeException: Failed to load config into existing configuration data
    at org.apache.pulsar.client.impl.conf.ConfigurationDataUtils.loadData(ConfigurationDataUtils.java:69)
    at org.apache.pulsar.client.impl.ReaderBuilderImpl.loadConf(ReaderBuilderImpl.java:106)
    at org.apache.flink.streaming.connectors.pulsar.internal.ReaderThread.createActualReader(ReaderThread.java:131)
    at org.apache.flink.streaming.connectors.pulsar.internal.ReaderThread.run(ReaderThread.java:104)
Caused by: org.apache.pulsar.shade.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "readername" (class org.apache.pulsar.client.impl.conf.ReaderConfigurationData), not marked as ignorable (11 known properties: "cryptoKeyReader", "receiverQueueSize", "subscriptionRolePrefix", "readerListener", "subscriptionName", "topicNames", "readCompacted", "readerName", "cryptoFailureAction", "keyHashRanges", "resetIncludeHead"])
 at [Source: (String)"{"cryptoFailureAction":"FAIL","readCompacted":false,"topicNames":["persistent://public/default/mytopic"],"readername":"shy","subscriptionRolePrefix":"bigdata-di-rt-transform-consumer-hsy-prod","resetIncludeHead":false,"receiverQueueSize":1000}"; line: 1, column: 136] (through reference chain: org.apache.pulsar.client.impl.conf.ReaderConfigurationData["readername"])
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:987)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1974)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1701)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownVanilla(BeanDeserializerBase.java:1679)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:330)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:187)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4593)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3548)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3516)
    at org.apache.pulsar.client.impl.conf.ConfigurationDataUtils.loadData(ConfigurationDataUtils.java:67)
    ... 3 more
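The root cause is visible in the trace: the JSON handed to Jackson contains the key `readername`, which matches none of the 11 field names listed in the exception (Jackson matches field names case-sensitively by default). As a rough pre-flight check, reader config keys could be compared against that list before the reader is created. This is a plain-Java sketch; the class and method names are hypothetical, and the field list is copied from the exception message above, so it may differ between Pulsar versions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: not part of Pulsar or pulsar-flink.
class ReaderConfigKeyCheck {

    // Field names copied from the UnrecognizedPropertyException message
    // for ReaderConfigurationData. May vary by Pulsar version.
    static final Set<String> KNOWN_FIELDS = Set.of(
            "cryptoKeyReader", "receiverQueueSize", "subscriptionRolePrefix",
            "readerListener", "subscriptionName", "topicNames", "readCompacted",
            "readerName", "cryptoFailureAction", "keyHashRanges", "resetIncludeHead");

    // Returns the keys that do not exactly match a known field name,
    // i.e. the keys a case-sensitive, strict deserializer would reject.
    static List<String> unrecognizedKeys(Map<String, ?> readerConfig) {
        List<String> unknown = new ArrayList<>();
        for (String key : readerConfig.keySet()) {
            if (!KNOWN_FIELDS.contains(key)) {
                unknown.add(key);
            }
        }
        return unknown;
    }

    public static void main(String[] args) {
        // Keys as they appeared in the failing JSON from the stack trace.
        Map<String, Object> config = Map.of(
                "readername", "shy",
                "subscriptionRolePrefix", "bigdata-di-rt-transform-consumer-hsy-prod",
                "receiverQueueSize", 1000);
        System.out.println(unrecognizedKeys(config)); // [readername]
    }
}
```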

To Reproduce

My version is 1.12.4.6.

Expected behavior

I think the documentation is inaccurate in the "Configuration parameters" section. The correct config format is:

  'properties.pulsar.reader.reader-name'='Flink-reader',
  'properties.pulsar.reader.subscription-role-prefix'='flink-pulsar-',
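For anyone migrating options written in the README's camel-case style, each camel-case word boundary has to become a hyphen (`readerName` becomes `reader-name`). A minimal plain-Java sketch of that rewrite follows; the class name is hypothetical and it only treats ASCII upper-case letters as word boundaries.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical migration helper: rewrites camel-case option keys into the
// lower-hyphen form that survives Flink's key normalization.
class OptionKeyMigration {

    // "readerName" -> "reader-name"; dots and existing hyphens are kept.
    static String camelToHyphen(String key) {
        StringBuilder out = new StringBuilder(key.length() + 8);
        for (int i = 0; i < key.length(); i++) {
            char c = key.charAt(i);
            if (c >= 'A' && c <= 'Z') {
                // Add a hyphen before an upper-case letter unless it starts a segment.
                if (i > 0 && key.charAt(i - 1) != '.' && key.charAt(i - 1) != '-') {
                    out.append('-');
                }
                out.append(Character.toLowerCase(c));
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    // Applies camelToHyphen to every key, preserving the original order.
    static Map<String, String> migrate(Map<String, String> options) {
        Map<String, String> migrated = new LinkedHashMap<>();
        options.forEach((k, v) -> migrated.put(camelToHyphen(k), v));
        return migrated;
    }

    public static void main(String[] args) {
        System.out.println(camelToHyphen("properties.pulsar.reader.subscriptionRolePrefix"));
        // properties.pulsar.reader.subscription-role-prefix
    }
}
```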

After checking the source code, I found this in the Javadoc of class org.apache.flink.table.descriptors.DescriptorProperties:

 * Key-names should be
 * hierarchical and lower case. Use "-" instead of dots or camel case.
 * <p>Properties with key normalization enabled contain only lower-case keys.

If a key contains upper-case letters, it is transformed to lower case when key normalization is enabled:

    private void put(String key, String value) {
        if (properties.containsKey(key)) {
            throw new ValidationException("Property already present: " + key);
        }
        if (normalizeKeys) {
            properties.put(key.toLowerCase(), value);
        } else {
            properties.put(key, value);
        }
    }
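The effect of `put` with `normalizeKeys` enabled can be reproduced in isolation. This is a simplified stand-in, not the Flink class: the class name is hypothetical, it throws `IllegalStateException` instead of Flink's `ValidationException`, and it lower-cases with `Locale.ROOT` rather than the default locale. Like the original, the duplicate check runs on the raw key while the map stores the lower-cased one.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Simplified stand-in for the put(...) logic of Flink's DescriptorProperties.
class NormalizingProperties {
    private final Map<String, String> properties = new HashMap<>();
    private final boolean normalizeKeys;

    NormalizingProperties(boolean normalizeKeys) {
        this.normalizeKeys = normalizeKeys;
    }

    void put(String key, String value) {
        // Checks the raw key, as the original does.
        if (properties.containsKey(key)) {
            throw new IllegalStateException("Property already present: " + key);
        }
        // With normalization on, "readerName" is stored as "readername".
        properties.put(normalizeKeys ? key.toLowerCase(Locale.ROOT) : key, value);
    }

    Map<String, String> asMap() {
        return properties;
    }

    public static void main(String[] args) {
        NormalizingProperties props = new NormalizingProperties(true);
        props.put("properties.pulsar.reader.readerName", "Flink-reader");
        props.put("properties.pulsar.reader.reader-name", "Flink-reader");
        // The camel-case key has lost its capital letter; the hyphenated one is unchanged.
        System.out.println(props.asMap().keySet());
    }
}
```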

So `readerName` reaches the reader config as `readername`, which the ObjectMapper in ConfigurationDataUtils cannot map onto any field of class org.apache.pulsar.client.impl.conf.ReaderConfigurationData. That is the UnrecognizedPropertyException above.

For those who are also experiencing this problem: how are keys separated by '-' converted to camel case? It happens in org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions:

    public static Properties getPulsarProperties(Map<String, String> tableOptions) {
        final Properties pulsarProperties = new Properties();

        if (hasPulsarClientProperties(tableOptions)) {
            tableOptions.keySet().stream()
                    .filter(key -> key.startsWith(PROPERTIES_PREFIX))
                    .forEach(key -> {
                        final String value = tableOptions.get(key);
                        String subKey = key.substring((PROPERTIES_PREFIX).length());
                        if (subKey.startsWith(PULSAR_OPTION_KEY_PREFIX)) {
                            subKey = CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, subKey);
                        }
                        pulsarProperties.put(subKey, value);
                    });
        }
        pulsarProperties.computeIfAbsent(PARTITION_DISCOVERY_INTERVAL_MILLIS.key(), tableOptions::get);
        return pulsarProperties;
    }
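To see why only the hyphenated form works end to end, the two steps can be chained: key lower-casing (as in `DescriptorProperties.put`) followed by the lower-hyphen to lower-camel conversion. In this sketch `hyphenToCamel` is a plain-Java approximation of Guava's `CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, ...)`, written to avoid the Guava dependency. The values of `PROPERTIES_PREFIX` and `PULSAR_OPTION_KEY_PREFIX` are assumptions, since they are not shown in the snippet above.

```java
import java.util.Locale;

// Sketch of the key path for one table option; constant values are assumptions.
class PulsarKeyPath {
    static final String PROPERTIES_PREFIX = "properties.";    // assumed value
    static final String PULSAR_OPTION_KEY_PREFIX = "pulsar."; // assumed value

    // Approximates Guava LOWER_HYPHEN -> LOWER_CAMEL: split on '-', lower-case
    // the first word, upper-case the first letter of each later word and
    // lower-case the rest of it.
    static String hyphenToCamel(String s) {
        String[] words = s.split("-", -1);
        StringBuilder out = new StringBuilder(words[0].toLowerCase(Locale.ROOT));
        for (int i = 1; i < words.length; i++) {
            String w = words[i];
            if (!w.isEmpty()) {
                out.append(Character.toUpperCase(w.charAt(0)))
                   .append(w.substring(1).toLowerCase(Locale.ROOT));
            }
        }
        return out.toString();
    }

    // Step 1: key normalization; step 2: prefix strip plus case conversion.
    static String resolve(String tableOptionKey) {
        String normalized = tableOptionKey.toLowerCase(Locale.ROOT);
        if (!normalized.startsWith(PROPERTIES_PREFIX)) {
            return normalized; // not a pass-through property; left untouched
        }
        String subKey = normalized.substring(PROPERTIES_PREFIX.length());
        if (subKey.startsWith(PULSAR_OPTION_KEY_PREFIX)) {
            subKey = hyphenToCamel(subKey);
        }
        return subKey;
    }

    public static void main(String[] args) {
        System.out.println(resolve("properties.pulsar.reader.readerName"));  // pulsar.reader.readername
        System.out.println(resolve("properties.pulsar.reader.reader-name")); // pulsar.reader.readerName
    }
}
```

Because the conversion only capitalizes letters that follow a hyphen, a camel-case key has no way to get its capital letters back once normalization has removed them.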

nlu90 commented 3 years ago

@Aaronzk

This part of the documentation is misleading, and the code that handles it is quite scattered. I also ran into this problem recently. I think we need to either change the doc or update the code you mentioned so that it handles all options properly.
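One possible direction for the code change, sketched here as an assumption and not as the project's fix: instead of relying on the key's letter case, strip hyphens and match the remainder case-insensitively against the known field names of the target config class. The class name is hypothetical, the field names would come from the exception above or, more robustly, from the config class itself.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical resolver: maps "readername", "reader-name" or "readerName"
// to the canonical field name "readerName".
class CanonicalKeyResolver {
    private final Map<String, String> byLooseName = new HashMap<>();

    CanonicalKeyResolver(List<String> canonicalNames) {
        for (String name : canonicalNames) {
            byLooseName.put(loose(name), name);
        }
    }

    // Drops hyphens and lower-cases, so all spellings collapse to one lookup key.
    private static String loose(String key) {
        return key.replace("-", "").toLowerCase(Locale.ROOT);
    }

    Optional<String> resolve(String key) {
        return Optional.ofNullable(byLooseName.get(loose(key)));
    }

    public static void main(String[] args) {
        CanonicalKeyResolver resolver = new CanonicalKeyResolver(List.of(
                "readerName", "subscriptionRolePrefix", "receiverQueueSize"));
        System.out.println(resolver.resolve("readername"));               // Optional[readerName]
        System.out.println(resolver.resolve("subscription-role-prefix")); // Optional[subscriptionRolePrefix]
        System.out.println(resolver.resolve("unknown-key"));              // Optional.empty
    }
}
```

The trade-off is that two fields differing only in letter case would collide; none of the 11 names listed in the exception do.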