dora-rs / dora

DORA (Dataflow-Oriented Robotic Application) is middleware designed to streamline and simplify the creation of AI-based robotic applications. It offers low latency, composable, and distributed dataflow capabilities. Applications are modeled as directed graphs, also referred to as pipelines.
https://dora-rs.ai
Apache License 2.0

How can a DORA subscriber node classify and process four types of data as it receives them? #332

Open Latitude9527 opened 11 months ago

Latitude9527 commented 11 months ago

I'm currently using if statements that check the input id of each data stream, but it doesn't work the way I envisioned. Here is part of my code:

    def on_input(
        self,
        dora_input: dict,
        send_output: Callable[[str, bytes], None],
    ):

        if "color_image" == dora_input["id"]:
            self.color_frame = (
                dora_input["value"]
                .to_numpy()
                .reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
            )

        if "depth_image" == dora_input["id"]:
            self.depth_frame = np.frombuffer(dora_input['value'].to_numpy(),
                                             dtype=np.uint8).reshape((480, 640))

        if "infra1_image" == dora_input["id"]:
            self.infra1_frame = np.frombuffer(dora_input['value'].to_numpy(),
                                              dtype=np.uint8).reshape((480, 640))

        if "infra2_image" == dora_input["id"]:
            self.infra2_frame = np.frombuffer(dora_input['value'].to_numpy(),
                                              dtype=np.uint8).reshape((480, 640))

        # Color stream
        resized_color_image = np.ascontiguousarray(self.color_frame, np.uint8)
        writer.write(resized_color_image)
        resized_color_image = cv2.resize(resized_color_image, (800, 600))

        # Depth stream
        resized_depth_image = (self.depth_frame.astype(np.uint16) * (65535 // 255))
        resized_depth_image = cv2.applyColorMap(cv2.convertScaleAbs(resized_depth_image, alpha=0.03), cv2.COLORMAP_JET)
        writer.write(resized_depth_image)
        resized_depth_image = cv2.resize(resized_depth_image, (800, 600))

        # Left infrared stream
        resized_infra1_image = (self.infra1_frame.astype(np.uint16) * (65535 // 255))
        writer.write(resized_infra1_image)
        resized_infra1_image = cv2.resize(resized_infra1_image, (480, 640))

        # Right infrared stream
        resized_infra2_image = (self.infra2_frame.astype(np.uint16) * (65535 // 255))
        writer.write(resized_infra2_image)
        resized_infra2_image = cv2.resize(resized_infra2_image, (480, 640))

        # top_row = np.hstack((resized_color_image, resized_depth_image))
        # bottom_row = np.hstack((resized_infra1_image, resized_infra2_image))
        # combined_frame = np.vstack((top_row, bottom_row))
        if not NO_DISPLAY:
            cv2.imshow("D435i_image", resized_color_image)
            cv2.waitKey(1)
        self.last_time = time.time()

        return DoraStatus.CONTINUE

haixuanTao commented 11 months ago

Can you share your dataflow description yaml?

Latitude9527 commented 11 months ago

Of course. My dataflow description YAML is as follows:


nodes:
  - id: webcam_D435i
    operator:
      python: ../webcam_D435i.py
      inputs:
        tick: dora/timer/millis/50
      outputs:
        - color_image
        - depth_image
        - infra1_image
        - infra2_image
        - imu_data
        - depth_to_color_extrinsics
    env:
      DEVICE_INDEX: 0

haixuanTao commented 11 months ago

What is the issue you're facing?

Latitude9527 commented 11 months ago

I want to receive four data streams and visualize them at the same time. When I tested visualizing the four types of data one by one, I found that the color images could be visualized, while the depth images could not be received. However, it works again when I subscribe only to the depth data and visualize it with a single node.

haixuanTao commented 11 months ago

I was not able to reproduce your error with a basic webcam operator.

My dataflow

nodes:
  - id: webcam
    operator:
      python: webcam.py
      inputs:
        tick: dora/timer/millis/50
      outputs:
        - image_1
        - image_2
        - image_3

  - id: object_detection
    operator:
      python: object_detection.py
      inputs:
        image: webcam/image_1
      outputs:
        - bbox

  - id: plot
    operator:
      python: plot.py
      inputs:
        image_1: webcam/image_1
        image_2: webcam/image_2
        image_3: webcam/image_3
        bbox: object_detection/bbox

My webcam operator:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
from typing import Callable, Optional

import os
import cv2
import numpy as np
import pyarrow as pa

from dora import DoraStatus

CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", 0))

font = cv2.FONT_HERSHEY_SIMPLEX

class Operator:
    """
    Sending image from webcam to the dataflow
    """

    def __init__(self):
        self.video_capture = cv2.VideoCapture(CAMERA_INDEX)
        self.start_time = time.time()
        self.video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
        self.video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)

    def on_event(
        self,
        dora_event: dict,
        send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
    ) -> DoraStatus:
        match dora_event["type"]:
            case "INPUT":
                ret, frame = self.video_capture.read()
                if ret:
                    frame = cv2.resize(frame, (CAMERA_WIDTH, CAMERA_HEIGHT))

                ## Push an error image in case the camera is not available.
                else:
                    frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
                    cv2.putText(
                        frame,
                        "No Webcam was found at index %d" % (CAMERA_INDEX),
                        (int(30), int(30)),
                        font,
                        0.75,
                        (255, 255, 255),
                        2,
                        1,
                    )

                send_output(
                    "image_1",
                    pa.array(frame.ravel()),
                    dora_event["metadata"],
                )
                send_output(
                    "image_2",
                    pa.array(frame.ravel()),
                    dora_event["metadata"],
                )
                send_output(
                    "image_3",
                    pa.array(frame.ravel()),
                    dora_event["metadata"],
                )
            case "STOP":
                print("received stop")
            case other:
                print("received unexpected event:", other)

        if time.time() - self.start_time < 20:
            return DoraStatus.CONTINUE
        else:
            return DoraStatus.STOP

    def __del__(self):
        self.video_capture.release()

My plot operator:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
from typing import Callable, Optional

import cv2
import numpy as np
import pyarrow as pa
from utils import LABELS

from dora import DoraStatus

pa.array([])

CI = os.environ.get("CI")
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480

font = cv2.FONT_HERSHEY_SIMPLEX

class Operator:
    """
    Plot image and bounding box
    """

    def __init__(self):
        self.image_1 = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
        self.image_2 = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
        self.image_3 = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
        self.bboxs = []
        self.bounding_box_messages = 0
        self.image_messages = 0

    def on_event(
        self,
        dora_event: dict,
        send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
    ) -> DoraStatus:
        if dora_event["type"] == "INPUT":
            return self.on_input(dora_event, send_output)
        return DoraStatus.CONTINUE

    def on_input(
        self,
        dora_input: dict,
        send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
    ) -> DoraStatus:
        """
        Put image and bounding box on cv2 window.

        Args:
            dora_input["id"] (str): Id of the dora_input declared in the yaml configuration
            dora_input["value"] (pa.Array): Arrow array holding the data of the dora_input
            send_output Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None]:
                Function for sending output to the dataflow:
                - First argument is the `output_id`
                - Second argument is the data as either bytes or `pa.UInt8Array`
                - Third argument is dora metadata dict
                e.g.: `send_output("bbox", pa.array([100], type=pa.uint8()), dora_event["metadata"])`
        """
        if dora_input["id"] == "image_1":
            frame = (
                dora_input["value"]
                .to_numpy()
                .reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
                .copy()  # copy the image because we want to modify it below
            )
            self.image_1 = frame

            self.image_messages += 1
            print("received " + str(self.image_messages) + " images")
        elif dora_input["id"] == "image_2":
            frame = (
                dora_input["value"]
                .to_numpy()
                .reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
                .copy()  # copy the image because we want to modify it below
            )
            self.image_2 = frame

            self.image_messages += 1
            print("received " + str(self.image_messages) + " images")
        elif dora_input["id"] == "image_3":
            frame = (
                dora_input["value"]
                .to_numpy()
                .reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
                .copy()  # copy the image because we want to modify it below
            )
            self.image_3 = frame

            self.image_messages += 1
            print("received " + str(self.image_messages) + " images")

        elif dora_input["id"] == "bbox" and len(self.image_1) != 0:
            bboxs = dora_input["value"].to_numpy().view(np.float32)
            self.bboxs = np.reshape(bboxs, (-1, 6))

            self.bounding_box_messages += 1
            print("received " + str(self.bounding_box_messages) + " bounding boxes")

        for bbox in self.bboxs:
            [
                min_x,
                min_y,
                max_x,
                max_y,
                confidence,
                label,
            ] = bbox
            cv2.rectangle(
                self.image_1,
                (int(min_x), int(min_y)),
                (int(max_x), int(max_y)),
                (0, 255, 0),
                2,
            )

            cv2.putText(
                self.image_1,
                LABELS[int(label)] + f", {confidence:0.2f}",
                (int(max_x), int(max_y)),
                font,
                0.75,
                (0, 255, 0),
                2,
                1,
            )

        if CI != "true":
            cv2.imshow("frame_1", self.image_1)
            cv2.imshow("frame_2", self.image_2)
            cv2.imshow("frame_3", self.image_3)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                return DoraStatus.STOP

        return DoraStatus.CONTINUE

    def __del__(self):
        cv2.destroyAllWindows()

This works without issue.
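
The pattern in the plot operator above (start every frame as a blank array, overwrite only the frame whose id just arrived, then draw all of them) can be condensed into a small helper. This is a minimal sketch and not part of dora: `FrameCache`, `STREAMS`, and the shapes and dtypes in the table are assumptions for illustration, with the ids taken from the four image outputs in the original dataflow. The size and dtype check is only there to make a mismatched stream fail loudly instead of silently.

```python
import numpy as np

# Hypothetical stream table: input id -> (shape, dtype).
# The ids come from the dataflow in this thread; shapes and dtypes are assumptions.
STREAMS = {
    "color_image": ((480, 640, 3), np.uint8),
    "depth_image": ((480, 640), np.uint8),
    "infra1_image": ((480, 640), np.uint8),
    "infra2_image": ((480, 640), np.uint8),
}


class FrameCache:
    """Keep the most recent frame of every stream, starting from blank frames."""

    def __init__(self, streams=STREAMS):
        self.streams = streams
        # Blank frames mean drawing code never touches a stream that has
        # not delivered anything yet.
        self.frames = {
            name: np.zeros(shape, dtype=dtype)
            for name, (shape, dtype) in streams.items()
        }

    def update(self, input_id, flat):
        """Store one flat array under its id; return False for unknown ids."""
        if input_id not in self.streams:
            return False
        shape, dtype = self.streams[input_id]
        flat = np.asarray(flat)
        expected = int(np.prod(shape))
        if flat.size != expected or flat.dtype != np.dtype(dtype):
            raise ValueError(
                f"{input_id}: got {flat.size} x {flat.dtype}, "
                f"expected {expected} x {np.dtype(dtype)}"
            )
        # Copy so later drawing does not modify the incoming buffer.
        self.frames[input_id] = flat.reshape(shape).copy()
        return True
```

Inside an operator, `on_input` would then reduce to something like `self.cache.update(dora_input["id"], dora_input["value"].to_numpy())`, followed by drawing from `self.cache.frames`, so every input updates exactly one entry and the display code always has all four arrays available.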

Latitude9527 commented 11 months ago

OK, thanks, I will give it a try.