Open Benjamin-Griffiths opened 1 month ago
Hey! Thanks a lot for using pglive :).
Yes, LiveAxisRange()
uses number of points instead of time to roll. One can convert ticks to time if you have a constant stream of data. 100Hz = 100
points per second, so you would set roll_on_tick = 100
, meaning roll every 1 second. Or in your case of windows size = 30, it would be 30 * 100.
I will think about how to implement rolling on time :).
Also, if you want to sync all updates to the fastest axis, you can use the ignore_auto_range
attribute for that.
For now try this example (with ignore_auto_range
implemented). Hopefully it will help you a bit:
import random
import sys
from datetime import datetime
import pyqtgraph as pg
from pglive.sources.data_connector import DataConnector
from pglive.sources.live_axis import Axis, LiveAxis
from pglive.sources.live_axis_range import LiveAxisRange
from pglive.sources.live_plot import LiveLinePlot
from pglive.sources.live_plot_widget import LiveAxis, LivePlotWidget, ViewBox
from PyQt6.QtCore import QTimer
from PyQt6.QtWidgets import QApplication, QGraphicsGridLayout
class LiveChart(LivePlotWidget):
    """
    Live chart with one shared time axis and any number of vertical axes.
    Every vertical axis owns a ViewBox whose X axis is linked to the main plot.
    """

    def __init__(self, data_rate=100):
        """
        Builds the chart and replaces the default left axis with a layout for custom axes.
        :param data_rate: data rate in Hz of the fastest series, used to size the roll window in ticks
        """
        # visible window size in seconds
        self._window: float = 30.0
        # Calculate our tick_size as fastest data_rate (Hz) * window size (Sec)
        super().__init__(x_range_controller=LiveAxisRange(roll_on_tick=int(data_rate * self._window)))
        self.plt = self.plotItem
        self.plt.vb.sigResized.connect(self.update_views)
        # get plot grid layout
        self.plt_lay: QGraphicsGridLayout = self.plt.layout
        self.plt.hideAxis("left")
        self.plt.hideAxis("right")
        self.plt.hideAxis("top")
        # remove left axis
        self.plt_lay.removeItem(self.plt_lay.itemAt(2, 0))
        # add grid layout in place of original left axis to add all vertical axes to
        self.ax_lay = pg.GraphicsLayout()
        self.plt_lay.addItem(self.ax_lay, 2, 0)
        # time axis
        self.time_axis = LiveAxis("bottom", text="Datetime", **{Axis.TICK_FORMAT: Axis.TIME})
        self.plt.setAxisItems({"bottom": self.time_axis})
        # registries keyed by axis name / series name
        self.axes: dict[str, LiveAxis] = {}
        self.view_boxes: dict[str, ViewBox] = {}
        self.series: dict[str, LiveLinePlot] = {}
        self.connectors: dict[str, DataConnector] = {}
        self._enabled: bool = True

    def add_axis(self, name: str) -> None:
        """
        Adds axis to the chart. Does nothing if an axis with this name already exists.
        :param name: name of the axis, also used as its label
        """
        # axis with name already exists
        if name in self.axes:
            return
        vb = ViewBox()
        self.view_boxes[name] = vb
        ax = LiveAxis("left")
        self.axes[name] = ax
        self.ax_lay.addItem(ax)
        self.plt.scene().addItem(vb)
        ax.linkToView(vb)
        # all auxiliary view boxes follow the X range of the main plot
        vb.setXLink(self.plt)
        ax.setZValue(-10000)
        ax.setLabel(name)

    def add_series(self, name: str, axis_name: str, color: str, data_rate: int, ignore_auto_range=True) -> None:
        """
        Adds line series to the chart.
        :param name: name of the series
        :param axis_name: name of the vertical axis, must have been added with add_axis
        :param color: pen used to draw the line
        :param data_rate: data_rate in Hz
        :param ignore_auto_range: flag indicating major or minor axis, major should be only one axis among all!
        :raises KeyError: if no axis named axis_name exists
        """
        # Resolve the target view box first, so an unknown axis fails before anything is registered
        vb = self.view_boxes[axis_name]
        series = LiveLinePlot(pen=color)
        self.series[name] = series
        self.addItem(series)
        # We need to calculate max_points to match our data window
        # Windows size is in seconds, rate in Hz, so number of points is window_size * data_rate
        # The window is a float, so cast the product: max_points is a number of points
        max_points = int(self._window * data_rate)
        connector = DataConnector(series, max_points=max_points, ignore_auto_range=ignore_auto_range)
        self.connectors[name] = connector
        vb.addItem(series)

    def add_value(self, name: str, value: float, timestamp: datetime) -> None:
        """
        Adds a new value to the chart. Values for unknown series are silently dropped.
        :param name: name of the series to add the value to
        :param value: value to add
        :param timestamp: datetime object, converted to a POSIX timestamp for the X axis
        """
        if name not in self.connectors:
            return
        connector = self.connectors[name]
        connector.cb_append_data_point(value, timestamp.timestamp())

    def update_views(self):
        """
        Keeps every auxiliary view box aligned with the main view box.
        Connected to the main view box's sigResized signal.
        """
        for vb in self.view_boxes.values():
            # view has resized; update auxiliary views to match
            vb.setGeometry(self.plt.vb.sceneBoundingRect())
            # need to re-update linked axes since this was called
            # incorrectly while views had different shapes.
            # (probably this should be handled in ViewBox.resizeEvent)
            vb.linkedViewChanged(self.plt.vb, vb.XAxis)
