apache / streampipes

Apache StreamPipes - A self-service (Industrial) IoT toolbox to enable non-technical users to connect, analyze and explore IoT data streams.
https://streampipes.apache.org
Apache License 2.0
566 stars 174 forks source link

Influx Sink can only handle primitive properties #2106

Closed bossenti closed 6 months ago

bossenti commented 7 months ago

Apache StreamPipes version

dev (current development state)

Affected StreamPipes components

Processing Elements

What happened?

When trying to persist an event stream with an array as property, the influx sink throws an exception:

2023-10-30T13:25:28.623+01:00 ERROR 25805 --- [ Thread-3] >o.a.s.w.s.r.StandaloneEventSinkRuntime : RuntimeException while processing event in >org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeSink

java.lang.IllegalArgumentException: Expecting a positive number for fields size at org.influxdb.impl.Preconditions.checkPositiveNumber(Preconditions.java:35) at org.influxdb.dto.Point$Builder.build(Point.java:352) at org.apache.streampipes.dataexplorer.commons.influx.InfluxStore.onEvent(InfluxStore.java:176) at org.apache.streampipes.dataexplorer.commons.TimeSeriesStore.onEvent(TimeSeriesStore.java:64) at org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeSink.onEvent(DataLakeSink.java:82) at org.apache.streampipes.wrapper.standalone.runtime.StandaloneEventSinkRuntime.process(StandaloneEventSinkRuntime.java:55) at org.apache.streampipes.wrapper.standalone.routing.StandaloneSpInputCollector.send(StandaloneSpInputCollector.java:56) at org.apache.streampipes.wrapper.standalone.routing.StandaloneSpInputCollector.lambda$onEvent$0(StandaloneSpInputCollector.java:51) at java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603) at org.apache.streampipes.wrapper.standalone.routing.StandaloneSpInputCollector.onEvent(StandaloneSpInputCollector.java:51) at org.apache.streampipes.wrapper.standalone.routing.StandaloneSpInputCollector.onEvent(StandaloneSpInputCollector.java:30) at org.apache.streampipes.messaging.kafka.SpKafkaConsumer.lambda$run$0(SpKafkaConsumer.java:105) at java.base/java.lang.Iterable.forEach(Iterable.java:75) at org.apache.streampipes.messaging.kafka.SpKafkaConsumer.run(SpKafkaConsumer.java:105) at java.base/java.lang.Thread.run(Thread.java:833)

This is du to the fact that only the handling of primitive types is implemented: https://github.com/apache/streampipes/blob/a34a4dda3000a20802f99c018cd28ecd6ab16ca1/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java#L129

How to reproduce?

Persist an event stream containing an array as property

Expected behavior

Storage can handle non-primitve types as well

Additional technical information

No response

Are you willing to submit a PR?

None

muyangye commented 7 months ago

@bossenti Unfortunately Influx only allows primitive data types so I guess the original author who wrote that if statement expected no array input too. To bypass this limitation, we can for example define the following simple protocol to use strings to represent arrays: "isArray:1,val1,val2,val3,...". That being said, I don't quite agree with this work around since there may be real strings (as opposed to strings as arrays) that contain keywords such as "isArray". I think it is best to let users transform their data from arrays to other types and let them define their own protocol.

bossenti commented 7 months ago

thanks for investigating @muyangye I agree with you that this kind of workaround should be avoided.

I would propose that we change the configuration of the InfluxSink such that it can only expect primitive fields. In this case, the user is directly warned in the UI that the data stream is not suitable for being persisted in the influx storage. However, this might cause some inconveniencies but is definitely better than the current state of just ignoring these fields. To slightly improve here, we could think about providing a hint somewhere in our documentation proposing workarounds for the user.

In addition, we need to think about how this affects the automatic pipeline creation in case the user selects persist data during the adapter creation.

tenthe commented 7 months ago

Hi @muyangye thank you for initiating work on this issue. I agree we need a better solution for this. However, I am not quite sure yet what the best solution might be.

The current challenge is that simply removing properties removes the user's ability to persist them. On the other hand, using a special encoding for arrays and serializing them as strings raises concerns about the use of the data for downstream processes such as visualization.

To move forward, it would be beneficial to collectively define the user's goal when dealing with array data. I see two potential options:

Option 1: Enforce users to transform arrays into an alternative representation before storing the data. Option 2: Modify the representation of an array to a string format for storage in Influx.

