risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.

unsupported JSONB values in elasticsearch sink #13079

Status: Closed (docteurklein closed this issue 10 months ago)

docteurklein commented 11 months ago

Describe the bug

No response
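
Judging only from the title and the stack trace in the next section, the sink appears to deserialize a JSON string into a `LinkedHashMap<String, Object>` (`ObjectMapper.readValue`, called from `EsSink.buildDoc`), and this fails when the input is the scalar `3` rather than an object. The sketch below illustrates the same mismatch under stated assumptions: it uses Python's standard `json` module as a stand-in for Jackson, and the helpers `parse_as_object` and `parse_any` are hypothetical names for illustration, not part of RisingWave or its Elasticsearch sink.

```python
import json


def parse_as_object(raw: str) -> dict:
    """Parse raw JSON text and insist on a top-level object.

    Hypothetical helper: it mimics deserializing into a map type, which
    rejects any JSON value that is not an object.
    """
    value = json.loads(raw)
    if not isinstance(value, dict):
        raise TypeError(
            f"cannot deserialize {type(value).__name__} value {raw!r} as an object"
        )
    return value


def parse_any(raw: str):
    """Accept any valid JSON value: object, array, string, number, boolean, or null."""
    return json.loads(raw)


# An object parses fine either way.
print(parse_as_object('{"a": 1}'))

# A scalar is valid JSON (and a valid JSONB value) but is not an object,
# so a parser that assumes an object rejects it.
try:
    parse_as_object("3")
except TypeError as err:
    print("error:", err)

# Parsing without assuming an object keeps the scalar intact.
print(parse_any("3"))
```

Whether the actual fix should parse into a generic JSON value or handle scalars some other way is not stated in this report; the sketch only shows why assuming a top-level object breaks on scalar JSONB values.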

Error message/log

2023-10-26 08:39:51,214 ERROR [Thread-537] connector.SinkWriterStreamObserver:177 - sink writer error:                                                                                                                                                                                                                                                                                                                                                        
risingwave-demo-rw-1        | java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.util.LinkedHashMap<java.lang.String,java.lang.Object>` from Integer value (token `JsonToken.VALUE_NUMBER_INT`)                                                                                                                                                                                                                
risingwave-demo-rw-1        |  at [Source: (String)"3"; line: 1, column: 1]                                                                                                                                                                                                                                                                                                                                                                                                                 
risingwave-demo-rw-1        |   at com.risingwave.connector.EsSink.write(EsSink.java:283) ~[risingwave-sink-es-7-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                                                                      
risingwave-demo-rw-1        |   at com.risingwave.connector.api.sink.SinkWriterV1$Adapter.write(SinkWriterV1.java:50) ~[connector-api-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                                                 
risingwave-demo-rw-1        |   at com.risingwave.connector.SinkWriterStreamObserver.onNext(SinkWriterStreamObserver.java:129) [risingwave-connector-service-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                          
risingwave-demo-rw-1        |   at com.risingwave.connector.JniSinkWriterHandler.runJniSinkWriterThread(JniSinkWriterHandler.java:38) [risingwave-connector-service-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                   
risingwave-demo-rw-1        | Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.util.LinkedHashMap<java.lang.String,java.lang.Object>` from Integer value (token `JsonToken.VALUE_NUMBER_INT`)                                                                                                                                                                                                                                 
risingwave-demo-rw-1        |  at [Source: (String)"3"; line: 1, column: 1]                                                                                                                                                                                                                                                                                                                                                                                                                 
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                                 
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1741) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                        
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1515) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                      
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1462) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                      
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:450) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                                     
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:32) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                                      
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                           
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                                               
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                                                      
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3612) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                                                      
risingwave-demo-rw-1        |   at com.risingwave.connector.EsSink.buildDoc(EsSink.java:211) ~[risingwave-sink-es-7-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                                                                   
risingwave-demo-rw-1        |   at com.risingwave.connector.EsSink.processUpsert(EsSink.java:245) ~[risingwave-sink-es-7-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                                                              
risingwave-demo-rw-1        |   at com.risingwave.connector.EsSink.writeRow(EsSink.java:263) ~[risingwave-sink-es-7-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                                                                   
risingwave-demo-rw-1        |   at com.risingwave.connector.EsSink.write(EsSink.java:281) ~[risingwave-sink-es-7-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                                                                      
risingwave-demo-rw-1        |   ... 3 more                                                                                                                                                                                                                                                                                                                                                                                                                                                  
risingwave-demo-rw-1        | 2023-10-26 08:39:51,215 ERROR [Thread-537] connector.JniSinkWriterResponseObserver:42 - JniSinkWriterHandler onError:                                                                                                                                                                                                                                                                                                                                         
risingwave-demo-rw-1        | java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.util.LinkedHashMap<java.lang.String,java.lang.Object>` from Integer value (token `JsonToken.VALUE_NUMBER_INT`)                                                                                                                                                                                                                
risingwave-demo-rw-1        |  at [Source: (String)"3"; line: 1, column: 1]                                                                                                                                                                                                                                                                                                                                                                                                                 
risingwave-demo-rw-1        |   at com.risingwave.connector.EsSink.write(EsSink.java:283) ~[risingwave-sink-es-7-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                                                                      
risingwave-demo-rw-1        |   at com.risingwave.connector.api.sink.SinkWriterV1$Adapter.write(SinkWriterV1.java:50) ~[connector-api-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                                                 
risingwave-demo-rw-1        |   at com.risingwave.connector.SinkWriterStreamObserver.onNext(SinkWriterStreamObserver.java:129) [risingwave-connector-service-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                          
risingwave-demo-rw-1        |   at com.risingwave.connector.JniSinkWriterHandler.runJniSinkWriterThread(JniSinkWriterHandler.java:38) [risingwave-connector-service-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                   
risingwave-demo-rw-1        | Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.util.LinkedHashMap<java.lang.String,java.lang.Object>` from Integer value (token `JsonToken.VALUE_NUMBER_INT`)                                                                                                                                                                                                                                 
risingwave-demo-rw-1        |  at [Source: (String)"3"; line: 1, column: 1]                                                                                                                                                                                                                                                                                                                                                                                                                 
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                                 
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1741) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                        
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1515) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                      
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1462) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                      
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:450) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                                     
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:32) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                                      
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                           
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                                               
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                                                      
risingwave-demo-rw-1        |   at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3612) ~[jackson-databind-2.13.5.jar:2.13.5]                                                                                                                                                                                                                                                                                                                                      
risingwave-demo-rw-1        |   at com.risingwave.connector.EsSink.buildDoc(EsSink.java:211) ~[risingwave-sink-es-7-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                                                                   
risingwave-demo-rw-1        |   at com.risingwave.connector.EsSink.processUpsert(EsSink.java:245) ~[risingwave-sink-es-7-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                                                              
risingwave-demo-rw-1        |   at com.risingwave.connector.EsSink.writeRow(EsSink.java:263) ~[risingwave-sink-es-7-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                                                                   
risingwave-demo-rw-1        |   at com.risingwave.connector.EsSink.write(EsSink.java:281) ~[risingwave-sink-es-7-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                                                                      
risingwave-demo-rw-1        |   ... 3 more                                                                                                                                                                                                                                                                                                                                                                                                                                                  
risingwave-demo-rw-1        | 2023-10-26 08:39:51,215 ERROR [Thread-537] connector.SinkWriterStreamObserver:185 - sink writer finishes with error:                                                                                                                                                                                                                                                                                                                                          
risingwave-demo-rw-1        | java.lang.RuntimeException: unexpected onNext call on a finished writer stream                                                                                                                                                                                                                                                                                                                                                                                
risingwave-demo-rw-1        |   at com.risingwave.connector.SinkWriterStreamObserver.onNext(SinkWriterStreamObserver.java:64) [risingwave-connector-service-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                           
risingwave-demo-rw-1        |   at com.risingwave.connector.JniSinkWriterHandler.runJniSinkWriterThread(JniSinkWriterHandler.java:38) [risingwave-connector-service-0.1.0-SNAPSHOT.jar:?]                                                                                                                                                                                                                                                                                                   
risingwave-demo-rw-1        | 2023-10-26 08:39:51,215 INFO  [Thread-537] connector.JniSinkWriterHandler:47 - end of runJniSinkWriterThread                                                                                                                                                                                                                                                                                                                                                  
risingwave-demo-rw-1        | 2023-10-26T08:39:51.215534767Z  INFO risingwave_connector::sink::remote: end of jni call runJniSinkWriterThread                                                                                                                                                                                                                                                                                                                                               
risingwave-demo-rw-1        | 2023-10-26T08:39:51.327495824Z  INFO risingwave_frontend::scheduler::snapshot: unpin snapshot with RPC min_epoch=5313291236147200                                                                                                                                                                                                                                                                                                                             
risingwave-demo-rw-1        | 2023-10-26T08:39:52.214212705Z ERROR risingwave_stream::task::stream_manager: actor exit actor=1503 error=Executor error: Sink error: Internal error: channel closed                                                                                                                                                                                                                                                                                          
risingwave-demo-rw-1        |   backtrace of `StreamExecutorError`:                                                                                                                                                                                                                                                                                                                                                                                                                         
risingwave-demo-rw-1        | disabled backtrace                                                                                                                                                                                                                                                                                                                                                                                                                                            
risingwave-demo-rw-1        | 2023-10-26T08:39:52.214253313Z ERROR risingwave_compute::rpc::service::stream_service: failed to collect barrier: Actor 1503 exit unexpectedly: Executor error: Sink error: Internal error: channel closed                                                                                                                                                                                                                                                    
risingwave-demo-rw-1        |   backtrace of `StreamExecutorError`:                                                                                                                                                                                                                                                                                                                                                                                                                         
risingwave-demo-rw-1        | disabled backtrace                                                                                                                                                                                                                                                                                                                                                                                                                                            
risingwave-demo-rw-1        |   backtrace of `StreamError`:                                                                                                                                                                                                                                                                                                                                                                                                                                 
risingwave-demo-rw-1        | disabled backtrace                                                                                                                                                                                                                                                                                                                                                                                                                                            
risingwave-demo-rw-1        | 2023-10-26T08:39:52.2143716Z  WARN risingwave_meta::barrier: Failed to complete epoch 5313291302535168: Rpc error: gRPC error (Internal error): Actor 1503 exit unexpectedly: Executor error: Sink error: Internal error: channel closed                                                                                                                                                                                                                      
risingwave-demo-rw-1        |   backtrace of `StreamExecutorError`:                                                                                                                                                                                                                                                                                                                                                                                                                         
risingwave-demo-rw-1        | disabled backtrace                                                                                                                                                                                                                                                                                                                                                                                                                                            
risingwave-demo-rw-1        |   backtrace of `StreamError`:                                                                                                                                                                                                                                                                                                                                                                                                                                 
risingwave-demo-rw-1        | disabled backtrace                                                                                                                                                                                                                                                                                                                                                                                                                                            
risingwave-demo-rw-1        |   backtrace of `MetaError`:                                                                                                                                                                                                                                                                                                                                                                                                                                   
risingwave-demo-rw-1        | disabled backtrace                                                                                                                                                                                                                                                                                                                                                                                                                                            
risingwave-demo-rw-1        | 2023-10-26T08:39:52.214495865Z  INFO failure_recovery{err=Rpc error: gRPC error (Internal error): Actor 1503 exit unexpectedly: Executor error: Sink error: Internal error: channel closed                                                                                                                                                                                                                                                                    
risingwave-demo-rw-1        |   backtrace of `StreamExecutorError`:                                                                                                                                                                                                                                                                                                                                                                                                                         
risingwave-demo-rw-1        | disabled backtrace                                                                                                                                                                                                                                                                                                                                                                                                                                            
risingwave-demo-rw-1        |   backtrace of `StreamError`:                                                                                                                                                                                                                                                                                                                                                                                                                                 
risingwave-demo-rw-1        | disabled backtrace prev_epoch=5313291236147200}: risingwave_meta::barrier::recovery: recovery start!                                                                                                                                                                                                                                                                                                                                                          
risingwave-demo-rw-1        | 2023-10-26T08:39:52.216099301Z  INFO risingwave_meta::manager::sink_coordination::manager: sink manager worker start cleaning up                                                                                                                                                                                                                                                                                                                              
risingwave-demo-rw-1        | 2023-10-26T08:39:52.216104999Z  INFO risingwave_meta::manager::sink_coordination::manager: sink manager worker finished cleaning up                                                                                                                                                                                                                                                                                                                           
risingwave-demo-rw-1        | 2023-10-26T08:39:52.216108745Z  INFO failure_recovery{err=Rpc error: gRPC error (Internal error): Actor 1503 exit unexpectedly: Executor error: Sink error: Internal error: channel closed                                                                                                                                                                                                                                                                    
risingwave-demo-rw-1        |   backtrace of `StreamExecutorError`:                                                                                                                                                                                                                                                                                                                                                                                                                         
risingwave-demo-rw-1        | disabled backtrace                                                                                                                                                                                                                                                                                                                                                                                                                                            
risingwave-demo-rw-1        |   backtrace of `StreamError`:                                                                                                                                                                                                                                                                                                                                                                                                                                 
risingwave-demo-rw-1        | disabled backtrace prev_epoch=5313291236147200}: risingwave_meta::manager::sink_coordination::manager: successfully stop coordinator: None                                                                      
risingwave-demo-rw-1        | 2023-10-26T08:39:52.216227826Z ERROR process_query_msg_one_stmt{session_id=16 stmt=CREATE SINK product_to_es FROM pim1.product_value WITH (connector = 'elasticsearch', index = 'product', url = 'http://es:9200', delimiter = '-')}: risingwave_frontend::session: failed to handle sql:                                                                                                                                                                     
risingwave-demo-rw-1        | CREATE SINK product_to_es FROM pim1.product_value WITH (connector = 'elasticsearch', index = 'product', url = 'http://es:9200', delimiter = '-'):                                                               
risingwave-demo-rw-1        | internal error: failed to finish command: channel closed                                 
risingwave-demo-rw-1        | 2023-10-26T08:39:52.216240932Z ERROR pgwire::pg_protocol: error when process message error=QueryError: internal error: failed to finish command: channel closed                                                 
risingwave-demo-rw-1        | 2023-10-26T08:39:52.216269994Z  WARN risingwave_common::metrics::guarded_metrics: err when delete metrics of "connector_sink_rows_received" of labels ["elasticsearch", "1275"]. Err Msg("missing label values [\"elasticsearch\", \"1275\"]")                                                                                                                                                                                                                
risingwave-demo-rw-1        | 2023-10-26T08:39:52.216334225Z  WARN risingwave_common::metrics::guarded_metrics: err when delete metrics of "connector_sink_rows_received" of labels ["elasticsearch", "1275"]. Err Msg("missing label values [\"elasticsearch\", \"1275\"]")                                                                                                                                                                                                                
risingwave-demo-rw-1        | 2023-10-26T08:39:52.216396122Z  WARN risingwave_common::metrics::guarded_metrics: err when delete metrics of "connector_sink_rows_received" of labels ["elasticsearch", "1275"]. Err Msg("missing label values [\"elasticsearch\", \"1275\"]")                                                                                                                                                                                                                
risingwave-demo-rw-1        | 2023-10-26 08:39:52,216 INFO  [Thread-539] connector.SinkWriterStreamObserver:191 - sink writer completed                                                                                                       
risingwave-demo-rw-1        | 2023-10-26 08:39:52,216 INFO  [Thread-546] connector.SinkWriterStreamObserver:191 - sink writer completed                                                                                                       
risingwave-demo-rw-1        | 2023-10-26T08:39:52.216459624Z  WARN risingwave_common::metrics::guarded_metrics: err when delete metrics of "connector_sink_rows_received" of labels ["jdbc", "1263"]. Err Msg("missing label values [\"jdbc\", \"1263\"]")                                                                                                                                                                                                                                  
risingwave-demo-rw-1        | 2023-10-26T08:39:52.216463554Z  WARN risingwave_common::metrics::guarded_metrics: err when delete metrics of "connector_sink_rows_received" of labels ["jdbc", "1263"]. Err Msg("missing label values [\"jdbc\", \"1263\"]")                                                                                                                                                                                                                                  
risingwave-demo-rw-1        | 2023-10-26T08:39:52.216499808Z  WARN risingwave_common::metrics::guarded_metrics: err when delete metrics of "connector_sink_rows_received" of labels ["elasticsearch", "1275"]. Err Msg("missing label values [\"elasticsearch\", \"1275\"]")

To Reproduce

When I insert a JSONB value like 4 from a MySQL source:

update pim_catalog_product set raw_values = 
      JSON_MERGE_PATCH(raw_values, '{"desc3": {"<all_channels>": {"<all_locales>": 4}}}')
    where id = 1;

Expected behavior

The ES sink should accept any kind of JSONB value, including bare scalars such as numbers, not only objects.

How did you deploy RisingWave?

No response

The version of RisingWave

PostgreSQL 9.5-RisingWave-1.3.0-alpha (bb2319e460cac272803837f39ac70b1c40268b9b)

Additional context

No response

hzxa21 commented 11 months ago

@docteurklein Would you mind sharing your SQL query for CREATE SINK?

docteurklein commented 11 months ago

Sure, here it is (copied from the log above):

CREATE SINK product_to_es FROM pim1.product_value WITH (connector = 'elasticsearch', index = 'product', url = 'http://es:9200', delimiter = '-');

docteurklein commented 11 months ago

It works if I cast my column to text at the last minute: https://github.com/docteurklein/risingwave-demo/blob/main/rw/product.sql#L47C95-L47C99
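
A minimal sketch of that workaround, assuming the sink is defined with an explicit query instead of `FROM pim1.product_value`. The column names `product_id`, `attribute`, and `attr_value` are hypothetical (the real schema is in the linked `product.sql`), and depending on the RisingWave version the sink may also need a `primary_key` option, so treat this as an illustration rather than the exact statement from the demo repo.

```sql
-- Hypothetical columns: product_id, attribute, attr_value (jsonb).
-- Casting the jsonb column to text hands the sink a plain string,
-- so it no longer tries to read a bare JSON scalar (e.g. 3) as an object.
CREATE SINK product_to_es AS
SELECT
    product_id,
    attribute,
    attr_value::text AS attr_value
FROM pim1.product_value
WITH (
    connector = 'elasticsearch',
    index = 'product',
    url = 'http://es:9200',
    delimiter = '-'
);
```

The trade-off is that the value reaches Elasticsearch as a string, so a number such as 4 would presumably be indexed as text unless the index mapping converts it.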