405 Stimmen

Spitzensignalerfassung in Echtzeit-Zeitreihendaten


Aktualisierung: Der bisher beste Algorithmus bisher ist dieser.


Diese Frage erkundet robuste Algorithmen zur Erkennung plötzlicher Peaks in echtzeitfähigen Zeitreihendaten.

Betrachten Sie das folgende Beispiel für Daten:

Plot der Daten

Beispiel dieser Daten ist im Matlab-Format (aber es handelt sich bei dieser Frage nicht um die Sprache, sondern um den Algorithmus):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

Es ist deutlich zu erkennen, dass es drei große Peaks und einige kleine Peaks gibt. Diese Datensatz ist ein spezifisches Beispiel aus der Klasse von Zeitreihendatensätzen, um die es in der Frage geht. Diese Datenklasse hat zwei allgemeine Eigenschaften:

  1. Es gibt grundlegendes Rauschen mit einem allgemeinen Mittelwert
  2. Es gibt große 'Peaks' oder 'höhere Datenpunkte', die signifikant vom Rauschen abweichen.

Angenommen wird auch folgendes:

  • Die Breite der Peaks kann nicht im Voraus bestimmt werden
  • Die Höhe der Peaks weicht signifikant von den anderen Werten ab
  • Der Algorithmus aktualisiert sich in Echtzeit (also mit jedem neuen Datenpunkt)

In einem solchen Szenario muss ein Grenzwert erstellt werden, der Signale auslöst. Allerdings kann der Grenzwert nicht statisch sein und muss in Echtzeit mithilfe eines Algorithmus bestimmt werden.


Meine Frage: Welcher Algorithmus ist gut geeignet, um solche Schwellenwerte in Echtzeit zu berechnen? Gibt es spezifische Algorithmen für solche Situationen? Was sind die bekanntesten Algorithmen?


Robuste Algorithmen oder nützliche Einsichten werden sehr geschätzt. (kann in jeder Sprache antworten: es geht um den Algorithmus)

3voto

leonardkraemer Punkte 6085

Ich brauchte so etwas in meinem Android-Projekt. Dachte, ich könnte eine Kotlin-Implementierung zurückgeben.

import org.apache.commons.math3.stat.descriptive.SummaryStatistics
/**
* Geschmeidig geglätteter Nullwert-Algorithmus schamlos kopiert von https://stackoverflow.com/a/22640362/6029703
* Verwendet einen gleitenden Mittelwert und eine gleitende Abweichung (getrennt), um Spitzen in einem Vektor zu identifizieren
*
* @param y - Der Eingabevektor zur Analyse
* @param lag - Die Verzögerung des gleitenden Fensters (d.h. wie groß das Fenster ist)
* @param schwellenwert - Der Z-Wert, bei dem der Algorithmus signalisiert (d.h. wie viele Standardabweichungen vom gleitenden Mittelwert eine Spitze (oder ein Signal) entfernt ist)
* @param einfluss - Der Einfluss (zwischen 0 und 1) neuer Signale auf Mittelwert und Standardabweichung (wie sehr eine Spitze (oder ein Signal) andere Werte in seiner Nähe beeinflussen soll)
* @return - Die berechneten Mittelwerte (avgFilter) und Abweichungen (stdFilter) und die Signale (signals)
*/
fun smoothedZScore(y: List, lag: Int, threshold: Double, influence: Double): Triple, List, List> {
    val stats = SummaryStatistics()
    // die Ergebnisse (Spitzen, 1 oder -1) unseres Algorithmus
    val signals = MutableList(y.size, { 0 })
    // Signale (Spitzen) aus unserer Originalliste filtern (unter Verwendung des Einfluss-Arguments)
    val filteredY = ArrayList(y)
    // der aktuelle Durchschnitt des gleitenden Fensters
    val avgFilter = MutableList(y.size, { 0.0 })
    // die aktuelle Standardabweichung des gleitenden Fensters
    val stdFilter = MutableList(y.size, { 0.0 })
    // avgFilter und stdFilter initialisieren
    y.take(lag).forEach { s -> stats.addValue(s) }
    avgFilter[lag - 1] = stats.mean
    stdFilter[lag - 1] = Math.sqrt(stats.populationVariance) // getStandardDeviation() verwendet Stichprobenvarianz (nicht das, was wir wollen)
    stats.clear()
    // Schleife Eingabe beginnend am Ende des gleitenden Fensters
    (lag..y.size - 1).forEach { i ->
        // wenn der Abstand zwischen dem aktuellen Wert und dem Durchschnitt genug Standardabweichungen (Schwellenwert) beträgt
        if (Math.abs(y[i] - avgFilter[i - 1]) > threshold * stdFilter[i - 1]) {
            // dies ist ein Signal (d.h. Spitze), herausfinden, ob es ein positives oder negatives Signal ist
            signals[i] = if (y[i] > avgFilter[i - 1]) 1 else -1
            // dieses Signal mit Einfluss herausfiltern
            filteredY[i] = (influence * y[i]) + ((1 - influence) * filteredY[i - 1])
        } else {
            // sicherstellen, dass dieses Signal null bleibt
            signals[i] = 0
            // sicherstellen, dass dieser Wert nicht gefiltert wird
            filteredY[i] = y[i]
        }
        // gleitenden Durchschnitt und Abweichung aktualisieren
        (i - lag..i - 1).forEach { stats.addValue(filteredY[it]) }
        avgFilter[i] = stats.getMean()
        stdFilter[i] = Math.sqrt(stats.getPopulationVariance()) // getStandardDeviation() verwendet Stichprobenvarianz (nicht das, was wir wollen)
        stats.clear()
    }
    return Triple(signals, avgFilter, stdFilter)
}

