IBMStreams / streamsx.kafka

Repository for integration with Apache Kafka
https://ibmstreams.github.io/streamsx.kafka/
Apache License 2.0

KafkaProducer: message or key attribute name containing an underscore causes error in context checker #208

Closed markheger closed 4 years ago

markheger commented 4 years ago
  ( ) as KafkaProducer_RgfvhNzQkE = com.ibm.streamsx.kafka::KafkaProducer  ( KafkaConsumer_Iv3fWTJ7W0k as __spl_0047b0a770aeaba622ba6ffe42d5f0ef)
  {
    param
      messageAttribute: event_message;
      propertiesFile: 'etc/producer-spv3bq0u.properties';
      topic: 'biology';
    config
      streamViewability: true;
  }

This invocation fails at runtime with:

ERROR #splapptrc,J[0],P[0],KafkaProducer_RgfvhNzQkE M[?:com.ibm.streams.operator.internal.compile.OperatorContextCheckerImpl.setInvalidContext:-1]  - CDIST2160E Message has not been specified. Either the 'messageAttribute' parameter must be specified or the input schema must contain an attribute named "message".
markheger commented 4 years ago

AbstractKafkaProducerOperator.parseFQAttributeName causes this issue: it splits the parameter value at the character '_', so the name used to check whether the attribute exists is "event" instead of "event_message" (in the sample above).

ghost commented 4 years ago

The function AbstractKafkaProducerOperator.parseFQAttributeName is totally broken.

Its purpose is to be a workaround for the broken OperatorContext.getParameterValues(String) Java Operator API call, which is supposed to return InputPortName.AttributeName for an attribute parameter, but actually returns the C++ expression iport$0.get_AttributeName() instead (Streams 4.3).
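To illustrate the failure mode, here is a minimal, self-contained Java sketch. It is not the toolkit's code: the class AttributeNameSketch and both methods are hypothetical, and naiveParse is only an assumed reconstruction of the split-on-'_' behavior described above. extractAttributeName shows one possible way to accept both the C++ expression form and the documented InputPortName.AttributeName form.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only; not the streamsx.kafka implementation.
class AttributeNameSketch {

    // Assumed reconstruction of the faulty logic: split at '_' and keep
    // the token after the first underscore, dropping a trailing "()".
    static String naiveParse(String value) {
        String[] parts = value.split("_");
        return parts.length > 1 ? parts[1].replace("()", "") : value;
    }

    // Matches the C++ expression form, e.g. iport$0.get_event_message()
    private static final Pattern CPP_GETTER =
            Pattern.compile("^[^.]+\\.get_(\\w+)\\(\\)$");

    // Accepts the C++ expression form, the documented
    // InputPortName.AttributeName form, or a bare attribute name.
    static String extractAttributeName(String value) {
        Matcher m = CPP_GETTER.matcher(value);
        if (m.matches()) {
            return m.group(1); // everything between "get_" and "()"
        }
        int dot = value.lastIndexOf('.');
        return dot >= 0 ? value.substring(dot + 1) : value;
    }

    public static void main(String[] args) {
        String value = "iport$0.get_event_message()";
        System.out.println(naiveParse(value));           // event
        System.out.println(extractAttributeName(value)); // event_message
    }
}
```

With an attribute named message, both methods return the same name, which would explain why the problem only shows up for attribute names that contain an underscore.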

The implementation is broken because it would not work correctly if the Java Operator API showed the correct behavior (correct == as documented). That is why this obscure parseFQAttributeName should be removed. The context check checkAttributes, which uses the parseFQAttributeName function, is performed at runtime. Therefore, the attribute checks (presence, types, etc.) can be moved to the operator's initialize function without any negative impact on the user. initialize has access to the member variables that receive the parameter values via annotation.

markheger commented 4 years ago

Doing the check in initialize is a good option, since many operators already have the check there.
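As a rough sketch of what such a check in initialize could look like, the following self-contained Java example validates the message attribute against the input schema. Everything here is an assumption for illustration: the class MessageAttributeCheckSketch and the method resolveMessageAttribute are hypothetical, a plain Set of attribute names stands in for the input port schema, and the configured name stands in for the member variable that receives the messageAttribute parameter. The default name "message" is taken from the error text above.

```java
import java.util.Set;

// Hypothetical illustration only; not the streamsx.kafka implementation.
class MessageAttributeCheckSketch {

    // Default attribute name, as stated in the CDIST2160E message.
    static final String DEFAULT_MESSAGE_ATTRIBUTE = "message";

    // configuredName: value of the messageAttribute parameter, or null
    //                 if the parameter was not specified.
    // schemaAttributeNames: stand-in for the input port schema.
    static String resolveMessageAttribute(String configuredName,
                                          Set<String> schemaAttributeNames) {
        String name = (configuredName != null)
                ? configuredName
                : DEFAULT_MESSAGE_ATTRIBUTE;
        if (!schemaAttributeNames.contains(name)) {
            throw new IllegalArgumentException(
                    "Message attribute '" + name
                    + "' not found in the input schema");
        }
        return name;
    }
}
```

Because the check works on the already resolved parameter value, no parsing of the string returned by getParameterValues is needed, and an attribute name such as event_message is compared as a whole.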

ghost commented 4 years ago

The exact impact of this issue is:

ghost commented 4 years ago

Resolved with release 3.0.4