Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to make the X-axis time dynamically refresh by using pyqtgraph TimeAxisItem

I want to draw a real-time curve from a stream of data. First, I set up a data dictionary that holds 3 groups of data. The current program can draw the curves dynamically, and the X-axis shows the time, which is also updated in real time. However, every tick on the X-axis always shows the same time value.

# Unix epoch as a naive datetime (no tzinfo attached).
UNIX_EPOCH_naive = datetime.datetime(1970, 1, 1)
# The same instant as an aware datetime pinned to UTC.
UNIX_EPOCH_offset_aware = datetime.datetime(1970, 1, 1, tzinfo=pytz.utc)
# Epoch used as the default by the conversion helpers below (naive variant).
UNIX_EPOCH = UNIX_EPOCH_naive

# Ticks per second for integer timestamps: microsecond resolution.
TS_MULT_us = 1e6

def now_timestamp(ts_mult=TS_MULT_us, epoch=UNIX_EPOCH):
    """Return the current UTC time as an integer tick count since *epoch*.

    ts_mult is the number of ticks per second. epoch is subtracted from the
    naive value returned by ``datetime.datetime.utcnow()``, so it has to be
    a naive datetime as well.
    """
    elapsed = datetime.datetime.utcnow() - epoch
    ticks = elapsed.total_seconds() * ts_mult
    return int(ticks)

def int2dt(ts, ts_mult=TS_MULT_us):
    """Convert an integer timestamp into an aware Asia/Shanghai datetime.

    ts is a tick count since the Unix epoch and ts_mult the number of ticks
    per second, i.e. the inverse of the scaling used by dt2int and
    now_timestamp.

    The result must be derived from ts rather than from the wall clock:
    TimeAxisItem.tickStrings calls this once per tick value, so reading the
    current time here would give every tick on the axis the same label.
    """
    tz = pytz.timezone('Asia/Shanghai')
    # Handing tz to fromtimestamp converts the POSIX timestamp straight into
    # that zone, independent of the local timezone of the machine.
    return datetime.datetime.fromtimestamp(float(ts) / ts_mult, tz)

def dt2int(dt, ts_mult=TS_MULT_us, epoch=UNIX_EPOCH):
    """Return *dt* as an integer number of ticks elapsed since *epoch*.

    ts_mult is the number of ticks per second; the fractional part of the
    scaled value is dropped by int().
    """
    seconds_since_epoch = (dt - epoch).total_seconds()
    return int(seconds_since_epoch * ts_mult)

def td2int(td, ts_mult=TS_MULT_us):
    """Return the timedelta *td* as an integer tick count.

    ts_mult is the number of ticks per second; the result is truncated by
    int().
    """
    ticks = td.total_seconds() * ts_mult
    return int(ticks)

def int2td(ts, ts_mult=TS_MULT_us):
    """Return a ``datetime.timedelta`` for the tick count *ts*.

    ts is divided by ts_mult (ticks per second) to obtain seconds.
    """
    seconds = float(ts) / ts_mult
    return datetime.timedelta(seconds=seconds)

class TimeAxisItem(pg.AxisItem):
    """Axis item whose tick labels are clock times instead of raw numbers."""

    def __init__(self, *args, **kwargs):
        # No extra state: every argument is forwarded to pg.AxisItem.
        super(TimeAxisItem, self).__init__(*args, **kwargs)

    def tickStrings(self, values, scale, spacing):
        """Return one ``HH:MM:SS`` label per entry of *values*.

        Each value is converted with int2dt; scale and spacing are not used.
        """
        labels = []
        for tick in values:
            labels.append(int2dt(tick).strftime("%H:%M:%S"))
        return labels

# Per-series storage keyed by series name, filled in by getDate.
data_dict = {}
data_x = []

# Plot whose bottom axis is rendered by the custom TimeAxisItem.
p = win.addPlot(
    title="Data-Time Graph",
    axisItems={'bottom': TimeAxisItem(orientation='bottom')},
)
p.addLegend()


