405 Stimmen

Spitzensignalerfassung in Echtzeit-Zeitreihendaten


Aktualisierung: Der bisher beste Algorithmus bisher ist dieser.


Diese Frage erkundet robuste Algorithmen zur Erkennung plötzlicher Peaks in echtzeitfähigen Zeitreihendaten.

Betrachten Sie das folgende Beispiel für Daten:

Plot der Daten

Beispiel dieser Daten ist im Matlab-Format (aber es handelt sich bei dieser Frage nicht um die Sprache, sondern um den Algorithmus):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

Es ist deutlich zu erkennen, dass es drei große Peaks und einige kleine Peaks gibt. Diese Datensatz ist ein spezifisches Beispiel aus der Klasse von Zeitreihendatensätzen, um die es in der Frage geht. Diese Datenklasse hat zwei allgemeine Eigenschaften:

  1. Es gibt grundlegendes Rauschen mit einem allgemeinen Mittelwert
  2. Es gibt große 'Peaks' oder 'höhere Datenpunkte', die signifikant vom Rauschen abweichen.

Angenommen wird auch folgendes:

  • Die Breite der Peaks kann nicht im Voraus bestimmt werden
  • Die Höhe der Peaks weicht signifikant von den anderen Werten ab
  • Der Algorithmus aktualisiert sich in Echtzeit (also mit jedem neuen Datenpunkt)

In einem solchen Szenario muss ein Grenzwert erstellt werden, der Signale auslöst. Allerdings kann der Grenzwert nicht statisch sein und muss in Echtzeit mithilfe eines Algorithmus bestimmt werden.


Meine Frage: Welcher Algorithmus ist gut geeignet, um solche Schwellenwerte in Echtzeit zu berechnen? Gibt es spezifische Algorithmen für solche Situationen? Was sind die bekanntesten Algorithmen?


Robuste Algorithmen oder nützliche Einsichten werden sehr geschätzt. (kann in jeder Sprache antworten: es geht um den Algorithmus)

64voto

Roman Kiselev Punkte 1134

Hier ist die Python / numpy Implementierung des geglätteten z-Score-Algorithmus (siehe obige Antwort). Sie können den Gist hier finden.

#!/usr/bin/env python
# Implementierung des Algorithmus von https://stackoverflow.com/a/22640362/6029703
import numpy as np
import pylab

def thresholding_algo(y, lag, threshold, influence):
    signals = np.zeros(len(y))
    filteredY = np.array(y)
    avgFilter = [0]*len(y)
    stdFilter = [0]*len(y)
    avgFilter[lag - 1] = np.mean(y[0:lag])
    stdFilter[lag - 1] = np.std(y[0:lag])
    for i in range(lag, len(y)):
        if abs(y[i] - avgFilter[i-1]) > threshold * stdFilter [i-1]:
            if y[i] > avgFilter[i-1]:
                signals[i] = 1
            else:
                signals[i] = -1

            filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1]
            avgFilter[i] = np.mean(filteredY[(i-lag+1):i+1])
            stdFilter[i] = np.std(filteredY[(i-lag+1):i+1])
        else:
            signals[i] = 0
            filteredY[i] = y[i]
            avgFilter[i] = np.mean(filteredY[(i-lag+1):i+1])
            stdFilter[i] = np.std(filteredY[(i-lag+1):i+1])

    return dict(signals = np.asarray(signals),
                avgFilter = np.asarray(avgFilter),
                stdFilter = np.asarray(stdFilter))

Unten befindet sich der Test mit demselben Datensatz, der dasselbe Diagramm wie in der Originalantwort für R/Matlab ergibt

# Daten
y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1])

# Einstellungen: lag = 30, Schwellenwert = 5, Einfluss = 0
lag = 30
threshold = 5
influence = 0

# Algorithmus mit den oben genannten Einstellungen ausführen
result = thresholding_algo(y, lag=lag, threshold=threshold, influence=influence)