if __name__ == "__main__":
    app = QApplication(sys.argv)
    chart = LiveChart(data_rate=100)
    chart.show()

    def feed(series_name, low, high):
        """Appends one random integer sample from [low, high], stamped with the current local time."""
        chart.add_value(series_name, random.randint(low, high), datetime.now().astimezone())

    # vertical axes
    for axis_title in ("Speed (RPM)", "Torque (Nm)"):
        chart.add_axis(axis_title)

    # (series name, axis name, color, data rate in Hz, ignore_auto_range)
    # The first entry is the Major axis (ignore_auto_range=False);
    # the rest is updated only when the Major axis is updated.
    series_table = (
        ("Speed Sensor 1", "Speed (RPM)", "red", 100, False),
        ("Speed Sensor 2", "Speed (RPM)", "green", 100, True),
        ("Torque Sensor 1", "Torque (Nm)", "blue", 10, True),
        ("Torque Sensor 2", "Torque (Nm)", "yellow", 10, True),
    )
    for series_name, axis_title, pen_color, rate_hz, is_minor in series_table:
        chart.add_series(series_name, axis_title, pen_color, data_rate=rate_hz, ignore_auto_range=is_minor)

    # 10 ms period -> speed samples arrive at 100Hz
    timer_1 = QTimer()
    timer_1.setInterval(10)
    timer_1.timeout.connect(lambda: feed("Speed Sensor 1", 200, 250))
    timer_1.timeout.connect(lambda: feed("Speed Sensor 2", 250, 300))

    # 100 ms period -> torque samples arrive at 10Hz
    timer_2 = QTimer()
    timer_2.setInterval(100)
    timer_2.timeout.connect(lambda: feed("Torque Sensor 1", 1, 5))
    timer_2.timeout.connect(lambda: feed("Torque Sensor 2", 2, 4))

    for sample_timer in (timer_1, timer_2):
        sample_timer.start()

    sys.exit(app.exec())