Beispielpaket mit Verifizierungsdiagrammen finden Sie unter github.

Bildbeschreibung hier eingeben

3voto

Mike Roberts Punkte 531

Hier ist eine (nicht idiomatische) Scala-Version des Algorithmus für geglättete Z-Scores:

/**
  * Schamlos kopierter geglätteter Z-Score-Algorithmus von https://stackoverflow.com/a/22640362/6029703
  * Verwendet einen gleitenden Mittelwert und eine gleitende Standardabweichung (getrennt), um Spitzen in einem Vektor zu identifizieren
  *
  * @param y - Der Eingabevektor, der analysiert werden soll
  * @param lag - Die Verzögerung des gleitenden Fensters (d. h. wie groß das Fenster ist)
  * @param threshold - Der Z-Score, bei dem der Algorithmus Signale anzeigt (d. h. wie viele Standardabweichungen vom gleitenden Mittelwert eine Spitze (oder ein Signal) entfernt ist)
  * @param influence - Der Einfluss (zwischen 0 und 1) neuer Signale auf den Mittelwert und die Standardabweichung (wie sehr eine Spitze (oder ein Signal) andere Werte in ihrer Nähe beeinflussen sollte)
  * @return - Die berechneten Durchschnitte (avgFilter) und Abweichungen (stdFilter) sowie die Signale (signals)
  */
