dora-rs / dora

DORA (Dataflow-Oriented Robotic Architecture) is middleware designed to streamline and simplify the creation of AI-based robotic applications. It offers low latency, composable, and distributed dataflow capabilities. Applications are modeled as directed graphs, also referred to as pipelines.
https://dora-rs.ai
Apache License 2.0
1.47k stars 76 forks

local network communication #619

Open KyloRen1 opened 1 month ago

KyloRen1 commented 1 month ago

Hello. I am trying to set up local network communication between my two computers. I tried a Python socket, but I saw in the documentation that dora-rs supports TCP. After some searching, I couldn't find any examples, issues, or discussions on how to do it. Where can I find this?

Additionally, could you please update the Discord invitation link? Thank you.

Hennzau commented 1 month ago

Hi! There is a multiple-daemon example, and you can see the release note about multiple machines: https://github.com/dora-rs/dora/releases/tag/v0.3.5

KyloRen1 commented 4 weeks ago

Thank you very much. I followed the minimal example, reading frames from a camera on a Raspberry Pi and plotting them on my computer. I get the following error:

2024-08-15T08:48:46.065534Z  WARN dora_coordinator::listener: failed to deserialize node message

Caused by:
    missing field `inner` at line 1 column 853

On machine A, I run the commands from the example:

dora coordinator
dora daemon --machine-id A

On machine B, I do it slightly differently:

dora daemon --machine-id B --coordinator-addr 192.168.1.150:53290
dora start dataflow_small.yml --coordinator-addr 192.168.1.150 --coordinator-port 53290

Hennzau commented 4 weeks ago

Hi, sorry for the error. Can you show me the dataflow YAML file and your source code?

KyloRen1 commented 4 weeks ago

yaml file

nodes:
  - id: webcam
    _unstable_deploy:
      machine: A
    operator:
      python: ../car/operators/webcam.py
      inputs:
        tick:
          source: dora/timer/millis/50
      outputs:
        - image

  - id: plot
    _unstable_deploy:
      machine: B
    operator:
      python: ../computer/operators/plot.py
      inputs:
        image: webcam/image

webcam.py

import os
import time

import cv2
import numpy as np
import pyarrow as pa

from dora import DoraStatus

class Operator:
    """
    Sending image from webcam to the dataflow
    """

    def __init__(self):
        self.CI = os.environ.get("CI")  # TODO
        self.CAMERA_WIDTH = 960  # TODO
        self.CAMERA_HEIGHT = 540  # TODO
        self.CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", 0))

        self.video_capture = cv2.VideoCapture(self.CAMERA_INDEX)
        self.start_time = time.time()
        self.video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, self.CAMERA_WIDTH)
        self.video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, self.CAMERA_HEIGHT)

    def on_event(self, dora_event, send_output) -> DoraStatus:
        if dora_event["type"] == "INPUT":
            ret, frame = self.video_capture.read()
            if ret:
                frame = cv2.resize(frame, (self.CAMERA_WIDTH, self.CAMERA_HEIGHT))
            else:
                frame = np.zeros((self.CAMERA_HEIGHT, self.CAMERA_WIDTH, 3), dtype=np.uint8)
                cv2.putText(frame,
                    "No Webcam was found at index %d" % (self.CAMERA_INDEX),
                    (int(30), int(30)), cv2.FONT_HERSHEY_SIMPLEX, 0.75,
                    (255, 255, 255), 2, 1)
                if self.CI != "true":
                    return DoraStatus.CONTINUE
            send_output("image", pa.array(frame.ravel()), dora_event["metadata"])
        elif dora_event["type"] == "STOP":
            print("received stop")
        else:
            print("received unexpected event:", dora_event["type"])

        if time.time() - self.start_time < 10000:
            return DoraStatus.CONTINUE
        else:
            return DoraStatus.STOP

    def __del__(self):
        self.video_capture.release()

plot.py

import os
import cv2
from dora import DoraStatus

CI = os.environ.get("CI")

CAMERA_WIDTH = 960
CAMERA_HEIGHT = 540

FONT = cv2.FONT_HERSHEY_SIMPLEX

class Operator:
    """
    Plot image and bounding box
    """

    def on_event(self, dora_event, send_output):
        if dora_event["type"] == "INPUT":
            if dora_event['id'] == "image":

                image = dora_event['value'].to_numpy().reshape(
                    (CAMERA_HEIGHT, CAMERA_WIDTH, 3)).copy()

                if CI != "true":
                    cv2.imshow("frame", image)
                    if cv2.waitKey(1) & 0xFF == ord("q"):
                        return DoraStatus.STOP

        return DoraStatus.CONTINUE

Hennzau commented 4 weeks ago

Thanks. The code looks correct. However, you're using operators, which are deprecated.

Could you try with standard nodes? You can use our pre-built nodes in the node-hub directory.

KyloRen1 commented 4 weeks ago

Here is my updated yaml

nodes:
  - id: webcam
    _unstable_deploy:
      machine: A
    build: pip install ../dora-main/node-hub/opencv-video-capture
    path: opencv-video-capture
    inputs:
      tick: dora/timer/millis/16 # try to capture at 60fps
    outputs:
      - image # the captured image
    env:
      PATH: 0 # optional, default is 0
      IMAGE_WIDTH: 640 # optional, default is video capture width
      IMAGE_HEIGHT: 480 # optional, default is video capture height

  - id: plot
    _unstable_deploy:
      machine: B
    build: pip install ../dora-main/node-hub/opencv-plot
    path: opencv-plot
    inputs:
      image: webcam/image # image: Arrow array of size 1 containing the base image
    env:
      PLOT_WIDTH: 640 # optional, default is image input width
      PLOT_HEIGHT: 480 # optional, default is image input height

And I have the following error

Dataflow could not be validated.: paths of remote nodes must be absolute (node `webcam`)

Hennzau commented 4 weeks ago

OK, so for remote machines you must specify the absolute path to a source file. You should remove the build step from both nodes and set path to the absolute path of the node's entry point, for example path/to/dora/node-hub/opencv-video-capture/opencv_video_capture/main.py
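
The rule behind that validation error can be sketched in a few lines of Python. This is only an illustration of the check as described in this thread, not dora's actual validation code. The function name, the assumption of POSIX-style paths, and the `/home/user/...` path are hypothetical.

```python
from __future__ import annotations

from pathlib import PurePosixPath


def remote_nodes_with_relative_paths(dataflow: dict) -> list[str]:
    """Return the ids of machine-pinned nodes whose `path` is not absolute."""
    offenders = []
    for node in dataflow.get("nodes", []):
        deploy = node.get("_unstable_deploy") or {}
        path = node.get("path")
        # Assumption: a node counts as "remote" when it is pinned to a machine.
        if "machine" in deploy and path is not None:
            if not PurePosixPath(path).is_absolute():
                offenders.append(node["id"])
    return offenders


# The dataflow from this thread as a plain dict (the absolute path is made up).
dataflow = {
    "nodes": [
        {"id": "webcam", "_unstable_deploy": {"machine": "A"},
         "path": "opencv-video-capture"},
        {"id": "plot", "_unstable_deploy": {"machine": "B"},
         "path": "/home/user/dora/node-hub/opencv-plot/opencv_plot/main.py"},
    ]
}
print(remote_nodes_with_relative_paths(dataflow))  # prints ['webcam']
```

Running a check like this before `dora start` would flag `webcam`, the same node the error message names.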

KyloRen1 commented 4 weeks ago

With build removed and an absolute path for path, I get the same error as initially:

WARN dora_coordinator::listener: failed to deserialize node message

Caused by:
    missing field `inner` at line 1 column 853

Hennzau commented 4 weeks ago

So it must be a bug in Dora. Could you tell us the exact version of Dora you're using (latest release, main branch)? I will try to fix it soon, but I can't until the end of August because I'm on vacation.

KyloRen1 commented 4 weeks ago

I am using dora-cli 0.3.5, installed from dora-v0.3.5-x86_64-macOS.zip. On my third machine where I experience the same problem, I used the same version but installed with cargo (armv7). Thank you!

phil-opp commented 3 weeks ago

dora daemon --machine-id B --coordinator-addr 192.168.1.150:53290
dora start dataflow_small.yml --coordinator-addr 192.168.1.150 --coordinator-port 53290

The dora coordinator currently uses different sockets for daemon and CLI connections. Port 53290 is the default port for daemon->coordinator connections. For CLI commands, the coordinator opens a different port, defaulting to 6012. The dora start command defaults to port 6012 if no --coordinator-port argument is given, so you could try the following:

```shell
dora start dataflow_small.yml --coordinator-addr 192.168.1.150
```

The deserialization error occurs because the coordinator expects only daemon messages on port 53290, which have a different format.
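
To make the port split concrete, here is a small Python sketch that assembles the two machine B commands with the port that matches each connection type. It only uses flags that already appear in this thread, and the helper name `machine_b_commands` is made up for illustration. Treat it as a way to keep the two ports straight, not as an official recipe.

```python
from __future__ import annotations

DAEMON_PORT = 53290  # default daemon -> coordinator port, as described above
CLI_PORT = 6012      # default CLI -> coordinator port, as described above


def machine_b_commands(coordinator_ip: str, dataflow: str,
                       machine_id: str = "B") -> list[str]:
    """Build the daemon and CLI command lines for a machine that joins a remote coordinator."""
    return [
        # The daemon connects to the daemon port.
        f"dora daemon --machine-id {machine_id} "
        f"--coordinator-addr {coordinator_ip}:{DAEMON_PORT}",
        # The CLI connects to the CLI port (stated explicitly here, although it is the default).
        f"dora start {dataflow} --coordinator-addr {coordinator_ip} "
        f"--coordinator-port {CLI_PORT}",
    ]


for command in machine_b_commands("192.168.1.150", "dataflow_small.yml"):
    print(command)
```

The original `dora start` invocation differs from this only in the port: it pointed the CLI at 53290, the daemon socket.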

(I don't remember exactly why we decided to use different ports, but I think it was related to network topology and firewalls. E.g. with separate ports, you can route the daemon connections through your internal network and only expose the CLI port through a public IP.)
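
If a firewall might be in the way, a quick TCP probe from machine B can tell you whether each coordinator port accepts connections at all. The sketch below uses only Python's standard `socket` module. The helper name `port_is_reachable` is hypothetical, and a successful connection only shows that something is listening on the port, not that it is the dora coordinator.

```python
import socket


def port_is_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    coordinator_ip = "192.168.1.150"  # address used in this thread; replace with yours
    for role, port in (("daemon", 53290), ("CLI", 6012)):
        state = "reachable" if port_is_reachable(coordinator_ip, port) else "unreachable"
        print(f"{role} port {port}: {state}")
```

If the daemon port is reachable but the CLI port is not, check that nothing between the two machines blocks port 6012.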