405 Stimmen

Spitzensignalerfassung in Echtzeit-Zeitreihendaten


Aktualisierung: Der bisher beste Algorithmus bisher ist dieser.


Diese Frage erkundet robuste Algorithmen zur Erkennung plötzlicher Peaks in echtzeitfähigen Zeitreihendaten.

Betrachten Sie das folgende Beispiel für Daten:

Plot der Daten

Beispiel dieser Daten ist im Matlab-Format (aber es handelt sich bei dieser Frage nicht um die Sprache, sondern um den Algorithmus):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

Es ist deutlich zu erkennen, dass es drei große Peaks und einige kleine Peaks gibt. Diese Datensatz ist ein spezifisches Beispiel aus der Klasse von Zeitreihendatensätzen, um die es in der Frage geht. Diese Datenklasse hat zwei allgemeine Eigenschaften:

  1. Es gibt grundlegendes Rauschen mit einem allgemeinen Mittelwert
  2. Es gibt große 'Peaks' oder 'höhere Datenpunkte', die signifikant vom Rauschen abweichen.

Angenommen wird auch folgendes:

  • Die Breite der Peaks kann nicht im Voraus bestimmt werden
  • Die Höhe der Peaks weicht signifikant von den anderen Werten ab
  • Der Algorithmus aktualisiert sich in Echtzeit (also mit jedem neuen Datenpunkt)

In einem solchen Szenario muss ein Grenzwert erstellt werden, der Signale auslöst. Allerdings kann der Grenzwert nicht statisch sein und muss in Echtzeit mithilfe eines Algorithmus bestimmt werden.


Meine Frage: Welcher Algorithmus ist gut geeignet, um solche Schwellenwerte in Echtzeit zu berechnen? Gibt es spezifische Algorithmen für solche Situationen? Was sind die bekanntesten Algorithmen?


Robuste Algorithmen oder nützliche Einsichten werden sehr geschätzt. (kann in jeder Sprache antworten: es geht um den Algorithmus)

2voto

Micah Parks Punkte 1674

Ich habe ein Go-Paket für die beliebteste Antwort von Jean-Paul geschrieben. Es setzt voraus, dass die y-Werte vom Typ float64 sind.

github.com/MicahParks/peakdetect

Das untenstehende Beispiel verwendet dieses Paket und basiert auf dem R-Beispiel aus der oben genannten beliebten Antwort. Es kompiliert ohne Abhängigkeiten, versucht einen geringen Speicherplatzverbrauch zu halten und bearbeitet vergangene Punkte nicht erneut, wenn ein neuer Datenpunkt eintrifft. Das Projekt hat eine Testabdeckung von 100%, hauptsächlich aus den Eingaben und Ausgaben des genannten R-Beispiels. Aber, falls jemand Fehler entdeckt, bitte ein GitHub-Problem melden.

Bearbeitung: Ich habe eine Leistungsverbesserung mit v0.0.5 vorgenommen, die offenbar um den Faktor 10 schneller ist! Es verwendet die Welford-Methode zur Initialisierung und eine ähnliche Methode zur Berechnung des Mittelwerts und der Populationsstandardabweichung über den Verzögerungszeitraum (gleitendes Fenster). Besonderer Dank gilt dieser Antwort in einem anderen Beitrag: https://stackoverflow.com/a/14638138/14797322

Hier ist das Golang-Beispiel, das auf dem R-Beispiel basiert:

package main

import (
    "fmt"
    "log"

    "github.com/MicahParks/peakdetect"
)

