spring-attic / spring-cloud-gcp

Integration for Google Cloud Platform APIs with Spring
Apache License 2.0

IllegalArgumentException while Consuming messages in PubSub #2629

Closed: acs closed this issue 3 years ago

acs commented 3 years ago

Describe the bug

I am using the versions:

I am not sure if it is a bug, but in this pretty basic sample the behavior is not what I expected. I am getting the error:

Caused by: java.lang.IllegalArgumentException: Failed to convert input: GenericMessage [payload=byte[46], headers={id=d2c3cafc-0289-2f50-cf6e-c966efd61e59, contentType=application/json, gcp_pubsub_original_message=PushedAcknowledgeablePubsubMessage{projectId='k8strain-301514', subscriptionName='dp-flow-execution-worker.receiver', message=data: "{\"inputEvent\":\"event\",\"operation\":\"operation\"}"
attributes {
  key: "contentType"
  value: "application/json"
}
message_id: "1943960221068228"
publish_time {
  seconds: 1611377014
  nanos: 362000000
}
}, timestamp=1611377448226}] to class ai.monom.dp_flow_execution_worker.entities.FlowOperation

when executing the Spring Boot application (FlowOperation is a POJO):

@Log
@SpringBootApplication
public class DpFlowExecutionWorkerApplication {

  public static void main(String[] args) {

    SpringApplication.run(DpFlowExecutionWorkerApplication.class, args);
  }

  @Bean
  public Consumer<FlowOperation> receiveOperations() {
    return operation -> {
      log.info(String.format("%s: %s", operation.getInputEvent(), operation.getOperation()));
    };
  }

  @Bean
  public Supplier<Flux<FlowOperation>> sendTestOperation() {
    return () -> Flux.just(new FlowOperation("event", "operation"));
  }
}

I have followed the sample https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-binder-functional-sample and the Supplier and Consumer are pretty similar.

My properties file is:

topic=dp-flow-execution-worker

spring.cloud.stream.bindings.receiveOperations-in-0.destination=${topic}
spring.cloud.stream.bindings.receiveOperations-in-0.group=receiver

spring.cloud.stream.bindings.sendTestOperation-out-0.destination=${topic}

spring.cloud.function.definition=sendTestOperation;receiveOperations
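
A small sketch of how the names in this properties file seem to relate to each other. The `-in-0` / `-out-0` suffixes follow the functional binding naming of Spring Cloud Stream, and the `destination.group` subscription name is only inferred from the `subscriptionName='dp-flow-execution-worker.receiver'` value in the log above. The class and method names (`BindingNameSketch`, `inputBinding`, `outputBinding`, `subscriptionName`) are hypothetical helpers, not part of any library.

```java
// Hypothetical helpers that only rebuild the names seen in this report.
class BindingNameSketch {

  // Input binding of a functional bean: <functionName>-in-<index>
  static String inputBinding(String functionName, int index) {
    return functionName + "-in-" + index;
  }

  // Output binding of a functional bean: <functionName>-out-<index>
  static String outputBinding(String functionName, int index) {
    return functionName + "-out-" + index;
  }

  // Assumption taken from the log: subscription = <destination>.<group>
  static String subscriptionName(String destination, String group) {
    return destination + "." + group;
  }

  public static void main(String[] args) {
    System.out.println(inputBinding("receiveOperations", 0));
    System.out.println(outputBinding("sendTestOperation", 0));
    System.out.println(subscriptionName("dp-flow-execution-worker", "receiver"));
  }
}
```

Both bindings point at the same topic, so the Consumer receives exactly what the Supplier publishes.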

acs commented 3 years ago

If I change the consumer to:

  @Bean
  public Consumer<Message<FlowOperation>> receiveOperations() {
    return message -> {
      FlowOperation operation = message.getPayload();
      log.info(String.format("%s", operation.getOperation()));
    };
  }

then I get the error:

Caused by: java.lang.ClassCastException: class [B cannot be cast to class ai.monom.dp_flow_execution_worker.entities.FlowOperation ([B is in module java.base of loader 'bootstrap'; ai.monom.dp_flow_execution_worker.entities.FlowOperation is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @73718b93)

and it works with:

  @Bean
  public Consumer<Message<String>> receiveOperations() {
    return message -> {
      String operation = message.getPayload();
      log.info(String.format("%s", operation));
    };
  }

So it seems that the payload arrives as a String rather than as a deserialized FlowOperation POJO, as I would have expected.
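
A minimal sketch of what the `payload=byte[46]` entry in the first error suggests: the message body reaches the function as raw JSON bytes, which decode cleanly to a String but cannot simply be cast to `FlowOperation`. The names `PayloadBytesSketch`, `encode` and `decode` are hypothetical and only the JDK is used; the JSON literal is copied from the logged message.

```java
import java.nio.charset.StandardCharsets;

class PayloadBytesSketch {

  // JSON body exactly as it appears in the logged Pub/Sub message.
  static final String JSON =
      "{\"inputEvent\":\"event\",\"operation\":\"operation\"}";

  // What travels on the wire: the UTF-8 bytes of the JSON text.
  static byte[] encode(String text) {
    return text.getBytes(StandardCharsets.UTF_8);
  }

  // Turning the bytes back into text needs no knowledge of FlowOperation.
  static String decode(byte[] payload) {
    return new String(payload, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] payload = encode(JSON);
    System.out.println(payload.length);
    System.out.println(decode(payload));
  }
}
```

The encoded body is 46 bytes long, which matches the size reported in the log and is consistent with the payload being the untouched JSON text.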

acs commented 3 years ago

After some testing, I have found that the problem appears when the Java class carried in the messages declares an explicit constructor. The failing version of FlowOperation is defined as:

public class FlowOperation {

  private String inputEvent;
  private String operation;

  public FlowOperation(String inputEvent, String operation) {
    this.inputEvent = inputEvent;
    this.operation = operation;
  }

  public String getInputEvent() {
    return this.inputEvent;
  }

  public String getOperation() {
    return this.operation;
  }

}

According to the message conversion documentation for Spring Cloud Stream, this should be handled automatically because it is a POJO: https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_provided_messageconverters

If I change this POJO to the following one, it works:

package ai.monom.dp_flow_execution_worker.entities;

public class FlowOperation {

  private String inputEvent;
  private String operation;

  public String getInputEvent() {
    return this.inputEvent;
  }

  public String getOperation() {
    return this.operation;
  }

  public FlowOperation setOperation(String operation) {
    this.operation = operation;
    return this;
  }

}

So, in which cases is message conversion provided automatically? Could you provide a sample in the tutorials that uses message converters?

Thanks!

meltsufin commented 3 years ago

@acs I think a no-arg constructor is required for the converter to work. Can you try adding an additional no-arg constructor? In any case, it doesn't seem like a question specific to Spring Cloud GCP. cc/ @artembilan
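
As a rough illustration of this suggestion, here is one way the POJO could keep its two-argument constructor and still offer a no-arg one. The reflective lookup in `hasNoArgConstructor` (a hypothetical helper) is only a stand-in for what a bean-style converter needs in order to instantiate the class; it is not the converter's actual code. `ArgsOnlyOperation` mirrors the failing class from the report under a different, made-up name.

```java
class NoArgConstructorSketch {

  // Shape of the failing class: the only constructor takes arguments.
  static class ArgsOnlyOperation {
    private final String inputEvent;
    private final String operation;

    ArgsOnlyOperation(String inputEvent, String operation) {
      this.inputEvent = inputEvent;
      this.operation = operation;
    }
  }

  // Same fields, plus the additional no-arg constructor.
  static class FlowOperation {
    private String inputEvent;
    private String operation;

    FlowOperation() {
      // Intentionally empty: lets a converter create the object first
      // and populate the fields afterwards.
    }

    FlowOperation(String inputEvent, String operation) {
      this.inputEvent = inputEvent;
      this.operation = operation;
    }

    String getInputEvent() {
      return this.inputEvent;
    }

    String getOperation() {
      return this.operation;
    }
  }

  // Hypothetical helper: true if reflection can find a no-arg constructor.
  static boolean hasNoArgConstructor(Class<?> type) {
    try {
      type.getDeclaredConstructor();
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(hasNoArgConstructor(ArgsOnlyOperation.class));
    System.out.println(hasNoArgConstructor(FlowOperation.class));
  }
}
```

Once a class declares any constructor, Java no longer generates the implicit default one, which is why the first class has none.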

artembilan commented 3 years ago

That's not a Spring problem; that's how Jackson works when we try to deserialize that JSON into a POJO: https://stackoverflow.com/questions/30568353/how-to-de-serialize-an-immutable-object-without-default-constructor-using-object.

So either you go with a default no-args constructor, or you use @JsonCreator.
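
A minimal sketch of the `@JsonCreator` route, assuming `jackson-databind` is on the classpath (it normally comes along with Spring Boot's JSON support, but that is an assumption about this project). The wrapper class `JsonCreatorSketch` and its `parse` method are hypothetical; the field names and the JSON literal are taken from the report.

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UncheckedIOException;

class JsonCreatorSketch {

  // Immutable variant of the POJO: no default constructor, no setters.
  static final class FlowOperation {
    private final String inputEvent;
    private final String operation;

    // Tells Jackson to build the object through this constructor and
    // which JSON property feeds each argument.
    @JsonCreator
    public FlowOperation(
        @JsonProperty("inputEvent") String inputEvent,
        @JsonProperty("operation") String operation) {
      this.inputEvent = inputEvent;
      this.operation = operation;
    }

    public String getInputEvent() {
      return this.inputEvent;
    }

    public String getOperation() {
      return this.operation;
    }
  }

  // Hypothetical helper: deserializes JSON text with a plain ObjectMapper.
  static FlowOperation parse(String json) {
    try {
      return new ObjectMapper().readValue(json, FlowOperation.class);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    FlowOperation op =
        parse("{\"inputEvent\":\"event\",\"operation\":\"operation\"}");
    System.out.println(op.getInputEvent() + ": " + op.getOperation());
  }
}
```

This keeps the fields final, at the cost of annotating the constructor; the no-arg constructor route needs no annotations but gives up immutability.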

meltsufin commented 3 years ago

Thanks @artembilan! Closing.

acs commented 3 years ago

Thanks @artembilan!