Pioreactor / pioreactor

Hardware and software for accessible, extensible, and scalable bioreactors. Built on Raspberry Pi.
https://pioreactor.com
MIT License
101 stars 9 forks source link

[wip] Integrating pH / DO probes (software only) #483

Open CamDavidsonPilon opened 8 months ago

CamDavidsonPilon commented 8 months ago

Assume DO or pH measurements are available over I2C. Users would like to do the following:

  1. Display the measurements in the UI
  2. Act on the measurements (dosing, air valve, etc)
  3. Export the historical measurements

pH and DO can be two different plugins. Below, I'll use pH, but DO should look very similar.

Database table

-- One row per pH reading, keyed by the experiment and the unit that produced it.
CREATE TABLE IF NOT EXISTS pH_readings (
    experiment               TEXT NOT NULL,
    pioreactor_unit          TEXT NOT NULL,
    timestamp                TEXT NOT NULL,
    pH_reading               REAL NOT NULL
);

-- The index must target the table created above; the draft pointed it at
-- `pH_measurements`, which is never defined, so the statement would fail.
CREATE INDEX IF NOT EXISTS pH_readings_ix
ON pH_readings (experiment);

Chart yaml for UI

---
# Chart definition for the UI: plots the values stored in the pH_readings table.
data_source: pH_readings # SQL table
# NOTE(review): the table declares this column as `pH_reading`; confirm the
# lookup is case-insensitive, otherwise the casing here needs to match.
data_source_column: ph_reading
title: pH 
# Same trailing topic segments the reading job's DB parser subscribes to
# (pioreactor/+/+/pH_reading/pH).
mqtt_topic: pH_reading/pH
chart_key: ph_readings
# presumably the name of the plugin that provides this chart - TODO confirm
source: pH_readings_plugin
y_axis_label: pH
interpolation: stepAfter
y_axis_domain: [6, 8]
lookback: 100000
fixed_decimals: 2

pH measurement reading job

# -*- coding: utf-8 -*-
import json
import click

from time import sleep
from pioreactor.whoami import get_unit_name, get_latest_experiment_name
from pioreactor.config import config
from pioreactor.background_jobs.base import BackgroundJobContrib
from pioreactor.utils import clamp
from pioreactor.utils.timing import RepeatedTimer
from pioreactor import hardware
from pioreactor.background_jobs.leader.mqtt_to_db_streaming import produce_metadata
from pioreactor.background_jobs.leader.mqtt_to_db_streaming import register_source_to_sink
from pioreactor.background_jobs.leader.mqtt_to_db_streaming import TopicToParserToTable

def __dir__():
    """Return the names this module advertises through ``dir()``: only the click command."""
    exported_names = ["click_pH_reading"]
    return exported_names

def parser(topic, payload) -> dict:
    """
    Convert one MQTT pH message into a row for the ``pH_readings`` table.

    Args:
        topic: the MQTT topic the message arrived on; the experiment and
            unit are extracted from it via ``produce_metadata``.
        payload: the message body, parsed with ``float()``.

    Returns:
        A dict with the keys ``experiment``, ``pioreactor_unit``,
        ``timestamp`` and ``pH_reading``.

    Raises:
        ValueError: if ``payload`` cannot be parsed as a float.
    """
    # The draft called `timing.current_utc_timestamp()` but only ever imported
    # `RepeatedTimer` from that module, so every call raised NameError.
    from pioreactor.utils.timing import current_utc_timestamp

    metadata = produce_metadata(topic)
    return {
        "experiment": metadata.experiment,
        "pioreactor_unit": metadata.pioreactor_unit,
        # Stamped when the message is parsed, not when the probe was sampled.
        "timestamp": current_utc_timestamp(),
        "pH_reading": float(payload),
    }

# Route every unit's/experiment's pH_reading/pH MQTT messages through `parser`
# into the pH_readings table.
_pH_topic_to_table = TopicToParserToTable(
    ["pioreactor/+/+/pH_reading/pH"],
    parser,
    "pH_readings",
)
register_source_to_sink(_pH_topic_to_table)

class PHReader(BackgroundJobContrib):

    job_name="pH_reading"
    published_settings = {
        "pH": {"datatype": "float", "settable": False},
    }

    def __init__(self, unit, experiment, **kwargs) -> None:
        super().__init__(unit=unit, experiment=experiment, **kwargs)

        time_between_readings = config.getfloat("pH_reading_config", "time_between_readings")
        assert time_between_readings >= 1.0

        self.i2c_channel = int(config.get("pH_reading_config", "i2c_channel_hex"), base=16)
        self.timer_thread = RepeatedTimer(time_between_readings, self.read_pH, job_name=self.job_name)
        self.i2c = busio.I2C(hardware.SCL, hardware.SDA)

    def read_pH(self):

        samples = 2
        running_sum = 0.0
        for _ in range(samples):
            running_sum += self._read_from_i2c()
            sleep(0.05)

        return running_sum/samples

    def on_ready_to_sleeping(self) -> None:
        self.timer_thread.pause()

    def on_sleeping_to_ready(self) -> None:
        self.timer_thread.unpause()

    def on_disconnect(self) -> None:
        self.timer_thread.cancel()

    def _read_from_i2c(self) -> pt.AnalogValue:
        result = bytearray(2)
        try:
            self.i2c.writeto_then_readfrom(
                self.i2c_channel, ??, result
            )
            return int.from_bytes(result, byteorder="little", signed=False)
        except OSError:
            raise exc.HardwareNotFoundError(
                f"Unable to find i2c channel {self.i2c_channel}."
            )

@click.command(name="pH_reading")
def click_pH_reading():
    """
    Start pH reading
    """
    # Resolve which unit and experiment this process is running for.
    unit = get_unit_name()
    experiment = get_latest_experiment_name()

    # Build the job and keep the process alive until the job disconnects.
    PHReader(unit=unit, experiment=experiment).block_until_disconnected()

Dosing automation to target a pH

Open questions

  1. What is the frequency of measurements?