AllanChain / blog

Blogging in GitHub issues. Building with Astro.
https://allanchain.github.io/blog/
MIT License
13 stars 0 forks source link

House air quality monitoring with Raspberry Pi and SGP30 #199

Open AllanChain opened 1 year ago

AllanChain commented 1 year ago

View Post on Blog

pi-sgp30

Connect SGP30 to your raspi and send data to InfluxDB and Grafana with Python. And then set up alerting in Grafana.


Note

Please make sure to read the previous post in this series (iot-sensors-setup) to set up Grafana and InfluxDB if you haven't done this.

TVOC and eCO2

SGP30 is a commonly used indoor air quality sensor for TVOC and eCO2 measurement. But what are TVOC and eCO2?

TVOC

TVOC is short for the total volatile organic compound. High TVOC doesn't necessarily mean that there's severe pollution, because it can also be high when there is a strong scent.

SGP30 gives TVOC results in the unit of ppb (part per million). To convert it to $\mathrm{\mu g/m^3}$, one can use the following formula:

$$ \rho{\mathrm{gas\ mix}}[\mathrm{\mu g / m^3]} = \frac{M{\mathrm{gas\ mix}}[\mathrm{g / mol}]}{Vm \times 1000 \mathrm{ppb}} \cdot c{\mathrm{gas\ mix}}[\mathrm{ppb}] $$

where $M_{\mathrm{gas\ mix}}$ is the average molar gas. In residential indoor environments, the typical value is $110\ \mathrm{g / mol}$. And $V_m \approx 0.0244 \mathrm{m^3/mol}$ is the molar volume. In short, as an estimate, one can use the following simplified formula:

$$ \rho{\mathrm{gas\ mix}}[\mathrm{\mu g / m^3]} = 4.5 \times c{\mathrm{gas\ mix}}[\mathrm{ppb}] $$

For more details, you can read this documentation on TVOC and IAQ. There's also a table showing the relationship between indoor air quality and TVOC:

IAQ and TVOC in light theme

eCO2

eCO2 means equivalent carbon dioxide. It's an estimate of CO2 based on VOCs using some proprietary algorithms, but unfortunately, it's not a good approximation. As pointed out by a post on Electronics360 and a post by Wolfgang Ewald, the main assumption for eCO2 is that people are the main driver of VOCs, and as TVOC increases, CO2 should increase. As soon as there is another source of VOC emission, this approximation fails.

As a consequence, you will observe the trend of eCO2 over the day is very similar to that of TVOC in your house, because the main source of pollution in a house is not people, but some activities like cooking. Therefore, eCO2 data is not very meaningful in this use case, and we will just focus on the TVOC part.

SGP30

SGP30 is a commonly used indoor air quality sensor with an I2C interface. It has a small size and low power consumption (48 mA at 1.8V), and features long-term stability. It can measure TVOC from 0 ppb to 60000 ppb with high resolution. You can find more on SGP30 by reading its datasheet. There might be some subtle difference between different manufacturers, but most of the data should be the same.

How does SGP30 measure TVOC? The metal-oxide gas sensor plays an important role. The metal oxide material is exposed to indoor air, and the sensor electronically measures the presence of reducing gases which are mainly VOCs^1. The reducing gases are compounds which react with atmospheric oxygen catalyzed on heated surfaces^2.

reducing gases in light theme

Then SGP30 is calibrated using ethanol since ethanol serves as a stable, reliable, and economic proxy for TVOC^3.

SGP30 value chain in light theme

The measurement process looks like this:

SGP30 measure process in light theme

You should first send a sgp30_iaq_init signal after every power-up or soft reset. Then it's optional to set a calibration baseline or absolute humidity. After that is the measurement loop and the sgp30_measure_iaq command must be sent with an interval of 1s for the built-in dynamic compensation algorithm to work. You should also save the baselines returned by sgp30_get_iaq_baseline so that you can use sgp30_set_iaq_baseline on power-up and don't have to wait a long time to get a good baseline.

Wiring

Raspberry Pi pins 3 (GPIO 2, SDA, data) and 5 (GPIO 3, SCL, clock) are special pins to handle I2C data. You can connect as many sensors or devices as you want to these two pins as long as their I2C address are not in conflict, which is usually built into the chip.

raspi gpio in light theme

Of course it's simpler to connect VCC to pin 1 (3.3V power), but I'm connecting to 5V power because the current is limited for the 3.3V power pin, while the 5V power is unlimited and I can add more sensors in the future^4. However, 3.3V should be used for I2C communication^5. You can of course use two resistors to achieve the conversion, but I don't have resistors at hand, so the simplest solution is to connect three diodes in series in the circuit.

raspi wiring in light theme

Setup I2C in Raspberry Pi

To use I2C in Raspi, you need to enable the feature first. Just run

sudo raspi-config

Select "Interface Options" and enable I2C and you are done.

To verify that the SGP30 is recognized by Raspi, you can first install i2c-tools:

sudo apt-get install i2c-tools

And then run

sudo i2cdetect -y 1

If things are going well, you will see something like this:

     0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f
00:          -- -- -- -- -- -- -- -- -- -- -- -- -- 
10: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 
20: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 
30: -- -- -- -- -- -- -- -- 38 -- -- -- -- -- -- -- 
40: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 
50: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 
60: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 
70: -- -- -- -- -- -- -- --                        

Grafana live

Since we have to measure the air quality every second, we can build a live dashboard using Grafana live.

Go to "Administration" > "Service accounts" > "Create service account" to add a new service account with permission "Admin".

add service account

Then click "Add Service Account Token" to add a new token. That will be used for pushing the updates in the code below.

The code

