A lightweight, flexible, configurable and highly extensible framework for IoT devices (like Raspberry Pi) that allows capturing data from sensors (and potentially other devices) and consuming it in different ways, including sending it to the RADAR-base platform backend. The framework is highly decoupled and extensible. There are 4 major components in the framework: sensors, converters, publishers and data consumers.
The only external dependency is a pub/sub broker or messaging queue, which can easily be run using Docker images either on the IoT device or elsewhere (like the cloud).
Here is its architecture. The data flow is from left to right. To know more about each component, please take a look at the Configuration section.
.───────────.
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ( )
│`───────────'│
│ Device Handlers │ Configuration│
│ │
│ │ │.───────────.│
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ( )
▲ `───────────'
┌──────────────────┴──────────────────────────┐
│ │
┌────────────────────────────────────────────────┐ ┌──────────────────────────────────┐ ┌──────────────────────────────────┐ ┌────────────────────────────────────────────┐ ┌─────┐ ┌──────────────────────────────────────────────────────────────────────┐
│ │ │ │ │ │ │ │ │ │ │ │
│ Sensor Handler │ │ Other Handlers... │ │ Converter │ │ Publisher │ │ │ │ Data Consumer │
│ │ │ │ │ │ │ │ │ │ │ │
└────────────────────────────────────────────────┘ └──────────────────────────────────┘ └──────────────────────────────────┘ └────────────────────────────────────────────┘ │ │ └──────────────────────────────────────────────────────────────────────┘
╔════════════════════════════════════════════════╗ ╔══════════════════════════════════╗ ┌──────────────────────────────────┐ ┌────────────────────────────────────────────┐ │ │ ╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳
║ ║ ║ ║ │ │ │ │ │ │ ╳ ┌──────────────────────────────────────────────────────────────────┐ ╳
║ ║ ║ ║ │ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │ │ ╳ │ Essentially the data consumer can be any application or system │ ╳
║ ║ ║ ║ │ │ │ │ │ │ │ │ ╳ │that can subscribe to topics/channels on the pub/sub system. This │ ╳
║ ║ ║ ║ │ │ Message Converter │ │ │ │ │ │ ╳ │ makes it language agnostic and helps connect external systems. │ ╳
║ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ║ ║ ┌───────────────────────┐ ║ │ Abstract Base class for │ │ │ Publisher │ │ │ │ ╳ └──────────────────────────────────────────────────────────────────┘ ╳
║ Sensor ║ ║ │ │ ║ │ │ serialisation and validation │ │ │ Abstract Base class for publishing │ │ │ ╳ ┌──────────────────────────────────────────────────────────────────┐ ╳
║ │Abstract Base class. Has implementation for│ ║ ║ │ │ ║ │ of data. │ │ │ records to the pub/sub broker or system │ │ │ │ ╳ │ Here we provide a general model which should be used when │ ╳
║ polling and flushing the data. The actual ║ ║ │ │ ║ │ │ │ │ │ │ │ │ ╳ │ developing such data consumer to provide flexibility and │ ╳
║ │ data is provided by the subclasses │ ║ ║ │ │ ║ │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ │ │ │ │ │ │ ╳ │ extensibility in line with the other parts of the system. │ ╳
║ ║ ║ │ │ ║ │ ▲ │ │ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │ │ ╳ └──────────────────────────────────────────────────────────────────┘ ╳
║ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ║ ║ │ Handler Specific │ ║ │ extends──┴──extends┐ │ │ ▲ │ │ P │ ╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳╳
║ ▲ ║ ║ │ Implementations │ ║ │ │ │ │ │ ┌extends─────┬─┴──────extends─┐ │ │ u │ ┌──────────────────────────────────────────────────────────────────────┐
║ ┌────extends───┬┴─────extends────┐ ║ ║ │ │ ║ │ │ │ │ │ │ │ │ │ │ b │ │ ┌──────────────────────────────────────────────────────────────────┐ │
║ │ │ │ ║ ║ │ │ ║ │┌────────────────┐ ┌───────────┐│ │ │ │ │ │ │ l │ │ │ Data Handler │ │
║ │ │ │ ║ ║ │ │ ║ ││ │ │ ││ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ i │ │ │ │ │
║┌─────────────┐┌─────────────┐ ┌─────────────┐║ ║ │ │ ║ ││ │ │ Other ││ │ │ │ │ │ │ │ │ │ s │ │ └──────────────────────────────────────────────────────────────────┘ │
║│ ││ │ │ │║ ║ │ │ ║ ││ AvroConverter │ │ Converter ││ │ │ Redis │ │ MQTT │ │ Other │ │ │ h │ │ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
║│ Sensor 1 ││ Sensor 2 │ │ Sensor N │║ ║ └───────────────────────┘ ║ ││ │ │ ││ │ │Publisher│ │Publisher│ │Publisher│ │ │ e │ │ │ │
║│fun get_data ││fun get_data │ │fun get_data │║ ║ ║ ││ │ │ ││ │ │ │ │ │..... │ │ │ │ r │ │ │ │
║│ ││ │ │ │║ ║ ║ │└────────────────┘ └───────────┘│ │ └─────────┘ └─────────┘ └─────────┘ │ │ / │ │ Data Consumer │ │
║└─────────────┘└─────────────┘ └─────────────┘║ ║ ║ │ △ △ │ │ △ △ △ │ │ S │ │ │ Abstract base class for consuming the data. The actual ◁─┐ │
║ ║ ║ ║ │ │ │ │ │ │ │ │ │ │ u │ │ processing of the consumed data is done by subclasses │ │ │
║ ║ ║ ║ │ │ │ │ │ │ │ │ │ │ b │ │ │ │ │
║ ║ ║ ║ │ │ │ │ │ │ │ │ │ │ s │ │ │ │ │
║ ║ ║ ║ │ │ │ │ │ │ │ │ │ │ c │ │ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │
║ ║ ║ ║ │ │ │ │ │ │ │ │ │ │ r │ │ ▲ │ │
║ ║ ║ ║ │ │ │ │ │ │ │ │ │ │ i │ │ ┌──extends───────┬──────┴──extends────────┐ │ │
║ ║ ║ ║ │ └───────┬─────────┘ │ │ │ │ │ │ │ b │ │ │ │ │ │ │
║ ║ ║ ║ │ │ │ │ │ │ │ │ │ e │ │ ┌───────────┐ ┌──────────────┐ ┌──────────────┐ │ │
║ ║ ║ ║ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │
║ ║ ║ ║ │ │ │ │ │ │ │ │ │ b │ │ │Radar Data │ │Visualisation │ │ Mobile App │ │ │
║ ║ ║ ║ │ │ │ │ │ │ │ │ │ r │ │ │ Consumer │ │Data Consumer │ │ Consumer │ │ │
║ ║ ║ ║ │ │ │ │ │ │ │ │ │ o │ │ │ │ │ │ ...... │ │ │ │
║ ║ ║ ║ │ │ │ │ │ │ │ │ │ k │ │ └───────────┘ └──────────────┘ └──────────────┘ │ │
║ ║ ║ ║ │ │ │ │ △ △ △ │ │ e │ │ ▣ ▣ ▣ │ │
║ ║ ║ ║ │ △ │ │ ┌──────────┐ ┌──────────┐ ┌───────────┐│ │ r │ │ └────────────────┴────┬───────────────────┘ │ │
║ ║ ║ ║ │ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │ │ │ │ │ │ ││ │ │ │ │ │ │
║ ║ ║ ║ │ │ │ │ │ Redis │ │ MQTT │ │ Other ││ │ o │ │ ◎ │ │
║ ║ ║ ║ │ │ │ │ │Connection│ │Connection│ │Connection ││ │ r │ │ .─────────────────────────────────────────────. │ │
║ ║ ║ ║ │ Schema Retriever │ │ │ │ │ │ │.....│ ││ │ │ │ ( External Services ) │ │
║ ║ ║ ║ │ │ │ │ └──────────┘ └──────────┘ └───────────┘│ │ s │ │ `─────────────────────────────────────────────' │ │
║ ║ ║ ║ │ │ │ │ │ │ │ │ │ y │ │ │ │
║ ║ ║ ║ │ │ │ │ │ │ │ │ │ s │ │ │ │
║ ║ ║ ║ │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ │ │ └─extends────┴─┬────extends───┘ │ │ t │ │ │ │
║ ║ ║ ║ │ ▲ │ │ │ │ │ e │ │ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │
║ ║ ║ ║ │ ┌─extends──┴┬───extends┐ │ │ ▼ │ │ m │ │ │ │ │
║ ║ ║ ║ │ │ │ │ │ │ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │ │ │ │ │ │
║ ║ ║ ║ │ ┌────────┐ ┌────────┐ ┌────────┐│ │ │ │ │ │ │ Subscriber │ │ │
║ ║ ║ ║ │ │ │ │ │ │ ││ │ │ │ │ │ │ │Abstract base class for subscribing ▷──────────────┘ │
║ ║ ║ ║ │ │ │ │ │ │ ││ │ Connection │ │ │ │ │ to the pub/sub system │ │
║ ║ ║ ║ │ │ File │ │ URL │ │ Schema ││ │ │ Represents a connection to the pub/sub │ │ │ │ │ │
║ ║ ║ ║ │ │ System │ │ │ │Registry││ │ system │ │ │ │ │ │ │
║ ║ ║ ║ │ │ │ │ │ │ ││ │ │ │ │ │ │ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
║ ║ ║ ║ │ │ │ │ │ │ ││ │ │ │ │ │ │ ▲ │
║ ║ ║ ║ │ └────────┘ └────────┘ └────────┘│ │ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │ │ │ ┌──extends───────┴──┬─────extends────┐ │
║ ║ ║ ║ │ │ │ │ │ │ │ │ │ │ │
║ ║ ║ ║ │ │ │ │ │ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
║ ║ ║ ║ │ │ │ │ │ │ │ │ Redis │ │ MQTT │ │ Other │ │
║ ║ ║ ║ │ │ │ │ │ │ │ │Subscriber│ │Subscriber│ │Subscriber│ │
║ ║ ║ ║ │ │ │ │ │ │ │ └──────────┘ └──────────┘ └──────────┘ │
║ ║ ║ ║ │ │ │ │ │ │ │ │
╚════════════════════════════════════════════════╝ ╚══════════════════════════════════╝ └──────────────────────────────────┘ └────────────────────────────────────────────┘ └─────┘ └──────────────────────────────────────────────────────────────────────┘
The template for configuration is located at config.yaml.template. Copy it to config.yaml and modify as required.
Currently, the configuration can be divided into 4 main components. Each of the components has some sensible defaults, but it is recommended to understand this section thoroughly.
Sensors: Represented by the key sensors in the config file, which consists of an array of sensor configurations. Each sensor is configured as follows -
- name: "your-sensor-name"
  # Name of your python module which contains the sensor class
  module: "your_package.your_module"
  # Name of the class of the sensor in the module
  class: "YourSensorClass"
  # topic/channel to publish the data to in pub/sub paradigm
  publishing_topic: "your-sensor-topic"
  # polling frequency in milliseconds
  poll_frequency_ms: 1000
  # Flush size for flushing the records
  flush_size: 100
  # Flush after [value] seconds if flush size is not reached
  flush_after_s: 1000
Currently, implementations for the following sensors are provided -
By default, no sensors are added to the configuration. This is because sensors are hardware dependent, so there cannot be a default sensor config. If no sensor config is provided, the program will fail with an exception.
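To make the interplay of flush_size and flush_after_s concrete, the following is a minimal sketch of a buffer that flushes when either threshold is reached. It is not the framework's Sensor implementation: the FlushBuffer class and its method names are hypothetical, and the current time is passed in explicitly to keep the example deterministic.

```python
import time
from typing import Any, Callable, List, Optional


class FlushBuffer:
    """Hypothetical buffer: flushes on size or elapsed time, whichever comes first."""

    def __init__(self, flush_size: int, flush_after_s: float,
                 on_flush: Callable[[List[Any]], None]) -> None:
        self.flush_size = flush_size
        self.flush_after_s = flush_after_s
        self.on_flush = on_flush
        self._records: List[Any] = []
        self._last_flush = time.monotonic()

    def add(self, record: Any, now: Optional[float] = None) -> None:
        """Store one polled record, then flush if a threshold has been reached."""
        now = time.monotonic() if now is None else now
        self._records.append(record)
        size_reached = len(self._records) >= self.flush_size
        time_reached = (now - self._last_flush) >= self.flush_after_s
        if size_reached or time_reached:
            self.flush(now)

    def flush(self, now: Optional[float] = None) -> None:
        """Hand the buffered records to the callback and reset the timer."""
        if self._records:
            self.on_flush(self._records)
            self._records = []
        self._last_flush = time.monotonic() if now is None else now


flushed_batches: List[List[Any]] = []
buffer = FlushBuffer(flush_size=3, flush_after_s=10, on_flush=flushed_batches.append)
start = time.monotonic()
for value in (1, 2, 3):
    buffer.add(value, now=start)   # the third record reaches flush_size
buffer.add(4, now=start + 11)      # flush_after_s has elapsed since the last flush
```

Here the first batch is flushed because flush_size is reached, and the second because more than flush_after_s seconds have passed.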
Converters: Represented by the key converter in the config file. This is for serialisation of the messages captured by the sensors before publishing them. It comprises the following fields -
converter:
  name: 'avro'
  module: 'commons.message_converter'
  class: 'AvroConverter'
  validate_only: True
  schema_retriever:
    module: 'commons.schema'
    class: 'FileAvroSchemaRetriever'
    args:
      filepath: '/base/path/to/the/schemas'
      extension: '.avsc'
If validate_only is True, then no conversion of the message is performed. It is only validated against the schema.
The default converter is None, so the messages are neither serialised nor validated unless explicitly configured.
Schema Retrievers: These are used for retrieving schemas to be used for validation and serialisation.
Arguments for a schema retriever are passed using the args key under schema_retriever. For example, the FileAvroSchemaRetriever needs a base path where all schemas are stored in the filesystem (filepath) and an extension of the files (extension), as shown in the above example.
The schema naming strategy SensorBasedSchemaNamingStrategy is available in the commons.schema module.
Publisher: This is for configuring the pub/sub broker or messaging queue to which the captured sensor data is sent. This provides extensibility to the framework, so that other services and devices outside of this framework can also capture the data. Some examples include mobile applications (like RADAR-base passive RMT) and language-agnostic data consumers (for example, using your existing libraries in another language like Java to consume the messages). MQTT is a widely used pub/sub protocol for IoT devices and frameworks. Redis Pub/Sub is another lightweight pub/sub system and performs well on ARM architectures (like Raspberry Pi) too. It also provides other features which may be useful in the future. Hence, there is an out-of-the-box implementation for publishing the data to Redis Pub/Sub, but MQTT can easily be added (see the Extending section below). It can be configured as follows -
publisher:
  module: 'pubsub.redis_publisher'
  class: 'RedisPublisher'
  connection:
    module: 'pubsub.redis_connection'
    class: 'RedisConnection'
    host: 'localhost'
    port: '6379'
    password: ''
  publisher_max_threads: 5
Most of the options are self-explanatory. The publisher_max_threads option is the maximum number of workers for the ThreadPoolExecutor used for publishing messages. If you have a large number of sensors and experience slow performance, increasing the number of threads for the publisher may help.
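As an illustration of what publisher_max_threads controls, here is a minimal sketch using the standard library's ThreadPoolExecutor. The publish_batch function and the topic names are hypothetical stand-ins for a blocking call to the broker; the framework's own publishing code may be structured differently.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

PUBLISHER_MAX_THREADS = 5  # mirrors publisher_max_threads in the config


def publish_batch(topic: str, records: List[float]) -> int:
    """Hypothetical stand-in for a blocking publish call; returns the record count."""
    return len(records)


batches: Dict[str, List[float]] = {
    "temperature-topic": [21.5, 21.7],
    "humidity-topic": [40.1, 40.3, 40.2],
}

# Each batch is submitted as a separate task; at most
# PUBLISHER_MAX_THREADS tasks run concurrently.
with ThreadPoolExecutor(max_workers=PUBLISHER_MAX_THREADS) as executor:
    futures = {topic: executor.submit(publish_batch, topic, records)
               for topic, records in batches.items()}
    published = {topic: future.result() for topic, future in futures.items()}
```

With more sensors than worker threads, the remaining batches wait in the executor's queue, which is why raising the limit can help when publishing is slow.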
Others: Other config options include the following -
expose_config_endpoint: True
root_logger_level: INFO
scheduler_max_threads: 10
expose_config_endpoint, if set to True, exposes an HTTP endpoint for getting the config of the system. This could be useful if other systems need to use this config.
scheduler_max_threads is the maximum number of workers for the ThreadPoolExecutor used by the scheduler for polling sensor data. Increasing this may be beneficial if using a large number of sensors.
root_logger_level defines the log level for the root logger, which is used across the whole application.
Other than the defaults and the config.yaml file, the framework also supports configuration using OS environment variables. To use these, the keys used in the above config file need to be prefixed with radar_iot_. This is done to ensure the names never conflict with other environment variables.
For example, to set the root_logger_level value using environment variables, use -
radar_iot_root_logger_level=INFO
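With so many config options, it's important to clarify the order in which the configuration is read. The defaults are read first, then the OS environment variables, and finally the config.yaml file, with each source overriding the previous one. So in descending order of precedence,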
config.yaml > OS environment vars > default
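The precedence rule can be sketched as a simple merge in which later sources override earlier ones. This is an illustration only, not the framework's loader: the resolve_config function is hypothetical, it handles top-level keys only, and it leaves environment values as strings.

```python
from typing import Any, Dict, Mapping

ENV_PREFIX = "radar_iot_"


def resolve_config(defaults: Mapping[str, Any],
                   environ: Mapping[str, str],
                   file_config: Mapping[str, Any]) -> Dict[str, Any]:
    """Merge the three sources; later sources override earlier ones."""
    config = dict(defaults)
    for name, value in environ.items():
        # Only variables carrying the prefix are considered; the prefix is stripped.
        if name.startswith(ENV_PREFIX):
            config[name[len(ENV_PREFIX):]] = value
    config.update(file_config)  # config.yaml has the highest precedence
    return config


defaults = {"root_logger_level": "INFO", "scheduler_max_threads": 10}
environ = {
    "radar_iot_root_logger_level": "DEBUG",
    "radar_iot_scheduler_max_threads": "20",
    "PATH": "/usr/bin",  # unrelated variable, ignored
}
file_config = {"root_logger_level": "WARNING"}

config = resolve_config(defaults, environ, file_config)
```

In this example root_logger_level ends up as WARNING (from the file), while scheduler_max_threads is taken from the environment because the file does not set it.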
The configuration is validated against the JSON Schema spec at configspec.
Pre-requisites:
To install the dependencies, navigate to the root folder of the project and run the following -
python3 -m pip install -r
Then, run the program using -
python3 main.py
Note: If using any of the GrovePi HAT and modules, you will need to install the GrovePi library separately using the command below -
sudo bash scripts/install_grovepi.sh
For more info, look at the official GrovePi docs.
The framework is decoupled and it is easy to extend different components. In particular, there are 4 major components in the framework that can be easily extended: sensors, publishers, converters and data consumers.
Depending on which part of the framework is being extended, the requirements will vary.
A new sensor can be added by extending the sensor.Sensor abstract base class (ABC) and implementing the appropriate abstract functions.
The sensor then needs to be added to the config.yaml file as specified in the Configuration section above.
For example, the following is an implementation of a test sensor. You just need to implement the get_data() method and the rest will be taken care of by the framework.
import logging

from sensors import Sensor

logger = logging.getLogger('root')


class YourTestSensor(Sensor):

    def __init__(self, name, topic, poll_freq_ms, flush_size, flush_after_s):
        super().__init__(name, topic, poll_freq_ms, flush_size, flush_after_s)

    def get_data(self):
        logger.debug('test data')
        # Your logic for getting data from the sensor
        return 35.7
Remember to pass all the required constructor values to the super class. Also notice the use of the root logger which was discussed earlier in the configuration section.
Additionally, you can override other methods of the sensor, such as those used for polling and flushing. These can be found in the superclass Sensor.
For already available sensor implementations, take a look at the various sensors in the sensors package.
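The module and class keys in the configuration suggest that classes are resolved by name at startup. A minimal sketch of how such a lookup could work with importlib is shown below. The load_class helper is hypothetical, not the framework's actual loader, and a standard-library class stands in for a sensor class so that the example is self-contained.

```python
import importlib
from typing import Any, Dict


def load_class(module_name: str, class_name: str) -> type:
    """Import module_name and return the attribute named class_name."""
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# Stand-in for one entry of the sensors array; a real entry would
# point at something like "your_package.your_module" / "YourSensorClass".
sensor_config: Dict[str, Any] = {"module": "collections", "class": "OrderedDict"}

cls = load_class(sensor_config["module"], sensor_config["class"])
instance = cls()
```

If the module or class name in the config is misspelled, import_module raises ModuleNotFoundError and getattr raises AttributeError, so configuration mistakes surface at load time.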
The publisher can be extended by subclassing the Publisher abstract class in the pubsub.publisher module.
You will need to provide an implementation of the _publish method, which handles all the logic of publishing the messages. Ideally, it should also convert the messages before publishing.
Take a look at RedisPublisher in the pubsub.redis_publisher module for an example implementation.
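The shape of such an extension can be sketched as follows. The Publisher class here is a simplified stand-in defined only for this example; the real base class in pubsub.publisher may have a different constructor and a different _publish signature. InMemoryPublisher is likewise hypothetical.

```python
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Any, Dict, List


class Publisher(ABC):
    """Stand-in for pubsub.publisher.Publisher; the real signatures may differ."""

    def publish(self, msgs: List[Any], topic: str) -> None:
        # The base class owns the public entry point and delegates to _publish.
        self._publish(msgs, topic)

    @abstractmethod
    def _publish(self, msgs: List[Any], topic: str) -> None:
        ...


class InMemoryPublisher(Publisher):
    """Hypothetical publisher that keeps messages in a dict keyed by topic."""

    def __init__(self) -> None:
        self.sent: Dict[str, List[Any]] = defaultdict(list)

    def _publish(self, msgs: List[Any], topic: str) -> None:
        # A real implementation would convert msgs and send them to the broker here.
        self.sent[topic].extend(msgs)


publisher = InMemoryPublisher()
publisher.publish([35.7, 35.9], "your-sensor-topic")
```

A publisher for another broker, such as MQTT, would follow the same pattern, replacing the dict with calls to the broker's client library.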
To add a converter, extend the MessageConverter abstract class in the commons.message_converter module.
You will need to provide implementations of the convert (for a single message) and convert_all (for a list of messages) methods.
Take a look at AvroValidatedJsonConverter in the same module for an example implementation.
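A minimal sketch of a custom converter is shown below. The MessageConverter class is a stand-in defined for this example, with assumed signatures for convert and convert_all; the real base class in commons.message_converter may take additional arguments such as a schema. JsonConverter is hypothetical and performs no validation.

```python
import json
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class MessageConverter(ABC):
    """Stand-in for commons.message_converter.MessageConverter (signatures assumed)."""

    @abstractmethod
    def convert(self, msg: Dict[str, Any]) -> str:
        ...

    @abstractmethod
    def convert_all(self, msgs: List[Dict[str, Any]]) -> List[str]:
        ...


class JsonConverter(MessageConverter):
    """Hypothetical converter that serialises each message to a JSON string."""

    def convert(self, msg: Dict[str, Any]) -> str:
        # sort_keys keeps the output stable regardless of dict insertion order.
        return json.dumps(msg, sort_keys=True)

    def convert_all(self, msgs: List[Dict[str, Any]]) -> List[str]:
        return [self.convert(msg) for msg in msgs]


converter = JsonConverter()
serialised = converter.convert_all([{"time": 1, "value": 35.7}])
```

Implementing convert_all in terms of convert keeps the two methods consistent; a converter with a cheaper batch path could override it separately.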
For data consumers, please read the data uploader module documentation. This is currently provided in the Kotlin programming language, but a data consumer can be created in any language that has clients for subscribing to the pub/sub system.
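The general model shown in the architecture diagram (a subscriber that receives messages from the pub/sub system and hands them to one or more data consumers) can be sketched in a few lines. This is an illustration in Python, not the Kotlin data uploader: all class and method names below are assumptions, and the broker is simulated by calling on_message directly.

```python
from abc import ABC, abstractmethod
from typing import List, Set, Tuple


class DataConsumer(ABC):
    """Processes consumed data; the actual processing is done by subclasses."""

    @abstractmethod
    def consume(self, topic: str, message: str) -> None:
        ...


class Subscriber(ABC):
    """Subscribes to the pub/sub system and forwards messages to consumers."""

    def __init__(self, consumers: List[DataConsumer]) -> None:
        self.consumers = consumers

    @abstractmethod
    def subscribe(self, topic: str) -> None:
        ...

    def dispatch(self, topic: str, message: str) -> None:
        for consumer in self.consumers:
            consumer.consume(topic, message)


class CollectingConsumer(DataConsumer):
    """Hypothetical consumer that simply records what it receives."""

    def __init__(self) -> None:
        self.received: List[Tuple[str, str]] = []

    def consume(self, topic: str, message: str) -> None:
        self.received.append((topic, message))


class InMemorySubscriber(Subscriber):
    """Hypothetical subscriber; a real one would wrap a Redis or MQTT client."""

    def __init__(self, consumers: List[DataConsumer]) -> None:
        super().__init__(consumers)
        self.topics: Set[str] = set()

    def subscribe(self, topic: str) -> None:
        self.topics.add(topic)

    def on_message(self, topic: str, message: str) -> None:
        # Called by the simulated broker for every published message.
        if topic in self.topics:
            self.dispatch(topic, message)


consumer = CollectingConsumer()
subscriber = InMemorySubscriber([consumer])
subscriber.subscribe("your-sensor-topic")
subscriber.on_message("your-sensor-topic", '{"value": 35.7}')
subscriber.on_message("unrelated-topic", '{"value": 1}')
```

Keeping the subscriber and the consumer separate means the same consumer logic can be reused with a different pub/sub system by swapping only the subscriber.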
There are 3 types of schema retrievers provided which are located in the schema module -
FileAvroSchemaRetriever: This will retrieve the schemas from the local filesystem. It can be configured as follows -
schema_retriever:
  module: 'commons.schema'
  class: 'FileAvroSchemaRetriever'
  args:
    filepath: '/etc/schemas/avro/sensors'
    extension: '.avsc'
This will walk down the path specified in filepath and load all the files with the extension provided.
The example Avro schemas are located in /etc/schemas/avro/sensors.
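The directory walk described for the FileAvroSchemaRetriever can be sketched with os.walk. This is not the retriever's actual code: the load_schemas function is hypothetical, and keying the result by the schema's name field is an assumption made for the example. A temporary directory is used so the sketch runs anywhere.

```python
import json
import os
import tempfile
from typing import Any, Dict


def load_schemas(filepath: str, extension: str) -> Dict[str, Any]:
    """Walk filepath recursively and parse every file ending in extension."""
    schemas: Dict[str, Any] = {}
    for root, _dirs, files in os.walk(filepath):
        for filename in files:
            if filename.endswith(extension):
                with open(os.path.join(root, filename), encoding="utf-8") as f:
                    schema = json.load(f)  # Avro schemas are JSON documents
                schemas[schema["name"]] = schema
    return schemas


with tempfile.TemporaryDirectory() as base:
    os.makedirs(os.path.join(base, "nested"))
    example_schema = {
        "type": "record",
        "name": "temperature",
        "fields": [{"name": "value", "type": "double"}],
    }
    with open(os.path.join(base, "nested", "temperature.avsc"), "w", encoding="utf-8") as f:
        json.dump(example_schema, f)
    with open(os.path.join(base, "README.txt"), "w", encoding="utf-8") as f:
        f.write("not a schema")  # skipped: wrong extension
    loaded = load_schemas(base, ".avsc")
```

Files in nested directories are picked up as well, and files with other extensions are ignored.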
GithubAvroSchemaRetriever: This will retrieve the schemas from a GitHub repository. It can be configured as follows -
schema_retriever:
  module: 'commons.schema'
  class: 'GithubAvroSchemaRetriever'
  args:
    repo_owner: 'RADAR-base'
    repo_name: 'RADAR-Schemas'
    branch: 'sensors'
    basepath: 'commons/iot/sensor'
    extension: '.avsc'
    git_user: 'username'
    git_password: '*******'
    token: ''
Like the file retriever, this will also walk down the basepath in the repository and retrieve schemas with the extension provided.
If the repository is public, there is no requirement to specify the git_user and git_password, but it is still recommended as it increases the GitHub API rate limits. You can also provide a token (personal access token) in place of the username/password for better security.
SchemaRegistrySchemaRetriever: This will retrieve schemas from the Confluent Schema Registry. It can be configured as follows. The schema_registry_url is a required argument.
schema_retriever:
  module: 'commons.schema'
  class: 'SchemaRegistrySchemaRetriever'
  args:
    schema_registry_url: 'https://radar-cns-platform.rosalind.kcl.ac.uk/schema'