private def smoothedZScore(y: Seq[Double], lag: Int, threshold: Double, influence: Double): Seq[Int] = {
  val stats = new SummaryStatistics()

  // die Ergebnisse (Spitzen, 1 oder -1) unseres Algorithmus
  val signals = mutable.ArrayBuffer.fill(y.length)(0)

  // Filtern der Signale (Spitzen) aus unserer Originalliste (unter Verwendung der influence-Argumente)
  val filteredY = y.to[mutable.ArrayBuffer]

  // der aktuelle Durchschnitt des gleitenden Fensters
  val avgFilter = mutable.ArrayBuffer.fill(y.length)(0d)

  // die aktuelle Standardabweichung des gleitenden Fensters
  val stdFilter = mutable.ArrayBuffer.fill(y.length)(0d)

  // avgFilter und stdFilter initialisieren
  y.take(lag).foreach(s => stats.addValue(s))

  avgFilter(lag - 1) = stats.getMean
  stdFilter(lag - 1) = Math.sqrt(stats.getPopulationVariance) // getStandardDeviation() verwendet Stichprobenvarianz (nicht das, was wir wollen)

  // Schleife, die bei Eingabe am Ende des gleitenden Fensters beginnt
  y.zipWithIndex.slice(lag, y.length - 1).foreach {
    case (s: Double, i: Int) =>
      // wenn der Abstand zwischen dem aktuellen Wert und dem Durchschnitt ausreichend Standardabweichungen (threshold) beträgt
      if (Math.abs(s - avgFilter(i - 1)) > threshold * stdFilter(i - 1)) {
        // dies ist ein Signal (dh Spitze), bestimmen Sie, ob es ein positives oder negatives Signal ist
        signals(i) = if (s > avgFilter(i - 1)) 1 else -1
        // das Signal mit influence filtern
        filteredY(i) = (influence * s) + ((1 - influence) * filteredY(i - 1))
      } else {
        // sicherstellen, dass dieses Signal null bleibt
        signals(i) = 0
        // sicherstellen, dass dieser Wert nicht gefiltert wird
        filteredY(i) = s
      }

      // gleitenden Durchschnitt und Standardabweichung aktualisieren
      stats.clear()
      filteredY.slice(i - lag, i).foreach(s => stats.addValue(s))
      avgFilter(i) = stats.getMean
      stdFilter(i) = Math.sqrt(stats.getPopulationVariance) // getStandardDeviation() verwendet Stichprobenvarianz (nicht das, was wir wollen)
  }

  println(y.length)
  println(signals.length)
  println(signals)

  signals.zipWithIndex.foreach {
    case(x: Int, idx: Int) =>
      if (x == 1) {
        println(idx + " " + y(idx))
      }
  }

  val data =
    y.zipWithIndex.map { case (s: Double, i: Int) => Map("x" -> i, "y" -> s, "name" -> "y", "row" -> "data") } ++
    avgFilter.zipWithIndex.map { case (s: Double, i: Int) => Map("x" -> i, "y" -> s, "name" -> "avgFilter", "row" -> "data") } ++
    avgFilter.zipWithIndex.map { case (s: Double, i: Int) => Map("x" -> i, "y" -> (s - threshold * stdFilter(i)), "name" -> "lower", "row" -> "data") } ++
    avgFilter.zipWithIndex.map { case (s: Double, i: Int) => Map("x" -> i, "y" -> (s + threshold * stdFilter(i)), "name" -> "upper", "row" -> "data") } ++
    signals.zipWithIndex.map { case (s: Int, i: Int) => Map("x" -> i, "y" -> s, "name" -> "signal", "row" -> "signal") }

  Vegas("Geglättetes Z")
    .withData(data)
    .mark(Line)
    .encodeX("x", Quant)
    .encodeY("y", Quant)
    .encodeColor(
      field="name",
      dataType=Nominal
    )
    .encodeRow("row", Ordinal)
    .show

  return signals
}

Hier ist ein Test, der dieselben Ergebnisse wie die Python- und Groovy-Versionen liefert:

val y = List(1d, 1d, 1.1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 1d,
  1d, 1d, 1.1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 1.1d, 1d, 0.8d, 0.9d, 1d, 1.2d, 0.9d, 1d,
  1d, 1.1d, 1.2d, 1d, 1.5d, 1d, 3d, 2d, 5d, 3d, 2d, 1d, 1d, 1d, 0.9d, 1d,
  1d, 3d, 2.6d, 4d, 3d, 3.2d, 2d, 1d, 1d, 0.8d, 4d, 4d, 2d, 2.5d, 1d, 1d, 1d)

val lag = 30
val threshold = 5d
val influence = 0d

smoothedZScore(y, lag, threshold, influence)

Vegas-Diagramm des Ergebnisses

Gist hier

3voto

Hier ist eine Groovy (Java)-Implementierung des geglätteten Z-Score-Algorithmus (siehe oben stehende Antwort).