// Dieses Beispiel entspricht dem R-Beispiel des Autors des Algorithmus.
// https://stackoverflow.com/a/54507329/14797322
func main() {
    data := []float64{1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1.1, 1, 1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5, 1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3, 2.6, 4, 3, 3.2, 2, 1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1}

    // Algorithmuskonfiguration aus dem Beispiel.
    const (
        lag       = 30
        threshold = 5
        influence = 0
    )

    // Peak-Detektor erstellen und initialisieren.
    detector := peakdetect.NewPeakDetector()
    err := detector.Initialize(influence, threshold, data[:lag]) // Die Länge der Anfangswerte entspricht dem Verzögerungszeitraum.
    if err != nil {
        log.Fatalf("Fehler beim Initialisieren des Peak-Detektors.\nFehler: %s", err)
    }

    // Neue Datenpunkte verarbeiten und das erzeugte Signal bestimmen.
    //
    // Diese Methode, .Next(), ist am besten, wenn Daten im Stream verarbeitet werden, aber hier wird einfach über einen Slice iteriert.
    nextDataPoints := data[lag:]
    for i, newPoint := range nextDataPoints {
        signal := detector.Next(newPoint)
        var signalType string
        switch signal {
        case peakdetect.SignalNegative:
            signalType = "negativ"
        case peakdetect.SignalNeutral:
            signalType = "neutral"
        case peakdetect.SignalPositive:
            signalType = "positiv"
        }

        println(fmt.Sprintf("Datenpunkt an Index %d hat das Signal: %s", i+lag, signalType))
    }

    // Diese Methode, .NextBatch(), ist eine Hilfsfunktion zum Verarbeiten vieler Datenpunkte auf einmal. Der zurückgegebene Slice
    // sollte die gleichen Signalergebnisse wie die obige Schleife erzeugen.
    signals := detector.NextBatch(nextDataPoints)
    println(fmt.Sprintf"1:1 Verhältnis von Batch-Eingaben zu Signal-Ausgaben: %t", len(signals) == len(nextDataPoints)))
}

1voto

Spandyie Punkte 834

Objektorientierte Version des z-Score-Algorithmus mit modernem C++

template
class FindPeaks{
private:
    std::vector m_input_signal;                      // speichert Eingabevektor
    std::vector m_array_peak_positive;               
    std::vector m_array_peak_negative;               

public:
    FindPeaks(const std::vector& t_input_signal): m_input_signal{t_input_signal}{ }