Hey - thank you for the fast response and the modified example! I appreciate it!
Using your advice on the ignore_auto_range
argument, I have modified the example to include a "dummy" plot which is invisible and has a QTimer that adds data at 30Hz. The data connector for this dummy plot is ignore_auto_range=False
, and all added series are ignore_auto_range=True
.
This allows the time axis range to be independent of the sensor data sample rate.
I have then reimplemented the LivePlotWidget
get_x_range
method to return a range based on the preferred time window, rather than ticks.
This implementation works well for me so far - but I'm interested to see how you would implement this type of feature if you do in the future.
Here is an example with some widgets to resume/pause, set the time window, toggle "roll" and autorange:
import random
import sys
from datetime import datetime
from typing import Union
import pyqtgraph as pg
from pglive.sources.data_connector import DataConnector
from pglive.sources.live_axis import Axis, LiveAxis
from pglive.sources.live_plot import LiveLinePlot
from pglive.sources.live_plot_widget import LiveAxis, LivePlotWidget, ViewBox
from PyQt6.QtCore import Qt, QTimer
from PyQt6.QtGui import QPen
from PyQt6.QtWidgets import (
QApplication,
QCheckBox,
QDoubleSpinBox,
QGraphicsGridLayout,
QGridLayout,
QPushButton,
QWidget,
)
class LiveChart(LivePlotWidget):
    """
    Live chart whose X range is driven by wall-clock time instead of ticks.
    An invisible dummy series, fed by a QTimer, is the only connector created
    with ignore_auto_range=False; all series added through add_series are
    created with ignore_auto_range=True.
    """

    def __init__(self, window: float = 10.0, fps: int = 30, roll_enabled: bool = False):
        """
        Builds the chart and starts the timer that drives the time axis.
        :param window: time window shown on the chart in seconds
        :param fps: rate in Hz of the time axis timer, also used as plot_rate of added series
        :param roll_enabled: True to show the trailing window ending at the current time,
            False to keep the window fixed until the current time passes its right edge
        :raises ValueError: if fps is not positive
        """
        if fps <= 0:
            raise ValueError("fps must be positive")
        super().__init__()
        # State read by _time_based_x_range; set before the function is installed
        self._window: float = float(window)
        self._fps: int = fps
        self._roll_enabled: bool = roll_enabled
        # use reimplemented function for setting x range based on time and not ticks
        self.x_range_controller.get_x_range = self._time_based_x_range
        self.plt = self.plotItem
        self.plt.vb.sigResized.connect(self.update_views)
        self.plt_lay: QGraphicsGridLayout = self.plt.layout
        self.plt.hideAxis("left")
        self.plt.hideAxis("right")
        self.plt.hideAxis("top")
        self.plt_lay.removeItem(self.plt_lay.itemAt(2, 0))  # remove left axis
        # add layout in place of original left axis for all vertical axes
        self.ax_lay = pg.GraphicsLayout()
        self.plt_lay.addItem(self.ax_lay, 2, 0)
        # time axis
        self.time_axis = LiveAxis(
            "bottom", text="Datetime", **{Axis.TICK_FORMAT: Axis.TIME}
        )
        self.plt.setAxisItems({"bottom": self.time_axis})
        # registries keyed by axis name / series name
        self.axes: dict[str, LiveAxis] = {}
        self.view_boxes: dict[str, ViewBox] = {}
        self.series: dict[str, LiveLinePlot] = {}
        self.connectors: dict[str, DataConnector] = {}
        # series name -> axis name, needed to detach a series from its view box
        self._series_axes: dict[str, str] = {}
        # dummy series and axis for auto scaling time axis range
        pen = QPen()
        pen.setStyle(Qt.PenStyle.NoPen)
        self._auto_range_series = LiveLinePlot(pen=pen)
        self.addItem(self._auto_range_series)
        self._auto_range_connector = DataConnector(
            self._auto_range_series,
            max_points=2,
            ignore_auto_range=False,
        )
        self._auto_range_timer = QTimer()
        self._auto_range_timer.setTimerType(Qt.TimerType.PreciseTimer)
        # At least 1 ms: an interval of 0 would fire on every event loop pass
        self._auto_range_timer.setInterval(max(1, int(1000 / self._fps)))
        self._auto_range_timer.timeout.connect(
            lambda: self._auto_range_connector.cb_append_data_point(
                0, datetime.now().astimezone().timestamp()
            )
        )
        self.set_window(self._window)
        self.set_roll_enabled(self._roll_enabled)
        self._auto_range_timer.start()

    def _time_based_x_range(self, data_connector, tick: int):
        """
        Replacement for the x range controller's get_x_range, based on time and not ticks.
        :param data_connector: unused, kept for signature compatibility
        :param tick: unused, kept for signature compatibility
        :return: [start, end] of the X range as POSIX timestamps
        """
        now = datetime.now().astimezone().timestamp()
        if self._roll_enabled:
            # trailing window that ends at the current time
            return [now - self._window, now]
        # NOTE(review): final_x_range is not defined in this class,
        # presumably it is provided by LivePlotWidget - confirm
        x_range_start = self.final_x_range[0]
        if now > x_range_start + self._window:
            # current time left the window, start a new one at the current time
            return [now, now + self._window]
        return [x_range_start, x_range_start + self._window]

    def add_axis(self, name: str) -> None:
        """
        Adds axis to the chart. Does nothing if an axis with this name already exists.
        :param name: name of the axis, also used as its label
        """
        # axis with name already exists
        if name in self.axes:
            return
        vb = ViewBox()
        self.view_boxes[name] = vb
        ax = LiveAxis("left")
        self.axes[name] = ax
        self.ax_lay.addItem(ax)
        self.plt.scene().addItem(vb)
        ax.linkToView(vb)
        # all auxiliary view boxes follow the X range of the main plot
        vb.setXLink(self.plt)
        ax.setZValue(-10000)
        ax.setLabel(name)

    def add_series(self, name: str, axis_name: str, color: Union[str, QPen]) -> None:
        """
        Adds line series to the chart, replacing an existing series with the same name.
        :param name: name of the series
        :param axis_name: name of the vertical axis, must have been added with add_axis
        :param color: pen used to draw the line
        :raises KeyError: if no axis named axis_name exists
        """
        # Resolve the target view box first, so an unknown axis fails before anything changes
        vb = self.view_boxes[axis_name]
        # remove existing series with same name
        if name in self.series:
            self.remove_series(name)
        series = LiveLinePlot(pen=color)
        self.series[name] = series
        self.addItem(series)
        connector = DataConnector(
            series, max_points=6000, plot_rate=self._fps, ignore_auto_range=True
        )
        self.connectors[name] = connector
        self._series_axes[name] = axis_name
        vb.addItem(series)

    def remove_series(self, name: str) -> None:
        """
        Removes line series from the chart. Does nothing if the series does not exist.
        :param name: name of the series
        """
        series = self.series.pop(name, None)
        if series is None:
            return
        connector = self.connectors.pop(name, None)
        if connector is not None:
            # the connector is dropped, make sure it stops updating the removed plot
            connector.pause()
        vb = self.view_boxes.get(self._series_axes.pop(name, None))
        if vb is not None:
            vb.removeItem(series)
        self.removeItem(series)

    def set_window(self, window: Union[int, float]) -> None:
        """
        Sets the scrolling time window shown on the chart in seconds.
        :param window: time window in seconds
        """
        self._window = float(window)

    def set_roll_enabled(self, enabled: bool) -> None:
        """
        Switches between the trailing window and the fixed window that jumps when it is full.
        :param enabled: True for the trailing window ending at the current time
        """
        self._roll_enabled = enabled

    def autoscale(self) -> None:
        """
        Delegates to auto_btn_clicked(), which is not defined in this class
        (presumably inherited from LivePlotWidget).
        """
        self.auto_btn_clicked()

    def add_value(self, name: str, value: float, timestamp: datetime) -> None:
        """
        Adds a new value to the chart. Values for unknown series are silently dropped.
        :param name: name of the series to add the value to
        :param value: value to add
        :param timestamp: datetime object, converted to a POSIX timestamp for the X axis
        """
        if name not in self.connectors:
            return
        connector = self.connectors[name]
        connector.cb_append_data_point(value, timestamp.timestamp())

    def pause(self) -> None:
        """
        Pauses the time axis connector and the connectors of all series.
        """
        self._auto_range_connector.pause()
        for c in self.connectors.values():
            c.pause()

    def resume(self) -> None:
        """
        Resumes the time axis connector and the connectors of all series.
        """
        self._auto_range_connector.resume()
        for c in self.connectors.values():
            c.resume()

    def update_views(self):
        """
        Keeps every auxiliary view box aligned with the main view box.
        Connected to the main view box's sigResized signal.
        """
        for vb in self.view_boxes.values():
            # view has resized; update auxiliary views to match
            vb.setGeometry(self.plt.vb.sceneBoundingRect())
            # need to re-update linked axes since this was called
            # incorrectly while views had different shapes.
            # (probably this should be handled in ViewBox.resizeEvent)
            vb.linkedViewChanged(self.plt.vb, vb.XAxis)
