eclipse-kuksa / kuksa-databroker

A modern in-vehicle VSS (Vehicle Signal Specification) server written in RUST
https://eclipse-kuksa.github.io/kuksa-website/
Apache License 2.0
12 stars 11 forks source link

Add StreamedUpdate API #34

Closed wba2hi closed 1 month ago

wba2hi commented 1 month ago

Add support for Streamed Update API.

I know this is probably not yet in a perfect state, but I still want to trigger the first review. According to the ticket adapting the databroker-cli and databroker-providers is out-of-scope since this is only an intermediate solution and will be shortly overhauled by kuksa val v2 api anyway. I shortly tried to get it to work, but I failed with some weird error messages and incompatible steps. If this is an easy step I am still willing to do it, but I probably need a helping hand to support me there :)

However, I verified the functionality using a) ghz (https://github.com/mikehaller/kuksa-databroker-ghz/tree/main) using the following testcase:

It sends out 5 requests which 100_000 streamed data points per request

{
    "name": "Test Case 5: Kuksa API, StreamedUpdate",
    "proto": "kuksa/val/v1/val.proto",
    "insecure": true,
    "skipFirst": 1,
    "call": "kuksa.val.v1.VAL.StreamedUpdate",
    "total": 6,
    "disable-template-functions": true,
    "disable-template-data": true,
    "stream-call-count": 100000,
    "concurrency": 1,
    "data": {
        "updates": [    
            {
                "entry": {
                    "path": "Vehicle.Speed",
                    "value": {
                        "float": 100.0
                    }
                },
                "fields": [2]
            }
        ]
    }
}

using command ghz --config=testcase105_kuksa_testcase105_kuksa_streamedupdate.json --import-paths=/Users/wba2hi/git/kuksa-databroker/proto localhost:55556 > testcase105_kuksa_streamedupdate.json.output

and b) a prototypical implementation in android: https://github.com/boschglobal/kuksa-android-sdk/tree/user/wba2hi/VAL-1320

As an orientation help I used the stream_datapoints api from collector.rs

codecov[bot] commented 1 month ago

Codecov Report

Attention: Patch coverage is 83.38109% with 58 lines in your changes are missing coverage. Please review.

Project coverage is 49.40%. Comparing base (82513ea) to head (a04912f).

Files Patch % Lines
databroker/src/grpc/kuksa_val_v1/val.rs 91.16% 28 Missing :warning:
databroker-cli/src/sdv_cli.rs 0.00% 17 Missing :warning:
databroker-cli/src/kuksa_cli.rs 0.00% 8 Missing :warning:
databroker/src/grpc/sdv_databroker_v1/collector.rs 0.00% 5 Missing :warning:
Additional details and impacted files ```diff @@ Coverage Diff @@ ## main #34 +/- ## ========================================== + Coverage 48.57% 49.40% +0.82% ========================================== Files 31 31 Lines 10866 11285 +419 ========================================== + Hits 5278 5575 +297 - Misses 5588 5710 +122 ```

:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.

SebastianSchildt commented 1 month ago

@morgenroth this might fit your use case better than sdv1 API, and once goes to mainline, might stay around a bit longer

@wba2hi afaik @morgenroth requires actuator streaming, so I guess the "field" needs to be set differently in your example?

wba2hi commented 1 month ago

@morgenroth this might fit your use case better than sdv1 API, and once goes to mainline, might stay around a bit longer

@wba2hi afaik @morgenroth requires actuator streaming, so I guess the "field" needs to be set differently in your example?

Yes, in case of setting an actuator the call would look like the following:

val datapoint = Datapoint.newBuilder()
    .setBool(true)
    .build()

val dataEntry = DataEntry.newBuilder()
    .setPath("Vehicle.Cabin.Door.Row1.DriverSide.IsOpen")
    .setValue(datapoint)

val entryUpdate = EntryUpdate.newBuilder()
    .addFields(Field.FIELD_ACTUATOR_TARGET)
    .setEntry(dataEntry)

val streamedUpdateRequest = StreamedUpdateRequest.newBuilder()
    .addUpdates(entryUpdate)
    .build()

streamingUpdate?.onNext(streamedUpdateRequest)

I can change it, but it's basically just a short proof of concept - the approach how it's implemented in the test app is not even good and not meant to be merged in this way, because we just start sending the request once we are connected to the databroker without manual interaction and also the delay of 1 second is also just introduced so we can see how the value changes between consecutive requests :D