def getDate():
    # Run as the target of a background thread by the __main__ block.
    # The part that obtains `name` and `val` is elided in this snippet.
    ......
    .....
    # Create a named curve; its pen is picked from `color` by the number of
    # series already registered in data_dict.
    curve = p.plot(pen = color[len(data_dict)],name=name)
    data_dict[name] = [curve] # dictionary: {key: [curve, [data1, data2, ...]]}
    # Second slot holds the list of samples, seeded with the first value.
    data_dict[name].append([val])

def addToDisplay():
    """Timer callback: push the most recent samples of every series to its curve.

    Reads the module-level data_dict ({name: [curve, samples]}) and
    data_frequency (the maximum number of samples shown per curve).
    """
    # Deliberately no p.plot() here: each call creates and adds a new plot
    # item, which would pile up one empty item per timer tick. The curves
    # are created once in getDate and only need setData().
    #
    # list(...) snapshots the entries because getDate registers new series
    # from another thread while this callback is iterating.
    for curve, samples in list(data_dict.values()):
        # A negative-start slice already yields the whole list when it holds
        # fewer than data_frequency samples, and it always returns a copy.
        curve.setData(samples[-data_frequency:])

if __name__ == "__main__":
    # Run getDate on a separate thread.
    th= threading.Thread(target=getDate)
    th.start()
    # Call addToDisplay from a Qt timer every 10 ms.
    # NOTE(review): no Qt event loop is started in the visible snippet;
    # the timer only fires once one is running - confirm against full script.
    timer = pg.QtCore.QTimer()
    timer.timeout.connect(addToDisplay) 
    timer.start(10)

What I hope is that the X-axis is dynamically refreshed, with the latest time on the right side and the past times on the left side.

like image 637
autogyro Avatar asked Apr 28 '19 00:04

autogyro


1 Answer

enter image description here

I'm not entirely sure what you're trying to achieve, since your code doesn't run, but it seems you're trying to create a timestamp plot. Here's a widget that uses TimeAxisItem to keep track of elapsed time on the X-axis.

PyQt5

from PyQt5 import QtCore, QtGui, QtWidgets
from threading import Thread
from collections import deque
import pyqtgraph as pg
import numpy as np
import random
import sys
import time

"""Scrolling Timestamp Plot Widget Example"""

class TimeAxisItem(pg.AxisItem):
    """Axis item that labels ticks with elapsed time formatted as ``mm:ss``."""

    def __init__(self, *args, **kwargs):
        super(TimeAxisItem, self).__init__(*args, **kwargs)

    def tickStrings(self, values, scale, spacing):
        """Return an ``mm:ss`` label for each tick value.

        values are treated as millisecond offsets (the plot's x data comes
        from an elapsed-milliseconds timer); scale and spacing are not used.
        """
        # Offsets are added to a fixed, valid 00:00:00 base. Adding them to
        # currentTime() would add the elapsed time on top of the wall clock,
        # so the label of a given tick would keep drifting between repaints.
        midnight = QtCore.QTime(0, 0)
        # int(): addMSecs takes an integer and tick values are generally
        # floats; recent Python versions no longer coerce float to int.
        return [midnight.addMSecs(int(value)).toString('mm:ss') for value in values]

