mobilityhouse / ocpp

Python implementation of the Open Charge Point Protocol (OCPP).
MIT License
788 stars 310 forks source link

I got following error asdict() should be called on dataclass instances. #483

Closed arpantechparticle closed 11 months ago

arpantechparticle commented 1 year ago

I am using this awesome library, and my code is below. When I call the start API, I get the following error: `asdict() should be called on dataclass instances`.

charging system file

import asyncio
import json
from dataclasses import is_dataclass
from datetime import datetime

from ocpp.charge_point import LOGGER
from ocpp.v16 import call_result, call
from ocpp.routing import on, after
from ocpp.v16 import ChargePoint
from ocpp.v16.enums import Action, RegistrationStatus, RemoteStartStopStatus
import logging

from fastapi import WebSocket, WebSocketDisconnect

from pubsub import WebSocketManager

socket_manager = WebSocketManager()

class SChargePoint(ChargePoint):
    """Server-side handler object for one OCPP 1.6 charge point.

    Outgoing frames are not written to the ``connection`` passed to the
    constructor; :meth:`_send` writes them to every WebSocket currently
    stored in ``self.sockets`` (see :meth:`set_sockets`).
    """

    def __init__(self, charge_point_id, connection):
        super().__init__(charge_point_id, connection)
        self.sockets = []

    def set_sockets(self, sockets):
        """Replace the list of WebSockets that :meth:`_send` writes to."""
        self.sockets = sockets

    @on(Action.BootNotification)
    def on_boot_notification(
            self, charge_point_vendor: str, charge_point_model: str, **kwargs
    ):
        """Answer every BootNotification with status Accepted and interval=10."""
        return call_result.BootNotificationPayload(
            current_time=datetime.utcnow().isoformat(),
            interval=10,
            status=RegistrationStatus.accepted,
        )

    @on(Action.StartTransaction)
    def on_start_transaction(self, connector_id, id_tag, timestamp, meter_start,
                             reservation_id=None, **kwargs):
        """Accept a StartTransaction request.

        ``reservation_id`` is optional in StartTransaction.req, so it must
        have a default; otherwise a request without it fails with a
        TypeError before this handler runs.

        NOTE(review): every transaction is given the id 1; a real system
        needs a unique id per transaction.
        """
        return call_result.StartTransactionPayload(
            id_tag_info={
                "status": 'Accepted'
            },
            transaction_id=1
        )

    @on(Action.StopTransaction)
    def on_stop_transaction(self, transaction_id, id_tag=None, timestamp=None,
                            meter_stop=None, **kwargs):
        """Acknowledge a StopTransaction request.

        ``id_tag`` is optional in StopTransaction.req and the request may
        carry further optional fields (e.g. ``reason``), hence the default
        and ``**kwargs``.
        """
        print('on_stop_transaction transaction_id', transaction_id)
        print('on_stop_transaction id_tag', id_tag)
        return call_result.StopTransactionPayload()

    @on(Action.MeterValues)
    def on_meter_value(self, **kwargs):
        """Acknowledge MeterValues; the payload fields are accepted and ignored."""
        return call_result.MeterValuesPayload()

    @on(Action.RemoteStartTransaction)
    async def remote_start_transaction(self, id_tag, connector_id, **kwargs):
        """Forward a RemoteStartTransaction to the charge point and relay its answer.

        The handler must always return a payload dataclass: the library
        serialises the return value with ``dataclasses.asdict`` and raises
        "asdict() should be called on dataclass instances" when it gets
        ``None``. Failures are therefore reported as a Rejected payload.

        NOTE(review): ``self.call`` waits for a reply that has to be fed to
        ``route_message`` while this handler is still running. If the code
        that feeds ``route_message`` awaits it before reading the next
        frame, the reply cannot arrive and the call times out.
        """
        rejected = call_result.RemoteStartTransactionPayload(
            status=RemoteStartStopStatus.rejected
        )
        print('kwargs', kwargs)
        request = call.RemoteStartTransactionPayload(id_tag=id_tag, connector_id=connector_id)
        print('is data class', is_dataclass(request))
        try:
            response = await self.call(request)
        except Exception as e:
            # Timeouts and send errors end up here. CancelledError is a
            # BaseException and is deliberately left to propagate.
            print(f"An error occurred in remote_start_transaction: {e}")
            return rejected

        if response is None:
            # No usable result came back (e.g. the charge point answered
            # with an error frame).
            print("An error occurred in remote_start_transaction: no response payload")
            return rejected

        print('response.status', response.status)
        if response.status == RemoteStartStopStatus.accepted:
            print("Transaction Started!!!")
        return response

    @on(Action.Heartbeat)
    def on_heartbeat(self, **kwargs):
        """Answer a Heartbeat with the current UTC time."""
        return call_result.HeartbeatPayload(
            current_time=datetime.utcnow().isoformat()
        )

    @on(Action.StatusNotification)
    def on_status_notification(self, **kwargs):
        """Acknowledge a StatusNotification; the payload is ignored."""
        return call_result.StatusNotificationPayload()

    async def _send(self, message):
        """Write ``message`` to every WebSocket in ``self.sockets``."""
        LOGGER.info("%s: send %s", self.id, message)
        print('self.sockets', self.sockets)
        # Iterate over a snapshot: the list can be changed by other
        # coroutines while this one is suspended in send_text().
        for socket in list(self.sockets):
            await socket.send_text(message)