# Ergebnis plotten
pylab.subplot(211)
pylab.plot(np.arange(1, len(y)+1), y)

pylab.plot(np.arange(1, len(y)+1),
           result["avgFilter"], color="cyan", lw=2)

pylab.plot(np.arange(1, len(y)+1),
           result["avgFilter"] + threshold * result["stdFilter"], color="green", lw=2)

pylab.plot(np.arange(1, len(y)+1),
           result["avgFilter"] - threshold * result["stdFilter"], color="green", lw=2)

pylab.subplot(212)
pylab.step(np.arange(1, len(y)+1), result["signals"], color="red", lw=2)
pylab.ylim(-1.5, 1.5)
pylab.show()

31voto

aha Punkte 4184

Ein Ansatz besteht darin, Spitzen auf der folgenden Beobachtung zu erkennen:

  • Zeit t ist ein Peak, wenn (y(t) > y(t-1)) && (y(t) > y(t+1))

Es vermeidet falsche Positiven, indem es wartet, bis der Aufwärtstrend vorbei ist. Es ist nicht genau "Echtzeit" im Sinne, dass es den Höhepunkt um ein dt verpassen wird. Die Empfindlichkeit kann durch die Anforderung eines Vergleichsmargens kontrolliert werden. Es besteht ein Kompromiss zwischen der Rauscherkennung und der Zeitverzögerung der Erkennung. Sie können das Modell durch das Hinzufügen weiterer Parameter bereichern:

  • Peak, wenn (y(t) - y(t-dt) > m) && (y(t) - y(t+dt) > m)

wo dt und m Parameter sind, um Empfindlichkeit vs Zeitverzögerung zu kontrollieren

Dies ist das Ergebnis des erwähnten Algorithmus: Bildbeschreibung hier eingeben

Hier ist der Code, um das Diagramm in Python wiederzugeben:

import numpy as np
import matplotlib.pyplot as plt
input = np.array([1., 1., 1., 1., 1., 1., 1., 1.1, 1., 0.8, 0.9,
1., 1.2, 0.9, 1., 1., 1.1, 1.2, 1., 1.5, 1., 3.,
2., 5., 3., 2., 1., 1., 1., 0.9, 1., 1., 3.,
2.6, 4., 3., 3.2, 2., 1., 1., 1., 1., 1.])
signal = (input > np.roll(input,1)) & (input > np.roll(input,-1))
plt.plot(input)
plt.plot(signal.nonzero()[0], input[signal], 'ro')
plt.show()

Indem Sie m = 0.5 setzen, erhalten Sie ein saubereres Signal mit nur einem falschen Positiven: Bildbeschreibung hier eingeben

24voto

cklin Punkte 890

In der Signalverarbeitung wird die Spitzen-Detektion oft über die Wavelet-Transformation durchgeführt. Sie führen im Grunde genommen eine diskrete Wavelet-Transformation auf Ihren Zeitreihendaten durch. Nullstellen in den Detailkoeffizienten, die zurückgegeben werden, entsprechen Spitzen im Zeitreihensignal. Es werden unterschiedliche Spitzenamplituden erkannt, die auf verschiedenen Detailkoeffizientenebenen liegen, was Ihnen eine mehrstufige Auflösung bietet.

21voto

delica Punkte 1489

Python-Version, die mit Echtzeitströmen funktioniert (berechnet nicht alle Datenpunkte neu bei Ankunft jedes neuen Datenpunkts). Möglicherweise möchten Sie anpassen, was die Klassenfunktion zurückgibt - für meine Zwecke benötigte ich nur die Signale.

import numpy as np

