mobilityhouse / ocpp

Python implementation of the Open Charge Point Protocol (OCPP).
MIT License

Could setting charging profiles in after transaction post-request hook lead into OCPP protocol level issue? #533

Open apekkar opened 12 months ago

apekkar commented 12 months ago

In OCPP 1.6, the specified sequence is that the Central System sends SetChargingProfile (with the transaction id provided) after StartTransaction has been sent.

We have been facing the following issue lately. I believe it happens only occasionally:

(pseudo logs from charge point)

2023-11-11T10:40:06.951 Sending start transaction request...
2023-11-11T10:40:06.977 send [2,"8a446518-4fc2-4e8b-b54a-83504d1fce72","StartTransaction",{"connectorId":1,"idTag":"0","meterStart":33143,"timestamp":"2023-11-11T10:40:06.924Z"}]
2023-11-11T10:40:07.195 receive [2,"09a6a0ea-7fdc-42ad-b8d3-7804251423fc","SetChargingProfile",{"connectorId":1,"csChargingProfiles":{"chargingProfileId":3089,"transactionId":1571723379,"stackLevel":1,"chargingProfilePurpose":"TxProfile","chargingProfileKind":"Absolute","validFrom":"2023-11-11T10:40:06.924000+00:00","validTo":"2023-11-18T10:40:06.924000+00:00","chargingSchedule":{"duration":604800,"startSchedule":"2023-11-11T10:40:06.924000+00:00","chargingRateUnit":"A","chargingSchedulePeriod":[{"startPeriod":0,"limit":23.0,"numberPhases":3},{"startPeriod":294,"limit":0.0,"numberPhases":3}]}}}]
2023-11-11T10:40:07.234 set_charging_profile: New profile received
2023-11-11T10:40:07.247 ERROR No ongoing transaction, cannot install TxProfile
2023-11-11T10:40:10.285 send [3,"09a6a0ea-7fdc-42ad-b8d3-7804251423fc",{"status":"Rejected"}]
2023-11-11T10:40:10.293 receive [3,"8a446518-4fc2-4e8b-b54a-83504d1fce72",{"transactionId":1571723379,"idTagInfo":{"status":"Accepted","expiryDate":"2023-11-11T10:45:00Z"}}]

We are sending SetChargingProfile in the post-request (after) hook for StartTransaction (pseudocode):

    @on(Action.StartTransaction)
    def on_start_transaction(self, **kwargs):
        return call_result.StartTransactionPayload(
            transaction_id=transaction.id, id_tag_info=id_tag_info
        )

    @after(Action.StartTransaction)
    async def after_start_transaction(self, **kwargs):
        response: call_result.SetChargingProfilePayload = await self.call(ocpp_payload)
        return response

Is this the wrong approach? Since after hooks are handled in a non-blocking way whereas on handlers are queued, could the request from the after hook end up being sent before the response to the StartTransaction request?

astrand commented 12 months ago

I think this is fine and correct. As you can see on https://github.com/mobilityhouse/ocpp/blob/master/ocpp/charge_point.py#L257, the response is sent before the after task is created.
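The ordering described above can be sketched with plain asyncio. This is a simplified model, not the library's actual code: `handle_call`, `send` and the handler names are hypothetical stand-ins, and the only point is that the reply is written before the after hook is scheduled as a task.

```python
import asyncio


async def handle_call(send, on_handler, after_handler):
    # Simplified model of the described ordering (an assumption, not library code).
    response = on_handler()
    await send(response)  # 1. reply to the incoming request
    # 2. only now schedule the post-request hook, without blocking on it
    return asyncio.create_task(after_handler())


async def demo():
    wire = []  # messages in the order they leave the central system

    async def send(message):
        await asyncio.sleep(0)  # pretend the socket write yields once
        wire.append(message)

    def on_start_transaction():
        return "StartTransaction response"

    async def after_start_transaction():
        await send("SetChargingProfile request")

    task = await handle_call(send, on_start_transaction, after_start_transaction)
    await task
    return wire


wire = asyncio.run(demo())
print(wire)
```

In this model the central system always writes the StartTransaction response first, so any reordering would have to happen on the receiving side.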

We are currently investigating issues on the CS side though.

OrangeTux commented 12 months ago

I agree with @astrand here. The @after handler is executed after the response to StartTransaction has been sent. Can you artificially inject an await asyncio.sleep(5) in the @after() handler? Does the sequence of messages change?

astrand commented 12 months ago

I have looked at the client side now. It's a bit tricky, actually. route_message will use put_nowait(msg) to add a response to the response queue, but the task awaiting the call will not run at this point. Instead, the loop in start() will continue by running the handler for SetChargingProfile. Therefore, that handler will run before the code that handles the StartTransaction response.

(Note however that we are using aiohttp instead of websockets, and thus have a custom start() method, but it calls self.route_message(message) in a loop just like the standard one.)

Initially, I thought that this problem could be solved by adding await asyncio.sleep(0) to the start loop, but to my surprise this did not work. https://stackoverflow.com/questions/74493571/asyncio-sleep0-does-not-yield-control-to-the-event-loop explains why. Using a non-zero sleep works. But of course, this only guarantees that the awaited call() returns; if there are additional awaits after it, additional synchronization is needed.
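A minimal sketch of this effect, using only asyncio and no OCPP code. The names `run_scenario`, `caller` and `receive_loop` are hypothetical, and wrapping the queue read in `asyncio.wait_for` is an assumption about how the pending call waits for its response. The receive loop gets a response followed by a request, once without yielding between messages and once with a short non-zero sleep.

```python
import asyncio


async def run_scenario(yield_delay):
    """Replay 'response, then request' through a simplified receive loop.

    yield_delay=None means the loop never yields between messages.
    """
    events = []
    response_queue = asyncio.Queue()

    async def caller():
        # Stands in for the charger awaiting its StartTransaction call.
        response = await asyncio.wait_for(response_queue.get(), timeout=5)
        events.append(f"call returned: {response}")

    async def receive_loop(messages):
        for kind, name in messages:
            if kind == "response":
                # The waiting caller only resumes once the event loop runs it.
                response_queue.put_nowait(name)
            else:
                # Incoming request: the handler runs right away.
                events.append(f"handler ran: {name}")
            if yield_delay is not None:
                await asyncio.sleep(yield_delay)

    caller_task = asyncio.create_task(caller())
    await asyncio.sleep(0.01)  # let the caller block on the queue first
    await receive_loop([("response", "StartTransaction"),
                        ("request", "SetChargingProfile")])
    await caller_task
    return events


no_yield = asyncio.run(run_scenario(None))
with_yield = asyncio.run(run_scenario(0.01))
print(no_yield)
print(with_yield)
```

Without a yield the handler entry is recorded first. With the short sleep the caller resumes before the next message is handled, which matches the observation that a non-zero sleep changes the order.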

apekkar commented 12 months ago

> I have looked at the client side now. It's a bit tricky, actually. route_message will use put_nowait(msg) to add a response to the response queue, but the task awaiting the call will not run at this point. Instead, the loop in start() will continue with running the handler for SetChargingProfile. Therefore, that handler will run before the code that handles the StartTransaction response.

Yes, this is what I mean. From the logical execution order, the StartTransaction response is sent before the after hook runs. But since the former is only placed into self._response_queue while the latter is handled right away, asynchronously as usual, the question arises whether 'after' really works in the strict sense.

OrangeTux commented 11 months ago

Just to make sure I understand this right.

From the perspective of the CSMS, the StartTransaction response is sent first, followed by a SetChargingProfile request. The charger receives the two messages in the same order.

However, the charger processes the messages in reverse order. So it handles the SetChargingProfile request before handling the response to the StartTransaction. Is my understanding correct?

@apekkar Can you include a debug log from the charger that contains the websocket logs? Those will prove whether my understanding is correct.

astrand commented 11 months ago

Code-wise, I think this patch should be considered. Should I make a PR?

--- a/ocpp/charge_point.py
+++ b/ocpp/charge_point.py
@@ -161,6 +161,7 @@ class ChargePoint:
             LOGGER.info("%s: receive message %s", self.id, message)

             await self.route_message(message)
+            await asyncio.sleep(0.01)

     async def route_message(self, raw_msg):
         """

OrangeTux commented 11 months ago

I'm hesitant to introduce an artificial sleep. It makes a request/response sequence slower for everyone without guaranteeing the issue is solved in all cases.
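For comparison, explicit synchronization on the charger side could look roughly like the sketch below. This is not something the library provides: `TransactionGate` and its methods are hypothetical, and the handler is reduced to a plain coroutine. The idea is that the SetChargingProfile handler waits, with a timeout, until the code processing the StartTransaction response has recorded the transaction id.

```python
import asyncio


class TransactionGate:
    """Hypothetical helper: lets a TxProfile handler wait for the transaction id."""

    def __init__(self):
        self._started = asyncio.Event()
        self.transaction_id = None

    def mark_started(self, transaction_id):
        # Called by the code that processes the StartTransaction response.
        self.transaction_id = transaction_id
        self._started.set()

    async def wait_started(self, timeout):
        # Returns False if no transaction was recorded within the timeout.
        try:
            await asyncio.wait_for(self._started.wait(), timeout)
        except asyncio.TimeoutError:
            return False
        return True


async def demo():
    gate = TransactionGate()

    async def on_set_charging_profile(transaction_id):
        if not await gate.wait_started(timeout=1):
            return "Rejected"
        return "Accepted" if gate.transaction_id == transaction_id else "Rejected"

    async def start_transaction_caller():
        await asyncio.sleep(0.05)  # the response is processed slightly later
        gate.mark_started(1571723379)

    caller = asyncio.create_task(start_transaction_caller())
    status = await on_set_charging_profile(1571723379)
    await caller
    return status


status = asyncio.run(demo())
print(status)
```

One caveat with this shape: if the handler blocks the receive loop while waiting, it only helps when the StartTransaction response has already been queued, as in the scenario discussed in this thread. Otherwise the handler simply runs into its timeout.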

astrand commented 11 months ago

That's a sane gut feeling. However, note that it is not the actual delay that solves this problem. You can use 0.0000001 instead if you want; this is just a somewhat ugly Python syntax to yield to the mainloop.

OrangeTux commented 11 months ago

I would expect that sleep(0) would have the same effect. But it doesn't. I'd like to know why that is.

Can you reproduce the issue isolated in a unit test? That allows me to dig into the issue and play a bit around with code.

astrand commented 11 months ago

Why sleep(0) does not work is explained in the stackoverflow posting I linked to above. If you want to use sleep(0), you have to use three of them.
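A small asyncio-only sketch of why one sleep(0) may not be enough. The names `inner`, `outer` and `passes_until_outer_done` are hypothetical. Each sleep(0) gives the event loop a single pass, and each layer of task nesting between the wake-up and the waiting code needs a pass of its own. The exact number required in the real call path is an assumption that depends on how the call is wrapped and on the Python version.

```python
import asyncio


async def passes_until_outer_done():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    async def inner():
        # Innermost waiter, comparable to a blocked queue.get().
        return await fut

    async def outer(task):
        # One extra layer, comparable to a wrapper around the inner wait.
        return await task

    inner_task = asyncio.create_task(inner())
    outer_task = asyncio.create_task(outer(inner_task))
    await asyncio.sleep(0.01)  # let both tasks block

    fut.set_result("response")  # comparable to put_nowait() waking a getter
    passes = 0
    while not outer_task.done():
        await asyncio.sleep(0)  # yield for exactly one event loop pass
        passes += 1
    return passes


passes = asyncio.run(passes_until_outer_done())
print(passes)
```

With one level of nesting, a single sleep(0) is not sufficient. A timed sleep avoids counting passes because the loop keeps running ready callbacks until the timer expires.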

We can reproduce this in a test case of about 100 lines of code with a simulated CS and CSMS, but the test is built around aiohttp. I can share the CS part of it:

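# Imports assumed for the OCPP 1.6 classes of the ocpp package
import asyncio
import logging

from ocpp.routing import on
from ocpp.v16 import ChargePoint, call, call_result
from ocpp.v16.enums import Action, AvailabilityStatus, RegistrationStatus


class CS(ChargePoint):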
    def __init__(self, id, connection, response_timeout=30): # pylint: disable=W0622
        super().__init__(id, connection, response_timeout)
        self.registration_status = None
        self.registration_status_when_ca = None
        self.start_task = asyncio.ensure_future(self.start())
        self.send_task = asyncio.ensure_future(self.sendloop())

    async def run_for(self, timeout):
        "Run OCPP for specified time"
        # Use asyncio.wait since it does not cancel tasks upon timeout
        done, dummy = await asyncio.wait((self.start_task, self.send_task), timeout=timeout,
                                         return_when=asyncio.FIRST_EXCEPTION)
        for dtask in done:
            dtask.result()

    async def sendloop(self):
        "Send things using OCPP"
        if self.registration_status != RegistrationStatus.accepted:
            request = call.BootNotificationPayload(
                charge_point_model="Test",
                charge_point_vendor="Test"
            )
            response = await self.call(request)
            self.registration_status = response.status
            logging.info("Connected to central system.")

        while True:
            await asyncio.sleep(0.2)

    @on(Action.ChangeAvailability)
    async def on_change_availability(self, connector_id, type): # pylint: disable=W0622
        "Handler for ChangeAvailability"
        self.registration_status_when_ca = self.registration_status
        return call_result.ChangeAvailabilityPayload(AvailabilityStatus.accepted)

Then connect and:

await cs1.run_for(2)
assert cs1.registration_status_when_ca