Seeed-Studio / SSCMA-Micro

A cross-platform framework that deploys and applies SSCMA models to microcontrol devices
https://seeed-studio.github.io/SSCMA-Micro/
MIT License
24 stars 13 forks source link

Possible issue with base64 encoding #81

Open gustav-vinnter opened 1 week ago

gustav-vinnter commented 1 week ago

Describe the bug

I tried to use "AT+INVOKE=1,0,0" to capture images from a Python script, similar to how it is done on https://sensecraft.seeed.cc/ai/. I get an error in the base64-encoded image. I believe it comes from a bug in el_base64_encode(): in the second part there is a mix-up of i and j. I think it should be for (j = 0; j < i + 1; j++) *out++ = constants::BASE64_CHARS_TABLE[char_array_4[j]];

Adding a null terminator at the end of the function is probably a good idea too, though I am not sure whether omitting it causes any issues: *out = '\0';

Environment

Environment you use when bug appears: Grove Vision AI with latest available firmware connected to Ubuntu running a python script.

  1. Compiler Version
  2. Compiler Commands
  3. SSCMA-Micro Version
  4. Code you run
  5. The detailed error

Additional context

The python script

import serial
import time
import threading
import base64
import io
from PIL import Image, ImageTk
import queue
import json
import tkinter as tk
from tkinter import ttk
import argparse