async def on_connect(websocket: WebSocket, charge_point_id: str):
    """Register a newly connected charge point and forward its frames.

    Creates an ``SChargePoint`` for the connection, adds the socket to the
    charge point's room and then publishes every received text frame to
    that room until the client disconnects.

    Args:
        websocket: The incoming (not yet accepted) WebSocket.
        charge_point_id: Id taken from the request path; used as room id.
    """
    # scope.get() returns None instead of raising KeyError, so a missing or
    # empty subprotocol list has to be detected explicitly.
    requested_protocols = websocket.scope.get('subprotocols') or []
    print('requested_protocols', requested_protocols)
    if not requested_protocols:
        logging.error("Client hasn't requested any Subprotocol. Closing Connection")
        return await websocket.close()

    # TODO: also close the connection when 'ocpp1.6' is not among the
    # requested subprotocols.
    logging.info("Protocols Matched: %s", requested_protocols)

    cp = SChargePoint(charge_point_id, socket_manager)
    await socket_manager.add_user_to_room(charge_point_id, websocket, cp)

    try:
        while True:
            data = await websocket.receive_text()
            print('is sc ocpp file: websocket.receive_text()', data)
            await socket_manager.broadcast_to_room(charge_point_id, data)
    except WebSocketDisconnect:
        logging.info("Charge point %s disconnected", charge_point_id)
    finally:
        # Always drop the socket from its room, also when the loop ends with
        # an error other than a disconnect; otherwise later sends would be
        # attempted on a dead socket.
        await socket_manager.remove_user_from_room(charge_point_id, websocket)

Fast api route

@app.post("/start/{charge_point_id}")
async def remote_start_transaction(
        charge_point_id: str,
        remote_start_item: RemoteStartItem
):
    """Publish a RemoteStartTransaction Call frame to a charge point's room.

    Args:
        charge_point_id: Room / channel the frame is published to.
        remote_start_item: Supplies ``connector_id`` and ``order_id``; the
            latter is sent as the ``idTag``.

    Returns:
        A dict with a status message and the JSON frame that was published.
    """
    uid = str(uuid.uuid4())
    # OCPP-J Call frame: [2, <unique id>, <action>, <payload>]
    message = [2, uid, "RemoteStartTransaction",
               {"connectorId": remote_start_item.connector_id, "idTag": remote_start_item.order_id}]

    remote_start_message = json.dumps(message, separators=(',', ':'))

    # connect() builds a new connection pool and replaces the shared pubsub
    # object on every call, so only connect when there is no connection yet.
    pubsub_client = socket_manager.pubsub_client
    if pubsub_client.redis_connection is None:
        await pubsub_client.connect()
    await socket_manager.broadcast_to_room(charge_point_id, remote_start_message)

    # Only confirms that the request was published, not that it was accepted.
    return {"message": "RemoteStartTransaction request sent",
            "result": remote_start_message}

@app.websocket("/ws/{charge_point_id}")
async def websocket_endpoint(websocket: WebSocket):
    """Hand an incoming charge point WebSocket over to ``on_connect``."""
    charge_point_id = websocket.path_params.get('charge_point_id')
    await on_connect(websocket, charge_point_id)