And as @bossenti stated, we need to consider the side effects on other functions (such as the automatic pipeline generation)

I'm open to both options but would also appreciate any additional ideas or insights you might have.

muyangye commented 7 months ago

Hi @tenthe @bossenti thanks for your insights!

In my opinion, option 2 is better because users should definitely be able to store arrays. And I am willing to provide an implementation of array serialization/deserialization and modify downstream processes accordingly. However, this is a non-trivial task and may take some time. For now, we should let users know InfluxDB can only store primitive types when they have non-primitive types and are connecting to DataLake (such as when creating a pipeline or selecting persist data) so that they can at least expect that data to be lost.

I will first implement the reminder. Then, we should further break down the task "support array in influx sink" to multiple tasks/issues, after figuring out what downstream processes need to be modified (we can collect them in this thread). Once those are clear, we can start implementing this important enhancement.

bossenti commented 7 months ago

sounds like a great plan! Just generally, since this is a change that affects the end user directly (although only positively), I would really appreciate so see some documentation & and E2E test once the workaround is in place. That would be awesome 🙂

With respect to the reminder: Please be aware that sinks (and processing elements as well) can define a requiredStream within their declareConfig() method. Would be great if we could solve it this way. (Maybe you are already aware of it, but just to mention). In this case, we would need to check how we handle the automatic pipeline creation. Out of my thoughts, I would suggest stopping & flagging the pipeline and setting a corresponding notification within the pipeline.

If you have any problems, finding the specific functionalities, feel free to reach out 🙂 Or, of course, if you have any other ideas, I'm happy to discuss it

tenthe commented 7 months ago

Hey @muyangye,

I really like your systematic approach.

When considering the implementation's complexity, I'm uncertain if Option 1 is easier to implement due to its broader impact on components like UI and pipeline creation. It might be simpler to change only the Data Lake sink, serializing arrays to strings. This way, users won't need to modify the data stream or receive notifications. For the serialization I would suggest a simple format e.g "[1,2,3]".

The only drawback is the inability to display this data in the data explorer and dashboard, which isn't currently feasible anyway. We could explore potential solutions for this in a separate issue.

What are your thoughts on this?

muyangye commented 7 months ago

@tenthe I see your points here and I agree your concerns about the impact of Option 1 (e.g. users are forced to modify the data stream) are totally valid. But if we don't give users some notifications, wouldn't it be the same situation right now? Currently the user is unaware of the data lost so @bossenti raised this issue. If I am not mistaken, are you suggesting directly start working on Option 2?

Please let me know if I am misunderstood something, but if the worry is about limiting/decreasing usability of Option 1, I would suggest this: instead of making changes that impact usability such as modifying requiredStream or stopping the pipeline created by persist data, we could simply just add a text saying something like "NOTE: This will not affect pipeline creation but currently the visualization of non-primitive types is not supported." in the strings.en of DataLake sink and the place where persist data is in. This way, the user is aware of the limitation, can expect the data lost, and are not hindered from creating the pipeline. Meanwhile, let me work on serialization and submit a PR together with the strings.en change so that we can open separate issues only relevant to the deserialization/visualization.

What do you think?

tenthe commented 7 months ago

My suggestion was to work directly on option 2. The idea behind this was that you then don't have to make any notifications to the user, as arrays are processed by the DataLake sink as expected. I thought this should be easier to implement, as it would "only" mean changes in the InfluxStore class. More precisely, only a condition for EventPropertyList would have to be added in the onEvent method.

I thought that it would be easier to implement then an exception handling for array properties. Do you see it the same way? This was only meant as a suggestion, if you think it's easier otherwise that's totally ok for me .

muyangye commented 7 months ago

My suggestion was to work directly on option 2. The idea behind this was that you then don't have to make any notifications to the user, as arrays are processed by the DataLake sink as expected. I thought this should be easier to implement, as it would "only" mean changes in the InfluxStore class. More precisely, only a condition for EventPropertyList would have to be added in the onEvent method.

I thought that it would be easier to implement then an exception handling for array properties. Do you see it the same way? This was only meant as a suggestion, if you think it's easier otherwise that's totally ok for me .

I see, I just realized after serializing the array to string the influx store will store it which will be displayed in the dashboard (as opposed to nothing) so the user will notice a difference between the current situation. Yes I think it is definitely better. Thanks a lot for your suggestion!

bossenti commented 6 months ago

Closed by #2196