/**
 * "Geglätteter Null-Wert-Algorithmus" schamlos kopiert von https://stackoverflow.com/a/22640362/6029703
 * Verwendet einen rollenden Mittelwert und eine rollende Abweichung (getrennt), um Peaks in einem Vektor zu identifizieren
 *
 * @param y - Der Eingabevektor zur Analyse
 * @param lag - Die Verzögerung des bewegten Fensters (d.h. wie groß das Fenster ist)
 * @param threshold - Der Z-Score, bei dem der Algorithmus signalisiert (d.h. wie viele Standardabweichungen vom bewegten Mittelwert ein Peak (oder Signal) entfernt ist)
 * @param influence - Der Einfluss (zwischen 0 und 1) neuer Signale auf den Mittelwert und die Standardabweichung (wie stark ein Peak (oder Signal) andere Werte in der Nähe beeinflussen sollte)
 * @return - Die berechneten Durchschnitte (avgFilter) und Standardabweichungen (stdFilter) sowie die Signale (signals)
 */

public HashMap> thresholdingAlgo(List y, Long lag, Double threshold, Double influence) {
    //Initialisiere Stats-Instanz
    SummaryStatistics stats = new SummaryStatistics()

    //Die Ergebnisse (Peaks, 1 oder -1) unseres Algorithmus
    List signals = new ArrayList(Collections.nCopies(y.size(), 0))
    //Filtere die Signale (Peaks) aus unserer Originalliste (unter Verwendung des Influence-Arguments)
    List filteredY = new ArrayList(y)
    //Der aktuelle Durchschnitt des rollenden Fensters
    List avgFilter = new ArrayList(Collections.nCopies(y.size(), 0.0d))
    //Die aktuelle Standardabweichung des rollenden Fensters
    List stdFilter = new ArrayList(Collections.nCopies(y.size(), 0.0d))
    //Initialisiere avgFilter und stdFilter
    (0..lag-1).each { stats.addValue(y[it as int]) }
    avgFilter[lag - 1 as int] = stats.getMean()
    stdFilter[lag - 1 as int] = Math.sqrt(stats.getPopulationVariance()) //getStandardDeviation() verwendet Stichprobenvarianz (nicht das, was wir wollen)
    stats.clear()
    //Schleife über Eingabe beginnend am Ende des rollenden Fensters
    (lag..y.size()-1).each { i ->
        //Wenn der Abstand zwischen dem aktuellen Wert und dem Durchschnitt genug Standardabweichungen (Schwelle) entfernt ist
        if (Math.abs((y[i as int] - avgFilter[i - 1 as int]) as Double) > threshold * stdFilter[i - 1 as int]) {
            //Dies ist ein Signal (d.h. Peak), bestimme, ob es ein positives oder negatives Signal ist
            signals[i as int] = (y[i as int] > avgFilter[i - 1 as int]) ? 1 : -1
            //Filtere dieses Signal heraus unter Verwendung von Influence
            filteredY[i as int] = (influence * y[i as int]) + ((1-influence) * filteredY[i - 1 as int])
        } else {
            //Stelle sicher, dass dieses Signal weiterhin eine Null ist
            signals[i as int] = 0
            //Stelle sicher, dass dieser Wert nicht gefiltert wird
            filteredY[i as int] = y[i as int]
        }
        //Aktualisiere rollenden Durchschnitt und Abweichung
        (i - lag..i-1).each { stats.addValue(filteredY[it as int] as Double) }
        avgFilter[i as int] = stats.getMean()
        stdFilter[i as int] = Math.sqrt(stats.getPopulationVariance()) //getStandardDeviation() verwendet Stichprobenvarianz (nicht das, was wir wollen)
        stats.clear()
    }

    return [
        signals  : signals,
        avgFilter: avgFilter,
        stdFilter: stdFilter
    ]
}