and pubsub code

import asyncio
import os
from dotenv import load_dotenv
import redis.asyncio as aioredis
from fastapi import WebSocket

load_dotenv()

class RedisPubSubManager:
    """Small wrapper around a Redis client for publish/subscribe.

    Args:
        host (str): Redis server host, used when REDIS_HOST is not set.
        port (int): Redis server port, used when REDIS_PORT is not set.
    """

    def __init__(self, host='localhost', port=6379):
        self.redis_connection = None
        self.redis_host = os.getenv('REDIS_HOST', host)
        # Environment values are strings; normalise so the port is an int
        # whether it comes from the environment or from the argument.
        self.redis_port = int(os.getenv('REDIS_PORT', port))
        self.redis_password = os.getenv('REDIS_PASSWORD', None)
        self.pubsub = None

    async def _get_redis_connection(self) -> "aioredis.Redis":
        """
        Creates a Redis client backed by a new connection pool.

        Returns:
            aioredis.Redis: Redis client object.
        """
        redis_connection_pool = aioredis.ConnectionPool(
            host=self.redis_host,
            port=self.redis_port,
            password=self.redis_password,
            max_connections=100
        )

        return aioredis.Redis(connection_pool=redis_connection_pool)

    async def connect(self) -> None:
        """
        Ensures a Redis client exists and creates a fresh pubsub object.

        The client (and its connection pool) is created once and reused, so
        repeated calls no longer pile up connection pools. Each call still
        assigns a new object to ``self.pubsub``.
        """
        if self.redis_connection is None:
            self.redis_connection = await self._get_redis_connection()
        self.pubsub = self.redis_connection.pubsub()

    async def _publish(self, room_id: str, message: str) -> None:
        """
        Publishes a message to a specific Redis channel.

        Creates the Redis client on first use, so publishing does not
        require a prior connect() and does not touch ``self.pubsub``.

        Args:
            room_id (str): Channel or room ID.
            message (str): Message to be published.
        """
        if self.redis_connection is None:
            self.redis_connection = await self._get_redis_connection()
        await self.redis_connection.publish(room_id, message)

    async def subscribe(self, room_id: str) -> "aioredis.client.PubSub":
        """
        Subscribes the current pubsub object to a Redis channel.

        Args:
            room_id (str): Channel or room ID to subscribe to.

        Returns:
            The pubsub object the subscription was made on.
        """
        if self.pubsub is None:
            await self.connect()
        await self.pubsub.subscribe(room_id)
        return self.pubsub

    async def unsubscribe(self, room_id: str) -> None:
        """
        Unsubscribes the current pubsub object from a Redis channel.

        Does nothing when no pubsub object has been created yet.

        Args:
            room_id (str): Channel or room ID to unsubscribe from.
        """
        if self.pubsub is None:
            return
        await self.pubsub.unsubscribe(room_id)