    void estimate(){
        int lag{5};
        T threshold{ 5 };                                                                                       // setze einen Schwellenwert
        T influence{ 0.5 };                                                                                    // Wert zwischen 0 und 1, 1 ist normale Einfluss und 0,5 ist die Hälfte des Einflusses

        std::vector filtered_signal(m_input_signal.size(), 0.0);                                             // zur Glättung des Signals, mit Nullen initialisieren
        std::vector signal(m_input_signal.size(), 0);                                                          // Vektor, der angibt, ob das Signal negativ oder positiv ist
        std::vector avg_filtered(m_input_signal.size(), 0.0);                                                // gleitende Durchschnitte
        std::vector std_filtered(m_input_signal.size(), 0.0);                                                // gleitende Standardabweichung

        avg_filtered[lag] = findMean(m_input_signal.begin(), m_input_signal.begin() + lag);                         // Iterator an Vektor übergeben
        std_filtered[lag] = findStandardDeviation(m_input_signal.begin(), m_input_signal.begin() + lag);

        for (size_t iLag = lag + 1; iLag < m_input_signal.size(); ++iLag) {                                         // Startindex
            if (std::abs(m_input_signal[iLag] - avg_filtered[iLag - 1]) > threshold * std_filtered[iLag - 1]) {     // überprüfen, ob Wert über dem Schwellenwert liegt             
                if ((m_input_signal[iLag]) > avg_filtered[iLag - 1) {
                    signal[iLag] = 1;                                                                               // positives Signal zuweisen
                }
                else {
                    signal[iLag] = -1;                                                                                  // negatives Signal zuweisen
                }
                filtered_signal[iLag] = influence * m_input_signal[iLag] + (1 - influence) * filtered_signal[iLag - 1];        // exponentielle Glättung
            }
            else {
                signal[iLag] = 0;                                                                                         // kein Signal
                filtered_signal[iLag] = m_input_signal[iLag];
            }

            avg_filtered[iLag] = findMean(filtered_signal.begin() + (iLag - lag), filtered_signal.begin() + iLag);
            std_filtered[iLag] = findStandardDeviation(filtered_signal.begin() + (iLag - lag), filtered_signal.begin() + iLag);

        }

        for (size_t iSignal = 0; iSignal < m_input_signal.size(); ++iSignal) {
            if (signal[iSignal] == 1) {
                m_array_peak_positive.emplace_back(m_input_signal[iSignal]);                                        // positive Peaks speichern
            }
            else if (signal[iSignal] == -1) {
                m_array_peak_negative.emplace_back(m_input_signal[iSignal]);                                         // negative Peaks speichern
            }
        }
        printVoltagePeaks(signal, m_input_signal);

    }

    std::pair< std::vector, std::vector > get_peaks()
    {
        return std::make_pair(m_array_peak_positive, m_array_peak_negative);
    }

};

template
void printVoltagePeaks(std::vector& m_signal, std::vector& m_input_signal) {
    std::ofstream output_file("./voltage_peak.csv");
    std::ostream_iterator output_iterator_voltage(output_file, ",");
    std::ostream_iterator output_iterator_signal(output_file, ",");
    std::copy(m_input_signal.begin(), m_input_signal.end(), output_iterator_voltage);
    output_file << "\n";
    std::copy(m_signal.begin(), m_signal.end(), output_iterator_signal);
}

template
typename std::iterator_traits::value_type findMean(iterator_type it, iterator_type end)
{
    /* Funktion erhält Iterator */
    typename std::iterator_traits::value_type sum{ 0.0 };
    int counter = 0;
    while (it != end) {
        sum += *(it++);
        counter++;
    }
    return sum / counter;
}

template
typename std::iterator_traits::value_type findStandardDeviation(iterator_type it, iterator_type end)
{
    auto mean = findMean(it, end);
    typename std::iterator_traits::value_type sum_squared_error{ 0.0 };
    int counter{ 0 };
    while (it != end) {
        sum_squared_error += std::pow((*(it++) - mean), 2);
        counter++;
    }
    auto standard_deviation = std::sqrt(sum_squared_error / (counter - 1));
    return standard_deviation;
}

1voto

Alen Punkte 49

Perl-Implementierung des Algorithmus von @Jean-Paul.

#!/usr/bin/perl

use strict;
use Data::Dumper;

sub mittelwert {
    my $daten = shift;
    my $summe = 0;
    my $mittelwert = 0;
    for my $element (@$daten) {
        $summe += $element;
    }
    $mittelwert = $summe / (scalar @$daten) if @$daten;
    return $mittelwert;
}

sub varianz {
    my $daten = shift;
    my $varianz = 0;
    my $mittelwert = mittelwert($daten);
    my $summe = 0;
    for my $element (@$daten) {
        $summe += ($element - $mittelwert)**2;
    }
    $varianz = $summe / (scalar @$daten) if @$daten;
    return $varianz;
}

sub std {
    my $daten = shift;
    my $varianz = varianz($daten);
    return sqrt($varianz);
}

# @param y - Der Eingabevektor zur Analyse
# @param lag - Die Verzögerung des gleitenden Fensters
# @param threshold - Der Z-Wert, bei dem der Algorithmus ein Signal gibt
# @param influence - Der Einfluss (zwischen 0 und 1) neuer Signale auf den Mittelwert und die Standardabweichung
sub thresholding_algo {
    my ($y, $lag, $threshold, $influence) = @_;

    my @signale = (0) x @$y;
    my @gefilterteY = @$y;
    my @avgFilter = (0) x @$y;
    my @stdFilter = (0) x @$y;

    $avgFilter[$lag - 1] = mittelwert([@$y[0..$lag-1]]);
    $stdFilter[$lag - 1] = std([@$y[0..$lag-1]]);

    for (my $i=$lag; $i <= @$y - 1; $i++) {
        if (abs($y->[$i] - $avgFilter[$i-1]) > $threshold * $stdFilter[$i-1]) {
            if ($y->[$i] > $avgFilter[$i-1]) {
                $signale[$i] = 1;
            } else {
                $signale[$i] = -1;
            }

            $gefilterteY[$i] = $influence * $y->[$i] + (1 - $influence) * $gefilterteY[$i-1];
            $avgFilter[$i] = mittelwert([@gefilterteY[($i-$lag)..($i-1)]);
            $stdFilter[$i] = std([@gefilterteY[($i-$lag)..($i-1)]);
        }
        else {
            $signale[$i] = 0;
            $gefilterteY[$i] = $y->[$i];
            $avgFilter[$i] = mittelwert([@gefilterteY[($i-$lag)..($i-1)]);
            $stdFilter[$i] = std([@gefilterteY[($i-$lag)..($i-1)]);
        }
    }

    return {
        signals => \@signale,
        avgFilter => \@avgFilter,
        stdFilter => \@stdFilter
    };
}

my $y = [1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1];

my $lag = 30;
my $threshold = 5;
my $influence = 0;

my $ergebnis = thresholding_algo($y, $lag, $threshold, $influence);

print Dumper $ergebnis;

1voto

MatteoB Punkte 53

Dies ist eine Python-Implementierung des Robust peak detection algorithm Algorithmus.

Die Initialisierung und der Berechnungsteil sind aufgeteilt, nur das filtered_y Array wurde beibehalten, das eine maximale Größe von lag hat, damit keine Speichervergrößerungen auftreten. (Ergebnisse sind dieselben wie bei den obigen Antworten). Um den Graphen zu plotten, wird auch das labels Array beibehalten.

Ich habe einen GitHub-Gist erstellt.

import numpy as np
import pylab

def init(x, lag, threshold, influence):
    '''
    Glättungsalgorithmus für Z-Scores
    Implementierung des Algorithmus von https://stackoverflow.com/a/22640362/6029703
    '''

    labels = np.zeros(lag)
    filtered_y = np.array(x[0:lag])
    avg_filter = np.zeros(lag)
    std_filter = np.zeros(lag)
    var_filter = np.zeros(lag)

    avg_filter[lag - 1] = np.mean(x[0:lag])
    std_filter[lag - 1] = np.std(x[0:lag])
    var_filter[lag - 1] = np.var(x[0:lag])

    return dict(avg=avg_filter[lag - 1], var=var_filter[lag - 1],
                std=std_filter[lag - 1], filtered_y=filtered_y,
                labels=labels)

def add(result, single_value, lag, threshold, influence):
    previous_avg = result['avg']
    previous_var = result['var']
    previous_std = result['std']
    filtered_y = result['filtered_y']
    labels = result['labels']

    if abs(single_value - previous_avg) > threshold * previous_std:
        if single_value > previous_avg:
            labels = np.append(labels, 1)
        else:
            labels = np.append(labels, -1)

        # Neues gefiltertes Element ermitteln, das den Einflussfaktor verwendet
        filtered_y = np.append(filtered_y, influence * single_value
                               + (1 - influence) * filtered_y[-1])
    else:
        labels = np.append(labels, 0)
        filtered_y = np.append(filtered_y, single_value)

    # Durchschnitt aktualisieren durch Addition des vorherigen Durchschnitts + lag * (das neu berechnete Element - berechnetes Element an Position (i - lag))
    current_avg_filter = previous_avg + 1. / lag * (filtered_y[-1]
            - filtered_y[len(filtered_y) - lag - 1])

    # Varianz aktualisieren als vorherige Varianz + 1 / lag * neues berechnetes Element - der vorherige Durchschnitt -
    current_var_filter = previous_var + 1. / lag * ((filtered_y[-1]
            - previous_avg) ** 2 - (filtered_y[len(filtered_y) - 1
            - lag] - previous_avg) ** 2 - (filtered_y[-1]
            - filtered_y[len(filtered_y) - 1 - lag]) ** 2 / lag)  # das neu berechnete Element an Position (lag) - Durchschnitt des vorherigen - neues berechnetes Element - berechnetes Element an Position lag ....

    # Standardabweichung für das aktuelle Element berechnen als sqrt (aktuelle Varianz)
    current_std_filter = np.sqrt(current_var_filter)

    return dict(avg=current_avg_filter, var=current_var_filter,
                std=current_std_filter, filtered_y=filtered_y[1:],
                labels=labels)

lag = 30
threshold = 5
influence = 0

y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1])

# Algorithmus mit den oben genannten Einstellungen ausführen
result = init(y[:lag], lag=lag, threshold=threshold, influence=influence)

i = open('quartz2', 'r')
for i in y[lag:]:
    result = add(result, i, lag, threshold, influence)

# Ergebnis plotten
pylab.subplot(211)
pylab.plot(np.arange(1, len(y) + 1), y)
pylab.subplot(212)
pylab.step(np.arange(1, len(y) + 1), result['labels'], color='red',
           lw=2)
pylab.ylim(-1.5, 1.5)
pylab.show()

1voto

nichole Punkte 111

Statt den Mittelwert mit dem Maximum zu vergleichen, kann man auch das Maximum mit benachbarten Minima vergleichen, wobei die Minima nur über einem Rauschschwellenwert definiert sind. Wenn das lokale Maximum > 3 Mal (oder einem anderen Vertrauensfaktor) die benachbarten Minima ist, dann handelt es sich um ein Maximum. Die Spitzenbestimmung ist genauer mit breiteren beweglichen Fenstern. Das oben Genannte verwendet eine Berechnung, die auf der Mitte des Fensters zentriert ist, übrigens, anstatt einer Berechnung am Ende des Fensters (== Verzögerung).

Beachten Sie, dass ein Maximum als Signalanstieg vorher und einen Abfall danach betrachtet werden muss.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X