class SSCMAClient:
    """
    Serial client that sends AT commands and parses the JSON replies.

    Commands are written as CRLF-terminated text. A background daemon thread
    reads the port, decodes complete JSON objects from the accumulated text and
    passes them to ``handle_json``. Decoded images and bounding boxes are
    handed to the consumer through ``image_queue`` and ``bbox_queue``.
    """

    def __init__(self, port, baudrate=921600, timeout=1):
        """
        Initialize the serial connection and start the listener thread.

        Args:
            port: Name of the serial port to open.
            baudrate: Serial baud rate.
            timeout: Read timeout passed to ``serial.Serial``.

        Raises:
            serial.SerialException: If the port cannot be opened.
        """
        try:
            self.ser = serial.Serial(
                port=port,
                baudrate=baudrate,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=timeout
            )
            time.sleep(2)  # Wait for the connection to initialize
            print(f"Connected to {port} at {baudrate} baud.")
        except serial.SerialException as e:
            print(f"Failed to connect to {port}: {e}")
            raise

        # All state used by the listener must exist BEFORE the thread starts,
        # otherwise an early message could hit a missing attribute.
        self.lock = threading.Lock()
        self.buffer = ""
        self._closed = False

        # Queues for thread-safe communication
        self.image_queue = queue.Queue()
        self.bbox_queue = queue.Queue()

        self.listening = True
        self.listener_thread = threading.Thread(target=self.listen, daemon=True)
        self.listener_thread.start()

    def send_command(self, command):
        """
        Send an AT command to the device.

        Pending serial input/output and the receive buffer are discarded first
        so that the next parsed reply belongs to this command. Serial errors
        are reported on stdout and not re-raised.
        """
        full_command = f"{command}\r\n"  # Ensure commands are terminated with CRLF
        print(f"Sending command: {full_command.strip()}")
        with self.lock:
            try:
                # reset_*_buffer are the current names of the deprecated
                # flushInput / flushOutput aliases.
                self.ser.reset_input_buffer()
                self.ser.reset_output_buffer()
                self.buffer = ""
                self.ser.write(full_command.encode())
                print(f"Command '{command}' sent successfully.")
            except serial.SerialException as e:
                print(f"Failed to send command '{command}': {e}")

    def listen(self):
        """
        Listen for incoming data from the device.

        Runs until ``self.listening`` becomes false. Any exception stops the
        loop after being reported on stdout.
        """
        self.buffer = ""
        decoder = json.JSONDecoder()
        while self.listening:
            try:
                # Hold the lock while touching the port and the buffer so a
                # concurrent send_command() cannot reset them mid-parse.
                with self.lock:
                    if self.ser.in_waiting:
                        data = self.ser.read(self.ser.in_waiting).decode(errors='ignore')
                        print(f"Raw data received: {data.strip()}")
                        self.buffer += data
                        self._drain_buffer(decoder)
                time.sleep(0.1)
            except serial.SerialException as e:
                print(f"Serial exception: {e}")
                self.listening = False
            except Exception as e:
                print(f"Unexpected error in listen thread: {e}")
                self.listening = False

    def _drain_buffer(self, decoder):
        """Decode and dispatch every complete JSON object currently buffered."""
        while self.buffer:
            self.buffer = self.buffer.lstrip()  # Remove leading whitespace
            if not self.buffer:
                break

            if self.buffer.strip().endswith('"is_ready": 1}}'):
                self.buffer = ""
                print("End of startup sequence")
                break

            if not self.buffer.startswith('{'):
                # Drop only the leading noise and resynchronize on the next
                # '{' instead of throwing away a complete object behind it.
                start = self.buffer.find('{')
                skipped = self.buffer if start < 0 else self.buffer[:start]
                print("####### Ignore not complete data" + skipped)
                self.buffer = "" if start < 0 else self.buffer[start:]
                continue

            try:
                obj, index = decoder.raw_decode(self.buffer)
            except json.JSONDecodeError as je:
                # If no complete JSON object can be decoded, wait for more data
                print(f"----> JSON EX:{je}")
                print("!!!!!" + self.buffer + "!!!!!!!!")
                break

            # Consume the object before dispatching so a failing handler
            # cannot leave it in the buffer.
            self.buffer = self.buffer[index:]
            self.handle_json(obj)

    def handle_json(self, json_data):
        """
        Handle a parsed JSON object.

        Messages with ``type == 1`` and ``name == "INVOKE"`` feed the image
        and bounding-box queues; ``type == 0`` MODEL/INVOKE messages are only
        printed; anything else is reported as unsupported.
        """
        print(f"Handling JSON data: {json_data}")
        msg_type = json_data.get("type")
        name = json_data.get("name")

        if msg_type == 1 and name == "INVOKE":
            data = json_data.get("data", {})
            image_b64 = data.get("image")
            boxes = data.get("boxes", [])

            if image_b64:
                image = self.decode_image(image_b64)
                if image is not None:
                    self.image_queue.put(image)
            if boxes:
                self.bbox_queue.put(boxes)
        elif msg_type == 0 and name == "MODEL":
            print(f"Model Information: {json_data.get('data')}")
        elif msg_type == 0 and name == "INVOKE":
            print(f"Invoke Information: {json_data.get('data')}")
        else:
            print(f"Unrecognized or unsupported message: {json_data}")

    def decode_image(self, img_data):
        """
        Decode base64 image data to a PIL Image.

        Returns:
            The decoded image, or ``None`` if the data cannot be decoded.
        """
        try:
            # Remove any whitespace or newlines within the Base64 string
            img_data_clean = ''.join(img_data.split())
            img_bytes = base64.b64decode(img_data_clean)
            image = Image.open(io.BytesIO(img_bytes))
            # Image.open is lazy; force the pixel data to be decoded here so
            # corrupt or truncated data is reported by this method.
            image.load()
            print("Image decoded successfully.")
            return image
        except Exception as e:
            print(f"Failed to decode image: {e}")
            return None

    def parse_bboxes(self, bbox_data):
        """
        Parse bounding boxes from the received data.
        Expected format: "x1,y1,x2,y2; x1,y1,x2,y2; ..."

        Entries that do not have exactly four comma-separated fields are
        skipped. If any four-field entry is not numeric, an empty list is
        returned.
        """
        try:
            bboxes = []
            for box in bbox_data.split(';'):
                coords = box.strip().split(',')
                if len(coords) == 4:
                    x1, y1, x2, y2 = map(int, coords)
                    bboxes.append((x1, y1, x2, y2))
            print(f"Parsed bounding boxes: {bboxes}")
            return bboxes
        except (AttributeError, ValueError) as e:
            print(f"Failed to parse bounding boxes: {e}")
            return []

    def close(self):
        """
        Close the serial connection and stop listening.

        Safe to call more than once; only the first call has an effect.
        """
        if getattr(self, "_closed", False):
            return
        self._closed = True

        self.listening = False
        thread = self.listener_thread
        # Joining a thread that never started, or the current thread, raises.
        if thread.is_alive() and thread is not threading.current_thread():
            thread.join()
        self.ser.close()
        print("Serial connection closed.")