class Echtzeit-Spitzenwert-Erkennung():
    def __init__(self, array, verzögerung, schwelle, einfluss):
        self.y = list(array)
        self.länge = len(self.y)
        self.verzögerung = verzögerung
        self.schwelle = schwelle
        self.einfluss = einfluss
        self.signale = [0] * len(self.y)
        self.gefiltertY = np.array(self.y).tolist()
        self.avgFilter = [0] * len(self.y)
        self.stdFilter = [0] * len(self.y)
        self.avgFilter[self.verzögerung - 1] = np.mean(self.y[0:self.verzögerung]).tolist()
        self.stdFilter[self.verzögerung - 1] = np.std(self.y[0:self.verzögerung]).tolist()

    def schwelle_algorithmus(self, neuer_wert):
        self.y.append(neuer_wert)
        i = len(self.y) - 1
        self.länge = len(self.y)
        if i < self.verzögerung:
            return 0
        elif i == self.verzögerung:
            self.signale = [0] * len(self.y)
            self.gefiltertY = np.array(self.y).tolist()
            self.avgFilter = [0] * len(self.y)
            self.stdFilter = [0] * len(self.y)
            self.avgFilter[self.verzögerung] = np.mean(self.y[0:self.verzögerung]).tolist()
            self.stdFilter[self.verzögerung] = np.std(self.y[0:self.verzögerung]).tolist()
            return 0

        self.signale += [0]
        self.gefiltertY += [0]
        self.avgFilter += [0]
        self.stdFilter += [0]

        if abs(self.y[i] - self.avgFilter[i - 1]) > (self.schwelle * self.stdFilter[i - 1]):

            if self.y[i] > self.avgFilter[i - 1]:
                self.signale[i] = 1
            else:
                self.signale[i] = -1

            self.gefiltertY[i] = self.einfluss * self.y[i] + \
                (1 - self.einfluss) * self.gefiltertY[i - 1]
            self.avgFilter[i] = np.mean(self.gefiltertY[(i - self.verzögerung):i])
            self.stdFilter[i] = np.std(self.gefiltertY[(i - self.verzögerung):i])
        else:
            self.signale[i] = 0
            self.gefiltertY[i] = self.y[i]
            self.avgFilter[i] = np.mean(self.gefiltertY[(i - self.verzögerung):i])
            self.stdFilter[i] = np.std(self.gefiltertY[(i - self.verzögerung):i])

        return self.signale[i]

15voto

S. Huber Punkte 883

In der Berechnungstopologie führt die Idee der persistenten Homologie zu einer effizienten Lösung - so schnell wie das Sortieren von Zahlen. Es erkennt nicht nur Spitzen, sondern quantifiziert auch die "Bedeutung" der Spitzen auf eine natürliche Weise, die es Ihnen ermöglicht, die für Sie bedeutenden Spitzen auszuwählen.

Algorithmuszusammenfassung. In einer 1-dimensionalen Umgebung (Zeitreihen, reellwertiges Signal) kann der Algorithmus leicht durch folgende Abbildung beschrieben werden:

Hartnäckigste Spitzen

Denken Sie an den Funktionsgraphen (oder sein Teilniveau-Set) als Landschaft und betrachten Sie einen abnehmenden Wasserspiegel, der bei unendlichem Niveau (oder 1,8 auf diesem Bild) beginnt. Während der Pegel sinkt, entstehen an lokalen Maxima Inseln. An lokalen Minima verschmelzen diese Inseln. Ein Detail in dieser Idee ist, dass die Insel, die später in der Zeit erschien, in die ältere Insel eingefügt wird. Die "Persistenz" einer Insel ist ihre Geburtszeit minus ihre Todeszeit. Die Längen der blauen Balken stellen die Persistenz dar, die die oben genannte "Bedeutung" eines Peaks ist.

Effizienz. Es ist nicht zu schwer, eine Implementierung zu finden, die in linearer Zeit läuft - tatsächlich ist es eine einzige, einfache Schleife - nachdem die Funktionswerte sortiert wurden. Daher sollte diese Implementierung in der Praxis schnell sein und ist auch einfach zu implementieren.

Referenzen. Eine Zusammenfassung der gesamten Geschichte und Verweise auf die Motivation aus der persistenten Homologie (ein Bereich der rechnerischen algebraischen Topologie) finden Sie hier: https://www.sthu.org/blog/13-perstopology-peakdetection/index.html

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X