marcelm / radonwave

Read radon levels from Airthings Wave
MIT License
26 stars 8 forks source link

modfied code for FTLAB RadonEye RD200 #5

Open wettermann32 opened 5 years ago

wettermann32 commented 5 years ago

Maybe you can use it

#!/usr/bin/env python3
import sys
import time
from struct import unpack
from bluepy import btle
import time
from argparse import ArgumentParser
import json
import struct

def a_to_int(bytearray):
    return struct.unpack("l", bytearray)[0]

def a_to_float(bytearray):
    return struct.unpack("f", bytearray)[0]

class CouldNotConnectError(Exception):
    pass

class Measurement:
    def __init__(self, humidity, temperature, radon_avg, radon_1day, accel, brightness, humidity2):
        self.humidity = humidity
        self.temperature = temperature
        self.radon_avg = radon_avg
        self.radon_1day = radon_1day
        self.accel = accel
        self.brightness = brightness
        self.humidity2 = humidity2

def connect_and_read(device_address):

    try:
        dev = btle.Peripheral(device_address,"random")
    except btle.BTLEException as e:
        raise CouldNotConnectError()

    if False:
        print('Services')
        for service in dev.services:
            print(service)

        print('Characteristics')
        for ch in dev.getCharacteristics():
            print(ch.getHandle(), ch.uuid, ch.propertiesToString())

    service = dev.getServiceByUUID(btle.UUID('00001523-1212-efde-1523-785feabcd123'))
    character = dev.getCharacteristics(startHnd=1, endHnd=0xFFFF, uuid=btle.UUID('00001525-1212-efde-1523-785feabcd123'))
    dev.writeCharacteristic(11,b'\x50')
    value = dev.readCharacteristic(13)
    value2 = struct.pack('BBBB',value[2], value[3], value[4], value[5])
    value2 = int(a_to_float(value2)*37)

    dev.disconnect()

    return value2

def main():

    parser = ArgumentParser()

    parser.add_argument('--wait', default=1200, type=int,

        help='Seconds to wait between queries. Do not choose this too low as the '

        'radon levels are only updated once every 60 minutes. Set to 0 to query '

        'only once. Default: 1200 '

        '(20 minutes)')

    parser.add_argument('--mqtt', help='MQTT server')

    parser.add_argument('--topic', help='MQTT topic')

    parser.add_argument('device_address', metavar='BLUETOOTH-DEVICE-ADDRESS')

    args = parser.parse_args()

    device_address = args.device_address

    if args.mqtt and not args.topic:

        parser.error('Provide also a --topic when you use --mqtt')

    if args.mqtt:

        try:

            import paho.mqtt.client as mqtt

            client = mqtt.Client()

            client.connect(args.mqtt)

            assert client

        except Exception as e:  # unsure which exceptions connect can cause, so need to catch everything

            print('Could not connect to MQTT broker:', e, file=sys.stderr)

            client = None

    else:

        client = None

    while True:

        try:

            measurement = connect_and_read(device_address)

        except CouldNotConnectError:

            print('Could not connect', file=sys.stderr)
            print(device_address, file=sys.stderr)

        except btle.BTLEException as e:

            print('Bluetooth error:', e, file=sys.stderr)

        else:

            #print('{time}\t{radon:.2f}'.format(time=time.strftime('%Y-%m-%d %H:%M:%S'),measurement), sep='\t')
            print(time.strftime('%Y-%m-%d %H:%M:%S'), measurement, sep='\t')

            sys.stdout.flush()

        if client:

            data = {
                'radon': measurement
            }

            client.publish(args.topic, json.dumps(data))

        if args.wait == 0:

            break

        time.sleep(args.wait)

    if client:

        client.disconnect()

if __name__ == '__main__':

    main()
marcelm commented 5 years ago

Thanks a lot. I don’t know whether I can actually integrate it (because I cannot test it). But if nothing else, other people will now be able to find the code when they search.

wettermann32 commented 5 years ago

I can test it if you want to.

Best Michael

Am 19.07.2019 um 18:48 schrieb Marcel Martin:

Thanks a lot. I don’t know whether I can actually integrate it (because I cannot test it). But if nothing else, other people will now be able to find the code when they search.

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/marcelm/radonwave/issues/5?email_source=notifications&email_token=AF34UHZ45PVVUZ6XHKOEGJ3QAHV4FA5CNFSM4IBTBAD2YY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOD2MFBBA#issuecomment-513298564, or mute the thread https://github.com/notifications/unsubscribe-auth/AF34UHZXRH7UR67SCRLJATLQAHV4FANCNFSM4IBTBADQ.