class SSCMAApp:
    """
    Tkinter front end that shows the latest image and bounding boxes.

    The window holds a canvas and a "Refresh Image" button. The button sends
    an invoke command through the client; the client's queues are polled
    every 100 ms from the Tk event loop and their contents are drawn on the
    canvas.
    """

    def __init__(self, root, client):
        """
        Build the UI and start polling the client's queues.

        Args:
            root: The Tk root window.
            client: Object providing ``send_command``, ``close``,
                ``image_queue`` and ``bbox_queue``.
        """
        self.root = root
        self.client = client
        self.root.title("Grove Vision AI v2 Client")
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)

        # Create UI elements
        self.create_widgets()

        # Initialize image and bounding boxes
        self.current_image = None
        self.bounding_boxes = []

        # Start polling queues
        self.poll_queues()

    def create_widgets(self):
        """Create the image canvas and the refresh button."""
        # Create a frame for the image
        self.image_frame = ttk.Frame(self.root)
        self.image_frame.pack(padx=10, pady=10)

        # Canvas to display image and bounding boxes
        self.canvas = tk.Canvas(self.image_frame, width=640, height=480, bg='grey')
        self.canvas.pack()

        # Create a frame for buttons
        self.button_frame = ttk.Frame(self.root)
        self.button_frame.pack(pady=10)

        # Refresh button
        self.refresh_button = ttk.Button(
            self.button_frame, text="Refresh Image", command=self.refresh_image
        )
        self.refresh_button.pack()

    def refresh_image(self):
        """
        Send commands to request image and bounding boxes from the device.
        """
        # Send the AT+INVOKE command to get image and bounding boxes
        self.client.send_command("AT+INVOKE=1,0,0")

    def poll_queues(self):
        """
        Periodically check the image and bbox queues for new data.

        The next poll is always scheduled, even if drawing raises, so one bad
        message cannot stop the UI from updating.
        """
        try:
            # Check for new images
            try:
                while True:
                    image = self.client.image_queue.get_nowait()
                    self.display_image(image)
            except queue.Empty:
                pass

            # Check for new bounding boxes
            try:
                while True:
                    bboxes = self.client.bbox_queue.get_nowait()
                    self.display_bboxes(bboxes)
            except queue.Empty:
                pass
        finally:
            # Schedule the next poll
            self.root.after(100, self.poll_queues)

    def display_image(self, image):
        """
        Display the image on the canvas.

        The image is scaled down (never up) to fit within 640x480 and the
        canvas is resized to match. Failures are reported on stdout.
        """
        try:
            self.current_image = image.copy()
            # Define desired max size
            max_width, max_height = 640, 480
            img_width, img_height = self.current_image.size
            scale = min(max_width / img_width, max_height / img_height, 1)
            new_size = (int(img_width * scale), int(img_height * scale))
            # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same
            # filter under its current name.
            self.current_image = self.current_image.resize(new_size, Image.LANCZOS)
            # Keep a reference on self so Tk does not lose the image.
            self.photo = ImageTk.PhotoImage(self.current_image)
            self.canvas.config(width=new_size[0], height=new_size[1])
            # Remove the previous frame so canvas items do not pile up.
            self.canvas.delete("image")
            self.canvas.create_image(
                0, 0, anchor=tk.NW, image=self.photo, tags="image"
            )
            print("Image displayed and scaled.")
        except Exception as e:
            print(f"Failed to show image! {e}")

    def display_bboxes(self, bboxes):
        """
        Draw bounding boxes on the canvas.

        The first four values of each box are used as rectangle coordinates;
        boxes that do not provide four values are skipped.
        """
        self.bounding_boxes = bboxes
        self.canvas.delete("bbox")  # Remove existing bounding boxes

        if self.current_image is not None:
            for bbox in bboxes:
                try:
                    # NOTE(review): boxes may carry extra fields after the
                    # first four - confirm the exact layout and coordinate
                    # convention against the device protocol.
                    x1, y1, x2, y2 = bbox[:4]
                except (TypeError, ValueError):
                    print(f"Skipping malformed bounding box: {bbox}")
                    continue
                self.canvas.create_rectangle(
                    x1, y1, x2, y2, outline="red", width=2, tags="bbox"
                )
            print("Bounding boxes drawn.")

    def on_closing(self):
        """
        Handle the window closing event.
        """
        print("Closing application...")
        self.client.close()
        self.root.destroy()

def main():
    """
    Command-line entry point: open the serial client and run the Tk GUI.

    Requires a ``--port`` argument. Returns without starting the GUI when the
    serial port cannot be opened; the client is always closed when the main
    loop ends.
    """
    arg_parser = argparse.ArgumentParser(
        description="Grove Vision AI v2 Python Client with Tkinter GUI"
    )
    arg_parser.add_argument(
        '--port',
        required=True,
        type=str,
        help='Serial port (e.g., COM3 or /dev/ttyUSB0)',
    )
    options = arg_parser.parse_args()

    # Open the device first; without it there is nothing to show.
    try:
        client = SSCMAClient(port=options.port)
    except serial.SerialException:
        return

    # Build the window and bind the application to it.
    window = tk.Tk()
    gui = SSCMAApp(window, client)

    # Run the event loop until the window is closed or Ctrl+C is pressed.
    try:
        window.mainloop()
    except KeyboardInterrupt:
        print("Exiting...")
    finally:
        client.close()


if __name__ == "__main__":
    main()
LynnL4 commented 6 days ago

Hi, thanks for the feedback. The issue has already been fixed, and new firmware will be released after testing. Thanks. https://github.com/Seeed-Studio/SSCMA-Micro/commit/7b52b921d293f4501a0b9d400f4b8be1943c157b