if __name__ == "__main__":
    app = QApplication(sys.argv)
    chart = LiveChart()
    widget = QWidget()
    layout = QGridLayout(widget)

    def append_control(control):
        """Places the control in the first row, in the next free column of the grid."""
        layout.addWidget(control, 0, layout.columnCount())

    def on_roll_state_changed(state):
        """Translates the check box state into the chart's roll flag."""
        chart.set_roll_enabled(state == Qt.CheckState.Checked.value)

    def feed(series_name, low, high):
        """Appends one random integer sample from [low, high], stamped with the current local time."""
        chart.add_value(series_name, random.randint(low, high), datetime.now().astimezone())

    # resume/pause
    resume_btn = QPushButton("Resume")
    resume_btn.clicked.connect(chart.resume)
    layout.addWidget(resume_btn, 0, 0)
    pause_btn = QPushButton("Pause")
    pause_btn.clicked.connect(chart.pause)
    append_control(pause_btn)

    # time window
    window_spinbox = QDoubleSpinBox()
    window_spinbox.setDecimals(1)
    window_spinbox.setRange(0.0, 60.0)
    window_spinbox.setValue(chart._window)
    window_spinbox.valueChanged.connect(chart.set_window)
    append_control(window_spinbox)

    # roll enabled
    roll_toggle = QCheckBox("Roll")
    roll_toggle.stateChanged.connect(on_roll_state_changed)
    append_control(roll_toggle)

    # autoscale
    autoscale_btn = QPushButton("Autoscale")
    autoscale_btn.clicked.connect(chart.autoscale)
    append_control(autoscale_btn)

    # the chart fills the second row across all toolbar columns
    layout.addWidget(chart, 1, 0, 1, layout.columnCount())

    # vertical axes
    for axis_title in ("Speed (RPM)", "Torque (Nm)"):
        chart.add_axis(axis_title)

    # (series name, axis name, color)
    series_table = (
        ("Speed Sensor 1", "Speed (RPM)", "red"),
        ("Speed Sensor 2", "Speed (RPM)", "green"),
        ("Torque Sensor 1", "Torque (Nm)", "blue"),
        ("Torque Sensor 2", "Torque (Nm)", "yellow"),
    )
    for series_name, axis_title, pen_color in series_table:
        chart.add_series(series_name, axis_title, pen_color)

    # 10 ms period -> speed samples arrive at 100Hz
    timer_1 = QTimer()
    timer_1.setInterval(10)
    timer_1.timeout.connect(lambda: feed("Speed Sensor 1", 200, 250))
    timer_1.timeout.connect(lambda: feed("Speed Sensor 2", 250, 300))

    # 100 ms period -> torque samples arrive at 10Hz
    timer_2 = QTimer()
    timer_2.setInterval(100)
    timer_2.timeout.connect(lambda: feed("Torque Sensor 1", 1, 5))
    timer_2.timeout.connect(lambda: feed("Torque Sensor 2", 2, 4))

    for sample_timer in (timer_1, timer_2):
        sample_timer.start()

    widget.show()
    sys.exit(app.exec())
I think it's a good solution for what you need. I thought you could use the timebase for X from the collected data: say each tick is equally distant from the next, then you can calculate the dT and use it for your timing. But your solution bypasses all that hassle and uses a timer as the source of the time base. I will think of a similar solution as an extension for LiveAxisRange. My original idea was to do this based on the real data, but I see that having some external timer might be beneficial :).
Hi there,
I have an application that plots sensor measurements in real-time, but the sensors do not all have the same sampling rate. For example, one sensor might send data at 100Hz, and another at 10Hz.
I would like to use pglive to plot this sensor data in real time within a single plot that shows a moving time "window", i.e. the x-axis range is continuously updated to show the last 10 seconds or 30 seconds of data from all sensors.
My issue is that pglive's
DataConnector
and LiveAxisRange
functionality operate on the number of data points, not time. Is there a way I can set the x-axis to automatically set the range to show the last x seconds of data?
Below is an example that plots "speed" data at 100Hz, and "torque" data at 10Hz. All
DataConnector
objects have been set to max_points=100
, which means that the chart waits for the slowest data stream (the torque data) to fill up the buffer before the chart starts to roll. Any support would be greatly appreciated.