class ScrollingTimestampPlot(QtWidgets.QWidget):
    """Scrolling plot widget with timestamp on x-axis and dynamic y-axis.

    A daemon thread keeps overwriting ``current_position_value`` with a random
    integer; a QTimer samples that value into a bounded deque and redraws the
    curve on every tick.

    Widget classes are taken from QtWidgets, which is where PyQt5 defines
    them (QtGui only exposes them when a library has patched them in).
    """

    def __init__(self, parent=None):
        super(ScrollingTimestampPlot, self).__init__(parent)

        # Internal timestamp for x-axis: elapsed() reports ms since start()
        self.timestamp = QtCore.QTime()
        self.timestamp.start()

        # Sampling period in seconds (sample rate in Hz = 1 / self.FREQUENCY)
        # USE FOR TIME.SLEEP (s)
        self.FREQUENCY = 0.025

        # Screen refresh period to update plot (ms)
        # USE FOR QTIMER (ms)
        self.SCROLLING_TIMESTAMP_PLOT_REFRESH_RATE = self.FREQUENCY * 1000

        self.DATA_POINTS_TO_DISPLAY = 200

        # Automatically pops from left if length is full
        self.data = deque(maxlen=self.DATA_POINTS_TO_DISPLAY)

        # Create Plot Widget
        self.scrolling_timestamp_plot_widget = pg.PlotWidget(axisItems={'bottom': TimeAxisItem(orientation='bottom')})

        # Enable/disable plot squeeze (Fixed axis movement)
        self.scrolling_timestamp_plot_widget.plotItem.setMouseEnabled(x=False, y=False)
        self.scrolling_timestamp_plot_widget.setTitle('Scrolling Timestamp Plot Example')
        self.scrolling_timestamp_plot_widget.setLabel('left', 'Value')
        self.scrolling_timestamp_plot_widget.setLabel('bottom', 'Time (s)')

        self.scrolling_timestamp_plot = self.scrolling_timestamp_plot_widget.plot()
        self.scrolling_timestamp_plot.setPen(246,212,255)

        # NOTE: this attribute hides QWidget.layout() on the instance; the
        # name is kept because get_scrolling_timestamp_plot_layout() and any
        # external code read it.
        self.layout = QtWidgets.QGridLayout()
        self.layout.addWidget(self.scrolling_timestamp_plot_widget)

        self.read_position_thread()
        self.start()

    def start(self):
        """Start the timer that periodically calls plot_updater."""

        self.position_update_timer = QtCore.QTimer()
        self.position_update_timer.timeout.connect(self.plot_updater)
        # QTimer.start() takes integer milliseconds, while the stored refresh
        # rate is a float (0.025 * 1000), so convert explicitly.
        self.position_update_timer.start(int(self.get_scrolling_timestamp_plot_refresh_rate()))

    def read_position_thread(self):
        """Read in data using a daemon thread"""

        self.current_position_value = 0
        self.position_update_thread = Thread(target=self.read_position, args=())
        self.position_update_thread.daemon = True
        self.position_update_thread.start()

    def read_position(self):
        """Thread body: store a new random value every FREQUENCY seconds."""
        frequency = self.get_scrolling_timestamp_plot_frequency()
        while True:
            self.current_position_value = random.randint(1,101)
            time.sleep(frequency)

    def plot_updater(self):
        """Timer slot: sample the latest value and redraw the curve."""
        self.data_point = float(self.current_position_value)

        # x is elapsed milliseconds; the deque drops the oldest point itself
        self.data.append({'x': self.timestamp.elapsed(), 'y': self.data_point})
        self.scrolling_timestamp_plot.setData(x=[item['x'] for item in self.data], y=[item['y'] for item in self.data])

    def clear_scrolling_timestamp_plot(self):
        """Discard all buffered data points."""
        self.data.clear()

    def get_scrolling_timestamp_plot_frequency(self):
        """Return the sampling period in seconds."""
        return self.FREQUENCY

    def get_scrolling_timestamp_plot_refresh_rate(self):
        """Return the plot refresh period in milliseconds (as stored, a float)."""
        return self.SCROLLING_TIMESTAMP_PLOT_REFRESH_RATE

    def get_scrolling_timestamp_plot_layout(self):
        """Return the grid layout that contains the plot widget."""
        return self.layout

    def get_current_position_value(self):
        """Return the value most recently written by the reader thread."""
        return self.current_position_value

    def get_scrolling_timestamp_plot_widget(self):
        """Return the underlying pg.PlotWidget."""
        return self.scrolling_timestamp_plot_widget