class WebSocketManager:
    """Keeps WebSocket connections grouped in rooms backed by Redis pub/sub.

    Attributes:
        rooms (dict): Room id -> list of WebSocket connections in that room.
        pubsub_client (RedisPubSubManager): Used to publish and subscribe.
    """

    def __init__(self):
        """
        Initializes the WebSocketManager.
        """
        self.rooms: dict = {}
        self.pubsub_client = RedisPubSubManager()
        # Room id -> (pubsub object, reader task), so that both can be
        # released when the last socket leaves the room.
        self._readers: dict = {}

    async def add_user_to_room(self, charger_id: str, websocket: "WebSocket", cp) -> None:
        """
        Accepts a WebSocket and adds it to a room.

        The first socket of a room also subscribes to the room's Redis
        channel and starts the reader task that feeds ``cp``.

        Args:
            cp: SChargePoint class
            charger_id (str): Room ID or channel name.
            websocket (WebSocket): WebSocket connection object.
        """
        await websocket.accept()

        if charger_id in self.rooms:
            self.rooms[charger_id].append(websocket)
            return

        self.rooms[charger_id] = [websocket]

        await self.pubsub_client.connect()
        print("room", self.rooms)
        pubsub_subscriber = await self.pubsub_client.subscribe(charger_id)
        print("ddd1", pubsub_subscriber)
        reader = asyncio.create_task(self._pubsub_data_reader(pubsub_subscriber, cp))
        self._readers[charger_id] = (pubsub_subscriber, reader)

    async def broadcast_to_room(self, room_id: str, message: str) -> None:
        """
        Publishes a message to the room's Redis channel.

        Nothing is written to a WebSocket here; the message is picked up by
        the reader task subscribed to the channel.

        Args:
            room_id (str): Room ID or channel name.
            message (str): Message to be published.
        """
        await self.pubsub_client._publish(room_id, message)

    async def remove_user_from_room(self, room_id: str, websocket: "WebSocket") -> None:
        """
        Removes a WebSocket from a room and tears the room down when empty.

        Unknown rooms or sockets are ignored. When the room becomes empty
        its reader task is cancelled and the pubsub object that was
        subscribed for it is unsubscribed, so a later reconnect does not end
        up with two readers on the same channel.

        Args:
            room_id (str): Room ID or channel name.
            websocket (WebSocket): WebSocket connection object.
        """
        sockets = self.rooms.get(room_id)
        if sockets is None:
            return
        if websocket in sockets:
            sockets.remove(websocket)
        if sockets:
            return

        del self.rooms[room_id]
        entry = self._readers.pop(room_id, None)
        if entry is None:
            await self.pubsub_client.unsubscribe(room_id)
            return

        pubsub_subscriber, reader = entry
        reader.cancel()
        await pubsub_subscriber.unsubscribe(room_id)

    async def _pubsub_data_reader(self, pubsub_subscriber, cp):
        """
        Reads messages from Redis PubSub and routes them through ``cp``.

        Each message is routed in its own task. A handler that awaits
        ``cp.call(...)`` needs the reply to be read from the channel while
        it is still running; awaiting ``route_message`` here would block
        that read until the call times out.

        Args:
            pubsub_subscriber: PubSub object for the subscribed channel.
            cp: Object providing ``set_sockets`` and ``route_message``.
        """
        async def route(data):
            # One failing message must not stop the reader.
            try:
                await cp.route_message(data)
            except Exception as e:
                print(f"An error occurred in _pubsub_data_reader: {e}")

        # Strong references keep the routing tasks alive until they finish.
        in_flight = set()
        try:
            print('it come here')
            while True:
                # Wait up to a second for a message instead of polling with
                # the default zero timeout.
                message = await pubsub_subscriber.get_message(
                    ignore_subscribe_messages=True, timeout=1.0
                )
                if message is None:
                    continue
                print("only if message available", message)
                charger_id = message['channel'].decode('utf-8')
                print("charger_id", charger_id)
                cp.set_sockets(self.rooms.get(charger_id, []))
                data = message['data'].decode('utf-8')
                print("message form", data)
                task = asyncio.create_task(route(data))
                in_flight.add(task)
                task.add_done_callback(in_flight.discard)
        except Exception as e:
            # Errors from the pubsub connection end the reader.
            # CancelledError is not caught so cancellation works normally.
            print(f"An error occurred in _pubsub_data_reader: {e}")
OrangeTux commented 1 year ago

Can you provide a stack trace? Without more details, I can't help you.

arpantechparticle commented 1 year ago

Thank you for the reply, @OrangeTux. I have been trying to solve this for the last 4 days with no luck. Here is my stack trace:

ERROR:ocpp:Error while handling request '<Call - unique_id=ba63e555-a3c9-400f-8d0f-ce07f89dd680, action=RemoteStartTransaction, payload={'connectorId': 1, 'idTag': '123'}>'
Traceback (most recent call last):
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/asyncio/queues.py", line 166, in get
    await getter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
    fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/arpanpatel/projects/spotcharge-occp-api/venv/lib/python3.9/site-packages/ocpp/charge_point.py", line 288, in call
    response = await self._get_specific_response(
  File "/Users/arpanpatel/projects/spotcharge-occp-api/venv/lib/python3.9/site-packages/ocpp/charge_point.py", line 322, in _get_specific_response
    response = await asyncio.wait_for(self._response_queue.get(), timeout)
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 494, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/arpanpatel/projects/spotcharge-occp-api/venv/lib/python3.9/site-packages/ocpp/charge_point.py", line 205, in _handle_call
    response = await response
  File "/Users/arpanpatel/projects/spotcharge-occp-api/sc_ocpp16.py", line 64, in remote_start_transaction
    response = await self.call(request)
  File "/Users/arpanpatel/projects/spotcharge-occp-api/venv/lib/python3.9/site-packages/ocpp/charge_point.py", line 292, in call
    raise asyncio.TimeoutError(
asyncio.exceptions.TimeoutError: Waited 30s for response on [2,"f4260f9f-cc73-45ea-a176-b57ad1c060c6","RemoteStartTransaction",{"idTag":"123","connectorId":1}].
self.sockets [<starlette.websockets.WebSocket object at 0x108afb6d0>]
OrangeTux commented 1 year ago

The stack trace you've added doesn't mention asdict(). Can you post a stack trace that does?

arpantechparticle commented 1 year ago

While handling this error by catching the CancelledError exception, I got the asdict error in this function. Is my implementation of RemoteStartTransaction correct?

    async def _pubsub_data_reader(self, pubsub_subscriber, cp):
        """
        Reads messages from Redis PubSub and routes them through ``cp``.

        Each message is routed in its own task. A handler that awaits
        ``cp.call(...)`` needs the reply to be read from the channel while
        it is still running; awaiting ``route_message`` here would block
        that read until the call times out.

        Args:
            pubsub_subscriber: PubSub object for the subscribed channel.
            cp: Object providing ``set_sockets`` and ``route_message``.
        """
        async def route(data):
            # One failing message must not stop the reader.
            try:
                await cp.route_message(data)
            except Exception as e:
                print(f"An error occurred in _pubsub_data_reader: {e}")

        # Strong references keep the routing tasks alive until they finish.
        in_flight = set()
        try:
            print('it come here')
            while True:
                # Wait up to a second for a message instead of polling with
                # the default zero timeout.
                message = await pubsub_subscriber.get_message(
                    ignore_subscribe_messages=True, timeout=1.0
                )
                if message is None:
                    continue
                print("only if message available", message)
                charger_id = message['channel'].decode('utf-8')
                print("charger_id", charger_id)
                cp.set_sockets(self.rooms.get(charger_id, []))
                data = message['data'].decode('utf-8')
                print("message form", data)
                task = asyncio.create_task(route(data))
                in_flight.add(task)
                task.add_done_callback(in_flight.discard)
        except Exception as e:
            # Errors from the pubsub connection end the reader.
            # CancelledError is not caught so cancellation works normally.
            print(f"An error occurred in _pubsub_data_reader: {e}")
arpantechparticle commented 1 year ago

When I call the /start/{charge_point_id} endpoint, it creates a message like this:

[2,"fae42889-978d-471c-865a-5611a0126506","RemoteStartTransaction",{"idTag":"123","connectorId":1}]

It then broadcasts this message. The message then arrives in:

 async def _pubsub_data_reader(self, pubsub_subscriber, cp):
        """
        Reads messages from Redis PubSub and routes them through ``cp``.

        Each message is routed in its own task. A handler that awaits
        ``cp.call(...)`` needs the reply to be read from the channel while
        it is still running; awaiting ``route_message`` here would block
        that read until the call times out.

        Args:
            pubsub_subscriber: PubSub object for the subscribed channel.
            cp: Object providing ``set_sockets`` and ``route_message``.
        """
        async def route(data):
            # One failing message must not stop the reader.
            try:
                await cp.route_message(data)
            except Exception as e:
                print(f"An error occurred in _pubsub_data_reader: {e}")

        # Strong references keep the routing tasks alive until they finish.
        in_flight = set()
        try:
            print('it come here')
            while True:
                # Wait up to a second for a message instead of polling with
                # the default zero timeout.
                message = await pubsub_subscriber.get_message(
                    ignore_subscribe_messages=True, timeout=1.0
                )
                if message is None:
                    continue
                print("only if message available", message)
                charger_id = message['channel'].decode('utf-8')
                print("charger_id", charger_id)
                cp.set_sockets(self.rooms.get(charger_id, []))
                data = message['data'].decode('utf-8')
                print("message form", data)
                task = asyncio.create_task(route(data))
                in_flight.add(task)
                task.add_done_callback(in_flight.discard)
        except Exception as e:
            # Errors from the pubsub connection end the reader.
            # CancelledError is not caught so cancellation works normally.
            print(f"An error occurred in _pubsub_data_reader: {e}")

this message now goes to

    @on(Action.RemoteStartTransaction)
    async def remote_start_transaction(self, id_tag, connector_id, **kwargs):
        """Forward a RemoteStartTransaction to the charge point and relay its answer.

        The handler must always return a payload dataclass: the library
        serialises the return value with ``dataclasses.asdict`` and raises
        "asdict() should be called on dataclass instances" when it gets
        ``None``. Failures are therefore reported as a Rejected payload.

        NOTE(review): ``self.call`` waits for a reply that has to be fed to
        ``route_message`` while this handler is still running. If the code
        that feeds ``route_message`` awaits it before reading the next
        frame, the reply cannot arrive and the call times out.
        """
        rejected = call_result.RemoteStartTransactionPayload(
            status=RemoteStartStopStatus.rejected
        )
        print('kwargs', kwargs)
        request = call.RemoteStartTransactionPayload(id_tag=id_tag, connector_id=connector_id)
        print('is data class', is_dataclass(request))
        try:
            response = await self.call(request)
        except Exception as e:
            # Timeouts and send errors end up here. CancelledError is a
            # BaseException and is deliberately left to propagate.
            print(f"An error occurred in remote_start_transaction: {e}")
            return rejected

        if response is None:
            # No usable result came back (e.g. the charge point answered
            # with an error frame).
            print("An error occurred in remote_start_transaction: no response payload")
            return rejected

        print('response.status', response.status)
        if response.status == RemoteStartStopStatus.accepted:
            print("Transaction Started!!!")
        return response

here i got error.

OrangeTux commented 1 year ago

I'm sorry to be annoying, but you still haven't posted a stack trace that includes the exception from the opening post. Please provide it.

arpantechparticle commented 1 year ago

It's alright, and I appreciate your patience. Here's my understanding of what you want from me.

ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-8' coro=<WebSocketManager._pubsub_data_reader() done, defined at /Users/arpanpatel/projects/spotcharge-occp-api/pubsub.py:154> exception=TypeError('asdict() should be called on dataclass instances')>
Traceback (most recent call last):
  File "/Users/arpanpatel/projects/spotcharge-occp-api/pubsub.py", line 181, in _pubsub_data_reader
    await cp.route_message(data)
  File "/Users/arpanpatel/projects/spotcharge-occp-api/venv/lib/python3.9/site-packages/ocpp/charge_point.py", line 154, in route_message
    await self._handle_call(msg)
  File "/Users/arpanpatel/projects/spotcharge-occp-api/venv/lib/python3.9/site-packages/ocpp/charge_point.py", line 214, in _handle_call
    temp_response_payload = asdict(response)
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/dataclasses.py", line 1074, in asdict
    raise TypeError("asdict() should be called on dataclass instances")
TypeError: asdict() should be called on dataclass instances
Jared-Newell-Mobility commented 11 months ago

On line - temp_response_payload = asdict(response) - response is not a dataclass; asdict requires a dataclass - see https://docs.python.org/3/library/dataclasses.html

an example of a dataclass (note the decorator) could be a ChargingScheduleType datatype from OCPP 2.0.1:

@dataclass
class ChargingScheduleType:
    """
    Charging schedule structure defines a list of charging periods, as used in:
    GetCompositeSchedule.conf and ChargingProfile.
    ChargingScheduleType is used by: ChargingProfileType,
    NotifyChargingLimitRequest, NotifyEVChargingScheduleRequest
    """

    # Fields without a default are required and, as dataclass rules demand,
    # come before the optional ones.
    id: int
    charging_rate_unit: enums.ChargingRateUnitType
    # NOTE(review): the docstring speaks of "a list of charging periods" but
    # the annotation is a single period - confirm whether this should be a
    # list type.
    charging_schedule_period: ChargingSchedulePeriodType
    # Optional fields; None means the value was not provided.
    start_schedule: Optional[str] = None
    duration: Optional[int] = None
    min_charging_rate: Optional[float] = None
    sales_tariff: Optional[SalesTariffType] = None

You could check response's type with type() to confirm,

However, as this is not directly related to this library, I'll close the issue for now