Here is an example code for:

from abc import ABC, abstractmethod
import time
import traceback
from typing import TypedDict

import schedule
import requests
from sgp30 import SGP30

class ReportData(TypedDict):
    name: str
    value: str
    sensor: str

CONFIG = {
    "debug": True,
    "report": {
        "influx": "http://127.0.0.1:8086/write?db=sensors",
        "grafana": "http://127.0.0.1:3000/api/live/push/live-sensors",
    },
    "query": "http://127.0.0.1:8086/query",
    "authorization": {
        "grafana": "glsa_<token here>",
    },
}

def format_report_data(data: list[ReportData]) -> str:
    db_data = ""
    timestamp = int(time.time() * 1e9)
    for d in data:
        db_data += f"{d['name']},sensor={d['sensor']} value={d['value']} {timestamp}"
        db_data += "\n"
    db_data = db_data.strip()
    return db_data

class SensorTask(ABC):
    is_live = False

    def __init__(self) -> None:
        super().__init__()
        self.error_count = 0

    @abstractmethod
    def read(self) -> list[ReportData]:
        ...

    def report(self, data_str: str) -> None:
        requests.post(CONFIG["report"]["influx"], data=data_str)
        if self.is_live:
            requests.post(
                CONFIG["report"]["grafana"],
                data=data_str,
                headers={
                    "Authorization": f"Bearer {CONFIG['authorization']['grafana']}"
                },
            )

    def task(self):
        try:
            data = self.read()
            if not data:
                return
            data_str = format_report_data(data)
            if CONFIG["debug"]:
                print(data_str)
            else:
                self.report(data_str)
            self.error_count = 0  # reset error count
        except Exception:
            self.error_count += 1
            if self.error_count == 3:
                self.report_error(cancel=True)
                print("Cancelling", self.__class__.__name__)
                return schedule.CancelJob

class SensorSGP30(SensorTask):
    is_live = True
    warming_up: int
    "Warming up data point count. negative for warmed up"

    def __init__(self) -> None:
        super().__init__()
        self.sgp30 = SGP30()
        self.warming_up = 0
        try:
            resp = requests.get(
                CONFIG["query"],
                params={
                    "db": "sensors",
                    "q": (
                        'SELECT last("value") FROM "eco2-baseline";'
                        'SELECT last("value") FROM "tvoc-baseline"'
                    ),
                },
            )
            results = resp.json()["results"]
            baseline = list(results[i]["series"][0]["values"][0][1] for i in range(2))
        except Exception:
            traceback.print_exc()
            print("Failed to get the previous baseline")
            baseline = None
        finally:
            self.sgp30.command("init_air_quality")
            if baseline is not None:
                self.sgp30.set_baseline(*baseline)

        self.baseline_clock = 0

    def read(self) -> list[ReportData]:
        eco2, tvoc = self.sgp30.command("measure_air_quality")
        if self.warming_up >= 0:
            # maxium try 20
            if eco2 == 400 and tvoc == 0 and self.warming_up < 20:
                self.warming_up += 1
                return []
            self.warming_up = -1

        data = [
            {"name": "eco2", "value": f"{eco2:d}", "sensor": "SGP30"},
            {"name": "tvoc", "value": f"{tvoc:d}", "sensor": "SGP30"},
        ]
        self.baseline_clock += 1
        if self.baseline_clock == 60:
            self.baseline_clock = 0
            eco2, tvoc = self.sgp30.command("get_baseline")
            data.extend(
                [
                    {"name": "eco2-baseline", "value": f"{eco2:d}", "sensor": "SGP30"},
                    {"name": "tvoc-baseline", "value": f"{tvoc:d}", "sensor": "SGP30"},
                ]
            )

        return data

sgp30_sensor = SensorSGP30()
schedule.every().second.do(sgp30_sensor.task)

schedule.run_all()

while True:
    n = schedule.idle_seconds()
    if n > 0:  # sleep exactly the right amount of time
        time.sleep(n)
    schedule.run_pending()

There are many things to improve, such as more robust error handling for connection failure, using websockets for Grafana to reduce CPU load, etc. But those are beyond the scope of a blog post. I may open source the full codebase sometime in the future, and let me know if you need it.

More on warmup and baselines

SGP30 has a warmup phase, i.e. for the first 15s the sgp30_measure_iaq command will always return fixed values of 400 ppm eCO2 and 0 ppb TVOC. However, even after the first 15s, SGP30 can still return 400 ppm eCO2 and 0 ppb TVOC for a long time. That's because it's finding a good baseline. It can take as long as one day for a good baseline to be found and the values become meaningful. Therefore, it's important that you save the baseline and restore them after sgp30_iaq_init.

However, it doesn't mean that you just need one baseline value, because the baseline is constantly changing over time:

SGP30 baseline over time

Building Grafana dashboard

Things are pretty the same as the previous post in this series (iot-sensors-setup), except for that we are creating a live one. We need to select --Grafana-- as the datasource, and choose "Live Measurements" and the right channel:

create live dashboard

Setup alert

Go to "Alerting" > "Contact points" > "New contact point" and add a new contact point based on your preference.

Then edist the default policy in "Notification Policies" to use your newly created contact point.

edit default policy

Then in "Alert rules", click "Create alert rule", and set up the query for TVOC:

create alert rule

Then set the folder and evaluation group.

Then add a summary annotation for the alert using go template. An example will be

{{ if $values.B }}{{ if eq $values.C.Value 0.0 }}Resolve{{ end }}TVOC alert{{ else }}TVOC no data!{{ end }}

You can find more functions here.

murenma05 commented 10 months ago

Thanks a lot, this is really helpful for me!😋