# Start Qt event loop unless running in interactive mode or using pyside
if __name__ == '__main__':
    # Widget classes are taken from QtWidgets, which is where PyQt5 defines
    # them; QtGui only exposes them when a library has patched them in.

    # Create main application window
    app = QtWidgets.QApplication([])
    # NOTE(review): "Cleanlooks" may not be available in Qt5 builds, in which
    # case the factory returns no style - confirm or pick another key.
    app.setStyle(QtWidgets.QStyleFactory.create("Cleanlooks"))
    mw = QtWidgets.QMainWindow()
    mw.setWindowTitle('Scrolling Plot Example')

    # Create scrolling plot
    scrolling_timestamp_plot_widget = ScrollingTimestampPlot()

    # Create and set widget layout
    # Main widget container
    cw = QtWidgets.QWidget()
    ml = QtWidgets.QGridLayout()
    cw.setLayout(ml)
    mw.setCentralWidget(cw)

    # Can use either to add plot to main layout
    #ml.addWidget(scrolling_timestamp_plot_widget.get_scrolling_timestamp_plot_widget(),0,0)
    ml.addLayout(scrolling_timestamp_plot_widget.get_scrolling_timestamp_plot_layout(),0,0)
    mw.show()

    # Run the event loop unless Python was started in interactive mode
    if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
        QtWidgets.QApplication.instance().exec_()

PyQt4

from PyQt4 import QtCore, QtGui
from threading import Thread
from collections import deque
import pyqtgraph as pg
import numpy as np
import random
import sys
import time

"""Scrolling Timestamp Plot Widget Example"""

class TimeAxisItem(pg.AxisItem):
    """Axis item that renders tick values as ``mm:ss`` strings."""

    def __init__(self, *args, **kwargs):
        # Nothing to add: all arguments go straight to pg.AxisItem.
        super(TimeAxisItem, self).__init__(*args, **kwargs)

    def tickStrings(self, values, scale, spacing):
        """Return one ``mm:ss`` label per tick value.

        Each value is added, as milliseconds, to a default-constructed QTime;
        scale and spacing are not used.
        """
        labels = []
        for tick in values:
            shifted = QtCore.QTime().addMSecs(tick)
            labels.append(shifted.toString('mm:ss'))
        return labels

