Heavy02011 / diyrobocars-influxdb-part

documentation of integration of part influx.py into DIYrobocars.com framework

There is no self.poll(), part is not really threaded #2

Open Ezward opened 2 years ago

Ezward commented 2 years ago

https://github.com/Heavy02011/diyrobocars-influxdb-part/blob/886ef827b9d0fc9b8fa6803ee83a3397b80e6c0b/parts/influx.py#L52

This line references a method that does not exist.

Ezward commented 2 years ago

This is a very cool part. I've looked at it and have a few comments:

  1. As the title of this issue suggests, the self.run() method is calling self.poll() which does not exist.
  2. self.run_threaded() is implemented, but it is doing all the work. In a threaded part, self.run_threaded() is called from the main thread and so it should do as little work as possible. Most of the work, like reading from a device or writing data to a disk or database, should be done in self.update(). self.update() is the method that runs in the background thread. self.update() is basically an infinite loop (well, not infinite; it should loop while self.running is true) that does the heavy lifting in the background. So for this part, self.update() should do all the writing to influxdb. That means that run_threaded() should save the data to a buffer so that update() can get it when it needs it.
  3. Since run_threaded() and update() are running in different threads, any data that they share must be protected against simultaneous access. You don't want update() writing the json data at the same time that run_threaded() is changing it. This is done using an instance of threading.Lock().
  4. You don't need the __main__ if it is not doing anything. I like to implement a little test program that can take arguments and test if the part is actually working; it's often easier to test all the functionality with such a test program rather than have to change configurations on the donkeycar, but that is optional. Here is an example in the depth camera part: https://github.com/autorope/donkeycar/blob/c5ef9d6ced910eed582bb5d5871b88990ccc60f4/donkeycar/parts/realsense435i.py#L222

So I've taken a run at refactoring to support the part being run in a thread. I've also implemented the self.run() method so it can be run non-threaded if desired. I have not tested any of these changes, so please look at this code as a work in progress.

#!/usr/bin/env python3
"""
Scripts for storing data into Influxdb with the Donkeycar

author: Rainer Bareiß, 2020
many thanks to the Parking Lot Nerds team & 
especially Paul for valuable hints how to code this

specify your user credentials for InfluxDB in ~/.bashrc
export INFLUXDB_MYUSER='<user>'
export INFLUXDB_MYPASSWORD='<password>'

"""

import os
import threading
import time
import donkeycar as dk

from influxdb import InfluxDBClient
import json

# Server & ports
INFLUX_HOST   = "127.0.0.1"
INFLUX_PORT   = 8086
INFLUXDB_NAME = "plnracing1"

class InfluxController(object):
    '''
    store actual simulator data into influxdb
    '''
    def __init__(self, path, inputs=None, types=None, user_meta=None):
        # standard variables
        print("setting up part: influx...")

        self.inputs    = inputs
        self.types     = types
        # avoid a shared mutable default argument
        self.user_meta = user_meta if user_meta is not None else []
        self.istep     = 0
        self.isaved    = 0

        # setup influxdb
        self.influx_user     = os.environ['INFLUXDB_MYUSER']
        self.influx_password = os.environ['INFLUXDB_MYPASSWORD']
        databasename         = INFLUXDB_NAME
        self.dbclient        = InfluxDBClient(INFLUX_HOST, INFLUX_PORT, self.influx_user, self.influx_password, databasename)
        self.dbclient.drop_database(databasename)
        self.dbclient.create_database(databasename)

        self.lock = threading.Lock()
        self.json_body = []  # buffer of records waiting to be written to influxdb
        self.running = True

    def add_record(self, one_json_body):
        if one_json_body:
            #
            # make sure we access self.json_body safely.
            # This will block until it is safe to append.
            #
            with self.lock:
                self.json_body.append(one_json_body)

    def write_records(self):
        #
        # make sure we access self.json_body in
        # a threadsafe manner.
        # This will not block:  
        # - If it can't write then it will return 0.  
        # - If it can write then it will return the 
        #   number of records that were written
        # After a successful write the self.json_body
        # buffer is cleared so these records are
        # not written again.
        #
        count = 0
        if self.lock.acquire(blocking=False):
            try:
                count = len(self.json_body)
                if count > 0:
                    self.dbclient.write_points(self.json_body)
                    self.json_body = []  # empty the buffer
            finally:
                self.lock.release()
        return count

    def update(self):
        while self.running:
            self.isaved += self.write_records()
            time.sleep(0)  # give other threads time to run

    def run(self, *args):
        if self.running:
            self.run_threaded(*args)
            self.isaved += self.write_records()

    def run_threaded(self, *args):
        if self.running:
            # prepare record
            assert len(self.inputs) == len(args)
            record = dict(zip(self.inputs, args))

            # get rid of non-numeric fields (ignored if they are absent)
            record.pop("cam/image_array", None)
            record.pop("user/mode", None)

            # influxdb complains about zero integers when field is already a float
            for i in record:
                record[i] = float(record[i])

            # count the datasets saved
            record["istep"] = self.istep

            # prepare payload for influxdb
            this_json_body = {
                            'measurement': 'DonkeySimulator', 
                            'tags': {
                                'car':'PLN_8',
                                'race':'training'
                            },
                            'time':'2020-05-10T13:11:00Z', # dummy timestamp
                            'fields':'json_packet_placeholder'
                        }

            # InfluxDB needs nanoseconds since the epoch as timestamp 
            this_json_body["time"]   = time.time_ns()

            # populate the fields we got from DonkeySim
            this_json_body["fields"] = record

            #
            # append this payload so it can be written
            # in the update thread
            #
            self.add_record(this_json_body)
            #print(self.json_body)

            self.istep += 1

    def shutdown(self):
        self.running = False
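One thing to consider about write_records() in the listing above: the lock is held while write_points() talks to the database, so add_record() on the main thread can stall for the length of a write. A possible variation, sketched here and not tested against a real database, swaps the buffer out under the lock and does the write after the lock is released. FakeClient and RecordBuffer are hypothetical names used only for illustration; FakeClient stands in for InfluxDBClient so the sketch runs without a server.

```python
import threading


class FakeClient:
    """Hypothetical stand-in for InfluxDBClient; it only stores the points."""

    def __init__(self):
        self.points = []

    def write_points(self, points):
        self.points.extend(points)
        return True


class RecordBuffer:
    """Hypothetical helper holding the buffer logic of the part."""

    def __init__(self, client):
        self.client = client
        self.lock = threading.Lock()
        self.pending = []

    def add_record(self, record):
        if record:
            with self.lock:
                self.pending.append(record)

    def write_records(self):
        # hold the lock only long enough to swap the buffer out
        with self.lock:
            batch, self.pending = self.pending, []
        # the slow write happens after the lock is released
        if batch:
            self.client.write_points(batch)
        return len(batch)


client = FakeClient()
buf = RecordBuffer(client)
buf.add_record({"fields": {"istep": 0}})
buf.add_record({"fields": {"istep": 1}})
first = buf.write_records()   # writes both buffered records
second = buf.write_records()  # nothing left to write
print(first, second, len(client.points))
```

The trade-off is that a batch is lost if write_points() raises, unless the caller puts it back into the buffer.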

Heavy02011 commented 2 years ago

Thx again Ed, exactly what I hoped to learn by doing this. I will set up a sketch including your comments to visualize the anatomy & function of a part. In parallel I will look into implementing the mentioned test routine.

Ezward commented 2 years ago

Here is an example of a self-test main that can run threaded or unthreaded: https://github.com/autorope/donkeycar/blob/921-next-generation-odometer-parts/donkeycar/parts/tachometer.py
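For a rough idea of the shape such a self-test main could take, here is a minimal sketch. It is not the tachometer code linked above. CountingPart is a hypothetical stand-in that only counts records, so it runs without donkeycar or influxdb, and the --threaded and --steps flags are assumptions made for illustration.

```python
import argparse
import threading
import time


class CountingPart:
    """Hypothetical stand-in part: counts the records it 'writes'."""

    def __init__(self):
        self.lock = threading.Lock()
        self.buffer = []
        self.written = 0
        self.running = True

    def flush(self):
        # take everything buffered so far and 'write' it (just count it)
        with self.lock:
            pending, self.buffer = self.buffer, []
        self.written += len(pending)

    def update(self):
        # background loop, used only in threaded mode
        while self.running:
            self.flush()
            time.sleep(0.001)

    def run_threaded(self, value):
        with self.lock:
            self.buffer.append(value)

    def run(self, value):
        self.run_threaded(value)
        self.flush()

    def shutdown(self):
        self.running = False


def main(argv=None):
    parser = argparse.ArgumentParser(description="self test for a part")
    parser.add_argument("--threaded", action="store_true")
    parser.add_argument("--steps", type=int, default=10)
    args = parser.parse_args(argv)

    part = CountingPart()
    thread = None
    if args.threaded:
        thread = threading.Thread(target=part.update, daemon=True)
        thread.start()

    for step in range(args.steps):
        if args.threaded:
            part.run_threaded(step)
        else:
            part.run(step)

    part.shutdown()
    if thread is not None:
        thread.join()
    part.flush()  # pick up anything still buffered at shutdown
    print(f"wrote {part.written} of {args.steps} records")
    return part.written


if __name__ == "__main__":
    main()
```

Running the file with no flags exercises run(), and adding --threaded exercises run_threaded() together with update(), so both paths of a part can be checked without touching the donkeycar configuration.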

Ezward commented 2 years ago

f'ing python; here is the bug: self.lock.release should be self.lock.release(). Without the parentheses the method is never called, so the lock is never released and the next with self.lock statement blocks. I'll update the code above.
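A small standalone sketch of that bug, using only the standard library: writing lock.release without parentheses just looks up the method and discards it, so the lock stays held.

```python
import threading

lock = threading.Lock()

# Buggy version: `lock.release` only looks up the bound method.
# Nothing is called, so the lock stays held.
lock.acquire()
lock.release  # no parentheses: an expression with no effect
still_held = lock.locked()

# A second non-blocking acquire fails, which is why a later
# `with lock:` (a blocking acquire) would hang.
second_acquire = lock.acquire(blocking=False)

# Fixed version: the parentheses actually call release().
lock.release()
released = not lock.locked()

print(still_held, second_acquire, released)
```

Using with self.lock wherever a blocking acquire is acceptable avoids this class of mistake, because the release happens automatically.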

Ezward commented 2 years ago

One more thing: we should have a time.sleep(0) in the update() loop to give other threads time to run. It's not absolutely necessary, but it is good to have. I'll update the code above.