mobilityhouse / ocpp

Python implementation of the Open Charge Point Protocol (OCPP).
MIT License

how to broadcast messages to all charging stations #86

Closed madhavsund closed 4 years ago

madhavsund commented 4 years ago

How can I broadcast a message (for example SetVariables) to all connected charging stations, for instance to update the unit price?

OrangeTux commented 4 years ago

Could you show me what you've tried?

madhavsund commented 4 years ago

On the server:

import asyncio
import websockets
from ocpp.v201 import ChargePoint as cp
from ocpp.v201 import call

class ChargePoint(cp):
    async def set_variables_request(self):
        request = call.SetVariablesPayload(
            set_variable_data=[
                {
                    "attribute_value": "100",
                    "component": {"name": "price"},
                    "variable": {"name": "price"},
                }
            ]
        )
        # self.call() is a coroutine and has to be awaited.
        response = await self.call(request)
        print(response)

async def on_connect(websocket, path):
    charge_point_id = path.strip('/')
    cp = ChargePoint(charge_point_id, websocket)
    try:
        await asyncio.gather(cp.start(), cp.set_variables_request())
    except websockets.exceptions.ConnectionClosed:
        print("client has been closed.")

This sets the variables only at the moment the charge point connects to the server, but I need to invoke it in the middle of the communication.

OrangeTux commented 4 years ago

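You need to bring the ChargePoint created in on_connect() out of the scope of on_connect(). Then you can use a different task to interact with the ChargePoint.

Here is an example of a CSMS that can be controlled using an HTTP API. The HTTP API has 2 endpoints:

  • POST / - to change configuration for all connected chargers. It expects a JSON body with the fields key and value.
  • POST /disconnect - to disconnect a charger. It expects a JSON body with the field id that contains the charger ID.

The HTTP server is running at port 8080. Here are a few curl examples: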

$ curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"key":"MeterValueSampleInterval","value":"10"}' \
  http://localhost:8080/
$ curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"id":"CHARGE_POINT_1"}' \
  http://localhost:8080/disconnect

Here is the code of the CSMS with the HTTP API:

import asyncio
import websockets
from aiohttp import web
from functools import partial
from datetime import datetime

from ocpp.routing import on
from ocpp.v16 import ChargePoint as cp
from ocpp.v16.enums import Action, RegistrationStatus
from ocpp.v16 import call_result, call

class ChargePoint(cp):
    @on(Action.BootNotification)
    def on_boot_notification(self, charge_point_vendor, charge_point_model, **kwargs):
        return call_result.BootNotificationPayload(
            current_time=datetime.utcnow().isoformat(),
            interval=10,
            status=RegistrationStatus.accepted,
        )

    async def change_configuration(self, key: str, value: str):
        return await self.call(call.ChangeConfigurationPayload(key=key, value=value))

class CentralSystem:
    def __init__(self):
        self._chargers = {}

    def register_charger(self, cp: ChargePoint) -> asyncio.Queue:
        """ Register a new ChargePoint at the CSMS. The function returns a
        queue.  The CSMS will put a message on the queue if the CSMS wants to
        close the connection. 
        """
        queue = asyncio.Queue(maxsize=1)

        # Store a reference to the task so we can cancel it later if needed.
        task = asyncio.create_task(self.start_charger(cp, queue))
        self._chargers[cp] = task

        return queue

    async def start_charger(self, cp, queue):
        """ Start listening for messages from the charger. """
        try:
            await cp.start()
        except Exception as e:
            print(f"Charger {cp.id} disconnected: {e}")
        finally:
            # Make sure to remove the reference to the charger after it disconnected.
            del self._chargers[cp]

            # This will unblock the `on_connect()` handler and the connection
            # will be destroyed.
            await queue.put(True)

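    async def change_configuration(self, key: str, value: str):
        # Iterate over a copy: chargers may disconnect (and be removed from
        # the dict) while a call is being awaited.
        for cp in list(self._chargers):
            await cp.change_configuration(key, value)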

    def disconnect_charger(self, id: str):
        for cp, task in self._chargers.items():
            if cp.id == id:
                task.cancel()
                return 

        raise ValueError(f"Charger {id} not connected.")

async def change_config(request):
    """ HTTP handler for changing configuration of all charge points. """
    data = await request.json()
    csms = request.app["csms"]

    await csms.change_configuration(data["key"], data["value"])

    return web.Response()

async def disconnect_charger(request):
    """ HTTP handler for disconnecting a charger. """
    data = await request.json()
    csms = request.app["csms"]

    try:
        csms.disconnect_charger(data["id"])
    except ValueError as e:
        print(f"Failed to disconnect charger: {e}")
        return web.Response(status=404)

    return web.Response()

async def on_connect(websocket, path, csms):
    """ For every new charge point that connects, create a ChargePoint instance
    and start listening for messages.

    The ChargePoint is registered at the CSMS.

    """
    charge_point_id = path.strip("/")
    cp = ChargePoint(charge_point_id, websocket)

    print(f"Charger {cp.id} connected.")

    # If this handler returns the connection will be destroyed. Therefore we need some
    # synchronization mechanism that blocks until CSMS wants to close the connection.
    # An `asyncio.Queue` is used for that.
    queue = csms.register_charger(cp)
    await queue.get()

async def create_websocket_server(csms: CentralSystem):
    handler = partial(on_connect, csms=csms)
    return await websockets.serve(handler, "0.0.0.0", 9000, subprotocols=["ocpp1.6"])

async def create_http_server(csms: CentralSystem):
    app = web.Application()
    app.add_routes([web.post("/", change_config)])
    app.add_routes([web.post("/disconnect", disconnect_charger)])

    # Put CSMS in app so it can be accessed from request handlers.
    # https://docs.aiohttp.org/en/stable/faq.html#where-do-i-put-my-database-connection-so-handlers-can-access-it
    app["csms"] = csms

    # https://docs.aiohttp.org/en/stable/web_advanced.html#application-runners
    runner = web.AppRunner(app)
    await runner.setup()

    site = web.TCPSite(runner, "localhost", 8080)
    return site

async def main():
    csms = CentralSystem()

    websocket_server = await create_websocket_server(csms)
    http_server = await create_http_server(csms)

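    # asyncio.wait() no longer accepts bare coroutines on Python 3.11+,
    # so gather() is used to run both servers.
    await asyncio.gather(websocket_server.wait_closed(), http_server.start())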

if __name__ == "__main__":
    asyncio.run(main())
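
Note that change_configuration() above contacts the chargers one after another, so a single slow charger delays all the others. A minimal sketch of a concurrent broadcast follows, assuming each charger object exposes an async change_configuration(key, value) method as in the code above. FakeChargePoint, broadcast_configuration and the timeout value are hypothetical names introduced only for this illustration; they are not part of the ocpp package.

```python
import asyncio


class FakeChargePoint:
    """Hypothetical stand-in for the ChargePoint class above."""

    def __init__(self, id, delay=0.0, fail=False):
        self.id = id
        self._delay = delay
        self._fail = fail

    async def change_configuration(self, key, value):
        await asyncio.sleep(self._delay)
        if self._fail:
            raise ConnectionError(f"{self.id} is unreachable")
        return "Accepted"


async def broadcast_configuration(chargers, key, value, timeout=5.0):
    """Send the same change to every charger concurrently.

    Returns a dict mapping charger id to either the response or the
    exception raised for that charger.
    """
    # Copy first: chargers may connect or disconnect while we await.
    targets = list(chargers)
    results = await asyncio.gather(
        *(
            asyncio.wait_for(cp.change_configuration(key, value), timeout)
            for cp in targets
        ),
        # Collect failures instead of aborting the whole broadcast.
        return_exceptions=True,
    )
    return {cp.id: result for cp, result in zip(targets, results)}


async def demo():
    # Same shape as CentralSystem._chargers: ChargePoint -> task.
    chargers = {
        FakeChargePoint("CP_1"): None,
        FakeChargePoint("CP_2", fail=True): None,
    }
    return await broadcast_configuration(
        chargers, "MeterValueSampleInterval", "10"
    )


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With return_exceptions=True a charger that fails or times out shows up as an exception object in the result, and the remaining chargers still receive the change.
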
madhavsund commented 4 years ago

Thanks a lot :)

fuzzdogs commented 4 years ago

Thank you very much, OrangeTux, for the sample code of the CSMS. It is so helpful!

The server runs perfectly and I can connect (and disconnect) with a chargepoint-simulation script and with a real chargepoint.

Although I am only a novice in Python, I tried to extend it with a RemoteStartTransaction, that can be triggered externally through the API. The function remotestart_charger is then triggered (the log prints "charger X is going to prepare charging") but there is no call result (the log does NOT print "charger X has really started charging"). Apparently I am doing something wrong. Could you please point me in the right direction? (and yes, I checked the other threads about RemoteStartTransaction but they were not helpful in this case)

import logging
import asyncio
import websockets
import json
import sys
from aiohttp import web
from functools import partial
from datetime import datetime
import mysql.connector
from mysql.connector import Error
from mysql.connector import errorcode
from ocpp.routing import on
from ocpp.v16 import ChargePoint as cp
from ocpp.v16.enums import Action, RegistrationStatus
from ocpp.v16 import call_result, call

logging.basicConfig( format='%(asctime)s %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
    filename='server.log')

connection = mysql.connector.connect(host='localhost',
                                            database='[db_name]',
                                            user='[db_user]',
                                            password='[db_pwd]')

VERSION = "0.90"
PORT1 = 9000    # EV CHARGER
PORT2 = 8000    # WEBSERVER
SERVERADDRESS = '[ip-address]'
APIKEY = '[key]'

print("====================================================================")
print("EV CHARGING SERVER v"+VERSION)
print("====================================================================")

def closeMySQL():
    if connection.is_connected():
        connection.close()
        print("MySQL connection closed")

class ChargePoint(cp):
    @on(Action.BootNotification)
    def on_boot_notification(self, charge_point_vendor, charge_point_model, **kwargs):
        return call_result.BootNotificationPayload(
            current_time=datetime.utcnow().isoformat(),
            interval=10,
            status=RegistrationStatus.accepted,
        )

    async def change_configuration(self, key: str, value: str):
        return await self.call(call.ChangeConfigurationPayload(key=key, value=value))

class CentralSystem:
    def __init__(self):
        self._chargers = {}

    def register_charger(self, cp: ChargePoint) -> asyncio.Queue:
        """ Register a new ChargePoint at the CSMS. The function returns a
        queue.  The CSMS will put a message on the queue if the CSMS wants to
        close the connection. 
        """
        queue = asyncio.Queue(maxsize=1)

        # Store a reference to the task so we can cancel it later if needed.
        task = asyncio.create_task(self.start_charger(cp, queue))
        self._chargers[cp] = task

        return queue

    async def start_charger(self, cp, queue):
        """ Start listening for messages from the charger. """
        try:
            await cp.start()
        except Exception as e:
            print(f"Charger {cp.id} disconnected. {e}")
        finally:
            # Make sure to remove reference to charger after it disconnected.
            del self._chargers[cp]

            # This will unblock the `on_connect()` handler and the connection
            # will be destroyed.
            await queue.put(True)

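    async def change_configuration(self, key: str, value: str):
        # Iterate over a copy: chargers may disconnect (and be removed from
        # the dict) while a call is being awaited.
        for cp in list(self._chargers):
            await cp.change_configuration(key, value)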

    def disconnect_charger(self, id: str):
        for cp, task in self._chargers.items():
            if cp.id == id:
                task.cancel()
                return 

        raise ValueError(f"Charger {id} was not connected so I can not disconnect it.")

    def remotestart_charger(self, id: str):
        for cp, task in self._chargers.items():
            if cp.id == id:
                print(f"Charger {id} is going to prepare charging!")
                return call_result.StartTransactionPayload(
                        id_tag_info={
                        "status": 'Accepted'
                    },
                    transaction_id=int(1)
                ) 
                print(f"Charger {id} has REALLY started charging!")

        raise ValueError(f"Charger {id} not connected!")

async def change_config(request):
    """ HTTP handler for changing configuration of all charge points. """
    data = await request.json()
    csms = request.app["csms"]

    await csms.change_configuration(data["key"], data["value"])

    return web.Response()

async def disconnect_charger(request):
    """ HTTP handler for disconnecting a charger. """
    data = await request.json()
    csms = request.app["csms"]

    try:
        csms.disconnect_charger(data["id"])
    except ValueError as e:
        print(f"Failed to disconnect charger: {e}")
        return web.Response(status=404)

    return web.Response()

async def remote_start(request):
    """ HTTP handler for remote starting a charger. """
    data = await request.json()
    csms = request.app["csms"]

    if data["apikey"]!=APIKEY:
        print("API key is not valid")
        return web.Response(status=404)

    try:
        csms.remotestart_charger(data["id"])
    except ValueError as e:
        print(f"Failed to remotely start charger: {e}")
        return web.Response(status=404)

    return web.Response()

async def on_connect(websocket, path, csms):
    """ For every new charge point that connects, create a ChargePoint instance
    and start listening for messages.

    The ChargePoint is registered at the CSMS.

    """
    charge_point_id = path.strip("/")
    cp = ChargePoint(charge_point_id, websocket)

    print(f"Charger {cp.id} connected.")

    try:
        cursor = connection.cursor()
        # Pass the ID as a query parameter instead of concatenating it into
        # the SQL string; the ID comes from the URL path and is untrusted.
        cursor.execute(
            "INSERT IGNORE INTO charge_box (name) VALUES (%s)",
            (charge_point_id,),
        )
        connection.commit()
        cursor.execute(
            "UPDATE charge_box SET datetime=NOW() WHERE name=%s",
            (charge_point_id,),
        )
        connection.commit()
        cursor.close()

    except mysql.connector.Error as error:
        print("Something went wrong: {}".format(error))
        print("-----------------------------------------------------")

    # If this handler returns the connection will be destroyed. Therefore we need some
    # synchronization mechanism that blocks until CSMS wants to close the connection.
    # An `asyncio.Queue` is used for that.
    queue = csms.register_charger(cp)
    await queue.get()

async def create_websocket_server(csms: CentralSystem):
    handler = partial(on_connect, csms=csms)
    return await websockets.serve(handler, SERVERADDRESS, PORT1, subprotocols=["ocpp1.6"])

async def create_http_server(csms: CentralSystem):
    app = web.Application()
    app.add_routes([web.post("/", change_config)])
    app.add_routes([web.post("/disconnect", disconnect_charger)])
    app.add_routes([web.post("/remotestart", remote_start)])

    app["csms"] = csms

    runner = web.AppRunner(app)
    await runner.setup()

    site = web.TCPSite(runner, SERVERADDRESS, PORT2)
    return site

async def main():
    csms = CentralSystem()

    websocket_server = await create_websocket_server(csms)
    http_server = await create_http_server(csms)

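    # asyncio.wait() no longer accepts bare coroutines on Python 3.11+,
    # so gather() is used to run both servers.
    await asyncio.gather(websocket_server.wait_closed(), http_server.start())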

if __name__ == "__main__":
    asyncio.run(main())
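
A possible explanation for the missing second log line, sketched under assumptions: remotestart_charger() above returns a call_result.StartTransactionPayload, which is the answer a CSMS gives to a charger's StartTransaction request, so nothing is ever sent to the charger, and the print placed after the return statement can never run. A remote start presumably has to go the other way, as a request sent with cp.call() and awaited. The sketch below uses stand-ins so it runs without a charger: RemoteStartRequest and FakeChargePoint are hypothetical, and in the ocpp package the real request would presumably be call.RemoteStartTransactionPayload passed to the real cp.call().

```python
import asyncio
from dataclasses import dataclass


@dataclass
class RemoteStartRequest:
    """Hypothetical stand-in for the library's remote start request payload."""

    id_tag: str
    connector_id: int = 1


class FakeChargePoint:
    """Hypothetical stand-in for a connected ChargePoint; records requests."""

    def __init__(self, id):
        self.id = id
        self.sent = []

    async def call(self, request):
        # The real call() sends the request over the websocket and waits
        # for the charger's answer. Here the answer is canned.
        self.sent.append(request)
        return {"status": "Accepted"}


class CentralSystem:
    def __init__(self):
        self._chargers = {}

    async def remotestart_charger(self, id, id_tag):
        for cp in list(self._chargers):
            if cp.id == id:
                print(f"Charger {id} is going to prepare charging!")
                # Send a request to the charger and wait for its answer,
                # instead of returning a call_result object.
                response = await cp.call(RemoteStartRequest(id_tag=id_tag))
                print(f"Charger {id} answered: {response}")
                return response

        raise ValueError(f"Charger {id} not connected!")


async def demo():
    csms = CentralSystem()
    cp = FakeChargePoint("CHARGE_POINT_1")
    csms._chargers[cp] = None
    response = await csms.remotestart_charger("CHARGE_POINT_1", id_tag="TAG_1")
    return cp, response


if __name__ == "__main__":
    asyncio.run(demo())
```

Because the method is now a coroutine, the HTTP handler would also need to await it, i.e. await csms.remotestart_charger(...), and the request body would need to carry an ID tag (the id_tag argument here is an assumption).
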
mathief123 commented 3 years ago

Hello, I know it's been some time since you solved it, but how did you manage to solve it?

DragotaDarius commented 3 years ago

Hello, I have tried what fuzzdogs has done, but I cannot get it working. I am pretty new to OCPP and Python. I have tried the curl examples, but all I get is a 404 error. This is the command I have tried:

curl -X POST -H "Content-Type: application/json" -d '{"id":"test"}' http://localhost:8080/disconnect

And this is what I get:

<!DOCTYPE html>
<html><head><title>Not Found</title></head>
<body>
<h2>Access Error: 404 -- Not Found</h2>
<pre>Cannot open document for: /disconnect</pre>
</body>
</html>
LW-Ho commented 2 years ago

You need to bring the ChargePoint created in on_connect() out of the scope of on_connect(). Then you can use a different task to interact with the ChargePoint.

Here is an example of a CSMS that can be controlled using an HTTP API. The HTTP API has 2 endpoints:

  • POST / - to change configuration for all connected chargers. It expects a JSON body with the fields key and value.
  • POST /disconnect - to disconnect a charger. It expects a JSON body with the field id that contains the charger ID.

Thanks for sharing. Note, however, that with websockets version 10.1 the connection handler cannot be wrapped in functools.partial.

https://websockets.readthedocs.io/en/stable/project/changelog.html#new-features
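
One way around partial, as a rough sketch: build the handler with a factory function that closes over the CSMS. The names make_on_connect and FakeWebSocket are hypothetical, and the sketch assumes that newer websockets releases may call the handler with the connection only and expose the request path as an attribute named path on it, which is how I read the changelog linked above.

```python
import asyncio


class CentralSystem:
    """Minimal stand-in for the CentralSystem class in the example above."""

    def __init__(self):
        self.connected = []


def make_on_connect(csms):
    """Return a connection handler that closes over `csms`."""

    async def on_connect(websocket, path=None):
        # Assumption: when the server passes only the connection, the
        # request path is available as `websocket.path`.
        if path is None:
            path = websocket.path
        csms.connected.append(path.strip("/"))

    return on_connect


class FakeWebSocket:
    """Hypothetical connection object, only used to exercise the handler."""

    path = "/CHARGE_POINT_1"


async def demo():
    csms = CentralSystem()
    handler = make_on_connect(csms)
    await handler(FakeWebSocket())                      # one-argument style
    await handler(FakeWebSocket(), "/CHARGE_POINT_2")   # two-argument style
    return csms.connected


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In create_websocket_server() the result of make_on_connect(csms) would then be passed to websockets.serve() in place of partial(on_connect, csms=csms).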

tomha85 commented 6 months ago

Hi @OrangeTux and everyone. The code is good; I tested it successfully against a real charger. The problem is production deployment under high load, which needs load balancing. I tried AKS with HPA (horizontal scaling of pods). The logic seems to fail with 2-3 pods: a charger registers with an arbitrary pod, and an HTTPS call cannot determine which pod holds a given charger_id in order to send the command to the client. With a single pod it works well. I am thinking about Azure IoT Hub, but how would I get WSS and apply the OCPP package in IoT Hub? Please give any suggestions for deploying to production.
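
To make the problem concrete, here is a toy model of one possible approach, not a tested production design: every pod keeps its own registry of connected chargers, commands are published to all pods, and only the pod that holds the charger's websocket acts on them. CommandBus and Pod are hypothetical names; the bus here lives in one process, and in a real deployment it would have to be an external pub/sub service shared by all pods (an assumption, not something verified in this thread).

```python
import asyncio


class CommandBus:
    """In-process stand-in for a message broker shared by all pods."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    async def publish(self, command):
        # Deliver the command to every pod and collect each pod's answer.
        return await asyncio.gather(*(cb(command) for cb in self._subscribers))


class Pod:
    """One server instance that only knows its own connected chargers."""

    def __init__(self, name, bus):
        self.name = name
        self.local_chargers = set()
        bus.subscribe(self.handle_command)

    async def handle_command(self, command):
        if command["id"] not in self.local_chargers:
            # Not ours: another pod holds this charger's connection.
            return None
        # A real pod would call the ChargePoint over its websocket here.
        return f"{self.name} handled {command['action']} for {command['id']}"


async def demo():
    bus = CommandBus()
    pod_a = Pod("pod-a", bus)
    pod_b = Pod("pod-b", bus)
    # The load balancer happened to route this charger to pod-b.
    pod_b.local_chargers.add("CHARGE_POINT_1")

    # The HTTP request may arrive at any pod; it only publishes the command.
    answers = await bus.publish({"id": "CHARGE_POINT_1", "action": "disconnect"})
    return [answer for answer in answers if answer is not None]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The HTTP handler no longer needs to know where a charger is connected; the cost is that every pod sees every command.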