Unten ist ein Test mit den gleichen Daten, der die gleichen Ergebnisse wie die obige Python / numpy-Implementierung liefert.

    // Daten
    def y = [1d, 1d, 1.1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 1d,
         1d, 1d, 1.1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 1.1d, 1d, 0.8d, 0.9d, 1d, 1.2d, 0.9d, 1d,
         1d, 1.1d, 1.2d, 1d, 1.5d, 1d, 3d, 2d, 5d, 3d, 2d, 1d, 1d, 1d, 0.9d, 1d,
         1d, 3d, 2.6d, 4d, 3d, 3.2d, 2d, 1d, 1d, 0.8d, 4d, 4d, 2d, 2.5d, 1d, 1d, 1d]

    // Einstellungen
    def lag = 30
    def threshold = 5
    def influence = 0

    def thresholdingResults = thresholdingAlgo((List) y, (Long) lag, (Double) threshold, (Double) influence)

    println y.size()
    println thresholdingResults.signals.size()
    println thresholdingResults.signals

    thresholdingResults.signals.eachWithIndex { x, idx ->
        if (x) {
            println y[idx]
        }
    }

2voto

Marc Compere Punkte 248

Diese Z-Score-Methode ist sehr effektiv bei der Spitzenentdeckung, was auch hilfreich für die Ausreißerentfernung ist. In Diskussionen über Ausreißer wird häufig der statistische Wert jedes Punktes und die Ethik der Datenänderung debattiert.

Aber im Falle von wiederholten fehlerhaften Sensorwerten aus fehleranfälligen seriellen Kommunikationen oder einem fehleranfälligen Sensor gibt es keinen statistischen Wert bei Fehlern oder falschen Messwerten. Diese müssen identifiziert und entfernt werden.

Visuell sind die Fehler offensichtlich. Die geraden Linien im Plot unten zeigen, was entfernt werden muss. Die Identifizierung und Entfernung von Fehlern mit einem Algorithmus ist jedoch sehr herausfordernd. Z-Scores funktionieren gut.

Die unten stehende Abbildung zeigt Werte, die über serielle Kommunikation von einem Sensor erhalten wurden. Gelegentliche serielle Kommunikationsfehler, Sensorfehler oder beides führen zu wiederholten, eindeutig fehlerhaften Datenpunkten.

Der Z-Score-Spitzenentdecker konnte auf falsche Datenpunkte hinweisen und einen sauberen resultierenden Datensatz generieren, während die Merkmale der korrekten Daten erhalten blieben:

Hier Bildbeschreibung eingeben

2voto

Sga Punkte 3550

Dart-Version des @Jean-Paul Smoothed Z Score Algorithmus:

class SmoothedZScore {
  int lag = 5;
  num threshold = 10;
  num influence = 0.5;

  num sum(List a) {
    num s = 0;
    for (int i = 0; i < a.length; i++) s += a[i];
    return s;
  }

  num mean(List a) {
    return sum(a) / a.length;
  }

  num stddev(List arr) {
    num arrMean = mean(arr);
    num dev = 0;
    for (int i = 0; i < arr.length; i++) dev += (arr[i] - arrMean) * (arr[i] - arrMean);
    return sqrt(dev / arr.length);
  }

  List smoothedZScore(List y) {
    if (y.length < lag + 2) {
      throw 'y-Datenarray zu kurz ($y.length) für den gegebenen Lag von $lag';
    }

    // Variablen initialisieren
    List signals = List.filled(y.length, 0);
    List filteredY = List.from(y);
    List leadIn = y.sublist(0, lag);

    var avgFilter = List.filled(y.length, 0);
    var stdFilter = List.filled(y.length, 0);
    avgFilter[lag - 1] = mean(leadIn);
    stdFilter[lag - 1] = stddev(leadIn);

    for (var i = lag; i < y.length; i++) {
      if ((y[i] - avgFilter[i - 1]).abs() > (threshold * stdFilter[i - 1])) {
        signals[i] = y[i] > avgFilter[i - 1] ? 1 : -1;
        // Einfluss reduzieren
        filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i - 1];
      } else {
        signals[i] = 0; // kein Signal
        filteredY[i] = y[i];
      }

      // Filter anpassen
      List yLag = filteredY.sublist(i - lag, i);
      avgFilter[i] = mean(yLag);
      stdFilter[i] = stddev(yLag);
    }

    return signals;
  }
}

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X