class ScrollingTimestampPlot(QtGui.QWidget):
    """Scrolling plot widget with timestamp on x-axis and dynamic y-axis.

    A daemon thread keeps overwriting ``current_position_value`` with a random
    integer; a QTimer samples that value into a bounded deque and redraws the
    curve on every tick.
    """

    def __init__(self, parent=None):
        super(ScrollingTimestampPlot, self).__init__(parent)

        # Internal timestamp for x-axis: elapsed() reports ms since start()
        self.timestamp = QtCore.QTime()
        self.timestamp.start()

        # Sampling period in seconds (sample rate in Hz = 1 / self.FREQUENCY)
        # USE FOR TIME.SLEEP (s)
        self.FREQUENCY = 0.025

        # Screen refresh period to update plot (ms)
        # USE FOR QTIMER (ms)
        self.SCROLLING_TIMESTAMP_PLOT_REFRESH_RATE = self.FREQUENCY * 1000

        self.DATA_POINTS_TO_DISPLAY = 200

        # Automatically pops from left if length is full
        self.data = deque(maxlen=self.DATA_POINTS_TO_DISPLAY)

        # Create Plot Widget
        self.scrolling_timestamp_plot_widget = pg.PlotWidget(axisItems={'bottom': TimeAxisItem(orientation='bottom')})

        # Enable/disable plot squeeze (Fixed axis movement)
        self.scrolling_timestamp_plot_widget.plotItem.setMouseEnabled(x=False, y=False)
        self.scrolling_timestamp_plot_widget.setTitle('Scrolling Timestamp Plot Example')
        self.scrolling_timestamp_plot_widget.setLabel('left', 'Value')
        self.scrolling_timestamp_plot_widget.setLabel('bottom', 'Time (s)')

        self.scrolling_timestamp_plot = self.scrolling_timestamp_plot_widget.plot()
        self.scrolling_timestamp_plot.setPen(246,212,255)

        # NOTE: this attribute hides QWidget.layout() on the instance; the
        # name is kept because get_scrolling_timestamp_plot_layout() and any
        # external code read it.
        self.layout = QtGui.QGridLayout()
        self.layout.addWidget(self.scrolling_timestamp_plot_widget)

        self.read_position_thread()
        self.start()

    def start(self):
        """Start the timer that periodically calls plot_updater."""

        self.position_update_timer = QtCore.QTimer()
        self.position_update_timer.timeout.connect(self.plot_updater)
        # QTimer.start() takes integer milliseconds, while the stored refresh
        # rate is a float (0.025 * 1000), so convert explicitly instead of
        # relying on implicit float-to-int coercion.
        self.position_update_timer.start(int(self.get_scrolling_timestamp_plot_refresh_rate()))

    def read_position_thread(self):
        """Read in data using a daemon thread"""

        self.current_position_value = 0
        self.position_update_thread = Thread(target=self.read_position, args=())
        self.position_update_thread.daemon = True
        self.position_update_thread.start()

    def read_position(self):
        """Thread body: store a new random value every FREQUENCY seconds."""
        frequency = self.get_scrolling_timestamp_plot_frequency()
        while True:
            self.current_position_value = random.randint(1,101)
            time.sleep(frequency)

    def plot_updater(self):
        """Timer slot: sample the latest value and redraw the curve."""
        self.data_point = float(self.current_position_value)

        # x is elapsed milliseconds; the deque drops the oldest point itself
        self.data.append({'x': self.timestamp.elapsed(), 'y': self.data_point})
        self.scrolling_timestamp_plot.setData(x=[item['x'] for item in self.data], y=[item['y'] for item in self.data])

    def clear_scrolling_timestamp_plot(self):
        """Discard all buffered data points."""
        self.data.clear()

    def get_scrolling_timestamp_plot_frequency(self):
        """Return the sampling period in seconds."""
        return self.FREQUENCY

    def get_scrolling_timestamp_plot_refresh_rate(self):
        """Return the plot refresh period in milliseconds (as stored, a float)."""
        return self.SCROLLING_TIMESTAMP_PLOT_REFRESH_RATE

    def get_scrolling_timestamp_plot_layout(self):
        """Return the grid layout that contains the plot widget."""
        return self.layout

    def get_current_position_value(self):
        """Return the value most recently written by the reader thread."""
        return self.current_position_value

    def get_scrolling_timestamp_plot_widget(self):
        """Return the underlying pg.PlotWidget."""
        return self.scrolling_timestamp_plot_widget

# Start Qt event loop unless running in interactive mode or using pyside
if __name__ == '__main__':
    # Create the application object first, then the main window
    app = QtGui.QApplication([])
    app.setStyle(QtGui.QStyleFactory.create("Cleanlooks"))
    mw = QtGui.QMainWindow()
    mw.setWindowTitle('Scrolling Plot Example')

    # Create scrolling plot (its constructor starts the reader thread and
    # the refresh timer)
    scrolling_timestamp_plot_widget = ScrollingTimestampPlot()

    # Create and set widget layout
    # Main widget container: a plain widget with a grid layout becomes the
    # central widget of the main window
    cw = QtGui.QWidget()
    ml = QtGui.QGridLayout()
    cw.setLayout(ml)
    mw.setCentralWidget(cw)

    # Can use either to add plot to main layout
    #ml.addWidget(scrolling_timestamp_plot_widget.get_scrolling_timestamp_plot_widget(),0,0)
    ml.addLayout(scrolling_timestamp_plot_widget.get_scrolling_timestamp_plot_layout(),0,0)
    mw.show()

    # Run the event loop unless Python was started in interactive mode;
    # the hasattr check makes the loop always run when QtCore has no
    # PYQT_VERSION attribute
    if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
        QtGui.QApplication.instance().exec_()
like image 157
nathancy Avatar answered Nov 14 '22 16:11

nathancy