更新:到目前为止表现最好的算法是这个。


这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。

考虑以下示例数据:

这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:

有一种具有一般平均值的基本噪声 有很大的“峰值”或“更高的数据点”明显偏离噪声。

让我们假设以下情况:

峰的宽度不能事先确定 峰的高度明显偏离其他值 算法实时更新(因此每个新数据点都会更新)

对于这种情况,需要构造一个触发信号的边值。但是,边界值不能是静态的,必须通过算法实时确定。


我的问题是:什么是实时计算这些阈值的好算法?有没有针对这种情况的特定算法?最著名的算法是什么?


健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)


当前回答

一种方法是根据以下观察来检测峰:

时间t是一个峰值(y (t) > y (t - 1)) & & ((t) > y (t + 1))

它通过等待上升趋势结束来避免误报。它并不完全是“实时”的,因为它会比峰值差一个dt。灵敏度可以通过要求比较的裕度来控制。在噪声检测和时延检测之间存在一种折衷。 您可以通过添加更多参数来丰富模型:

峰如果y (y (t) - (t-dt) > m) && (y (t) - y (t + dt) > m)

dt和m是控制灵敏度和延时的参数

这是你用上述算法得到的结果:

下面是在python中重现图的代码:

import numpy as np
import matplotlib.pyplot as plt
input = np.array([ 1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1.1,  1. ,  0.8,  0.9,
    1. ,  1.2,  0.9,  1. ,  1. ,  1.1,  1.2,  1. ,  1.5,  1. ,  3. ,
    2. ,  5. ,  3. ,  2. ,  1. ,  1. ,  1. ,  0.9,  1. ,  1. ,  3. ,
    2.6,  4. ,  3. ,  3.2,  2. ,  1. ,  1. ,  1. ,  1. ,  1. ])
signal = (input > np.roll(input,1)) & (input > np.roll(input,-1))
plt.plot(input)
plt.plot(signal.nonzero()[0], input[signal], 'ro')
plt.show()

通过设置m = 0.5,你可以得到一个更清晰的信号,只有一个假阳性:

其他回答

下面是ZSCORE算法的PHP实现:

<?php
$y = array(1,7,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,10,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1);

function mean($data, $start, $len) {
    $avg = 0;
    for ($i = $start; $i < $start+ $len; $i ++)
        $avg += $data[$i];
    return $avg / $len;
}
    
function stddev($data, $start,$len) {
    $mean = mean($data,$start,$len);
    $dev = 0;
    for ($i = $start; $i < $start+$len; $i++) 
        $dev += (($data[$i] - $mean) * ($data[$i] - $mean));
    return sqrt($dev / $len);
}

function zscore($data, $len, $lag= 20, $threshold = 1, $influence = 1) {

    $signals = array();
    $avgFilter = array();
    $stdFilter = array();
    $filteredY = array();
    $avgFilter[$lag - 1] = mean($data, 0, $lag);
    $stdFilter[$lag - 1] = stddev($data, 0, $lag);
    
    for ($i = 0; $i < $len; $i++) {
        $filteredY[$i] = $data[$i];
        $signals[$i] = 0;
    }


    for ($i=$lag; $i < $len; $i++) {
        if (abs($data[$i] - $avgFilter[$i-1]) > $threshold * $stdFilter[$lag - 1]) {
            if ($data[$i] > $avgFilter[$i-1]) {
                $signals[$i] = 1;
            }
            else {
                $signals[$i] = -1;
            }
            $filteredY[$i] = $influence * $data[$i] + (1 - $influence) * $filteredY[$i-1];
        } 
        else {
            $signals[$i] = 0;
            $filteredY[$i] = $data[$i];
        }
        
        $avgFilter[$i] = mean($filteredY, $i - $lag, $lag);
        $stdFilter[$i] = stddev($filteredY, $i - $lag, $lag);
    }
    return $signals;
}

$sig = zscore($y, count($y));

print_r($y); echo "<br><br>";
print_r($sig); echo "<br><br>";

for ($i = 0; $i < count($y); $i++) echo $i. " " . $y[$i]. " ". $sig[$i]."<br>";

不需要将极大值与平均值进行比较,还可以将极大值与相邻的最小值进行比较,其中最小值仅定义在噪声阈值之上。 如果局部最大值是>的3倍(或其他置信因子)相邻的最小值,那么这个最大值就是一个峰值。 移动窗口越宽,峰值的确定越准确。 上面使用了以窗口中间为中心的计算, 顺便说一下,而不是在窗口结束时计算(== lag)。

请注意,最大值必须被视为信号之前的增加 之后下降。

下面是我尝试为“Smoothed z-score算法”创建一个Ruby解决方案:

module ThresholdingAlgoMixin
  def mean(array)
    array.reduce(&:+) / array.size.to_f
  end

  def stddev(array)
    array_mean = mean(array)
    Math.sqrt(array.reduce(0.0) { |a, b| a.to_f + ((b.to_f - array_mean) ** 2) } / array.size.to_f)
  end

  def thresholding_algo(lag: 5, threshold: 3.5, influence: 0.5)
    return nil if size < lag * 2
    Array.new(size, 0).tap do |signals|
      filtered = Array.new(self)

      initial_slice = take(lag)
      avg_filter = Array.new(lag - 1, 0.0) + [mean(initial_slice)]
      std_filter = Array.new(lag - 1, 0.0) + [stddev(initial_slice)]
      (lag..size-1).each do |idx|
        prev = idx - 1
        if (fetch(idx) - avg_filter[prev]).abs > threshold * std_filter[prev]
          signals[idx] = fetch(idx) > avg_filter[prev] ? 1 : -1
          filtered[idx] = (influence * fetch(idx)) + ((1-influence) * filtered[prev])
        end

        filtered_slice = filtered[idx-lag..prev]
        avg_filter[idx] = mean(filtered_slice)
        std_filter[idx] = stddev(filtered_slice)
      end
    end
  end
end

以及示例用法:

test_data = [
  1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1,
  1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1.1, 1,
  1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5,
  1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3, 2.6, 4, 3, 3.2, 2,
  1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1
].extend(ThresholdingAlgoMixin)

puts test_data.thresholding_algo.inspect

# Output: [
#   0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
#   0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0,
#   0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1,
#   1, 1, 0, 0, 0, -1, -1, 0, 0, 0, 0, 0, 0, 0, 0
# ]

我允许自己创建一个javascript版本。也许会有帮助。javascript应该是上面给出的伪代码的直接转录。可用的npm包和github repo:

https://github.com/crux/smoothed-z-score @joe_six / smoothed-z-score-peak-signal-detection

Javascript的翻译:

// javascript port of: https://stackoverflow.com/questions/22583391/peak-signal-detection-in-realtime-timeseries-data/48895639#48895639

function sum(a) {
    return a.reduce((acc, val) => acc + val)
}

function mean(a) {
    return sum(a) / a.length
}

function stddev(arr) {
    const arr_mean = mean(arr)
    const r = function(acc, val) {
        return acc + ((val - arr_mean) * (val - arr_mean))
    }
    return Math.sqrt(arr.reduce(r, 0.0) / arr.length)
}

function smoothed_z_score(y, params) {
    var p = params || {}
    // init cooefficients
    const lag = p.lag || 5
    const threshold = p.threshold || 3.5
    const influence = p.influece || 0.5

    if (y === undefined || y.length < lag + 2) {
        throw ` ## y data array to short(${y.length}) for given lag of ${lag}`
    }
    //console.log(`lag, threshold, influence: ${lag}, ${threshold}, ${influence}`)

    // init variables
    var signals = Array(y.length).fill(0)
    var filteredY = y.slice(0)
    const lead_in = y.slice(0, lag)
    //console.log("1: " + lead_in.toString())

    var avgFilter = []
    avgFilter[lag - 1] = mean(lead_in)
    var stdFilter = []
    stdFilter[lag - 1] = stddev(lead_in)
    //console.log("2: " + stdFilter.toString())

    for (var i = lag; i < y.length; i++) {
        //console.log(`${y[i]}, ${avgFilter[i-1]}, ${threshold}, ${stdFilter[i-1]}`)
        if (Math.abs(y[i] - avgFilter[i - 1]) > (threshold * stdFilter[i - 1])) {
            if (y[i] > avgFilter[i - 1]) {
                signals[i] = +1 // positive signal
            } else {
                signals[i] = -1 // negative signal
            }
            // make influence lower
            filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i - 1]
        } else {
            signals[i] = 0 // no signal
            filteredY[i] = y[i]
        }

        // adjust the filters
        const y_lag = filteredY.slice(i - lag, i)
        avgFilter[i] = mean(y_lag)
        stdFilter[i] = stddev(y_lag)
    }

    return signals
}

module.exports = smoothed_z_score

我们尝试在我们的数据集上使用平滑的z-score算法,这导致了过度敏感或不敏感(取决于参数如何调整),几乎没有中间地带。在我们站点的交通信号中,我们观察到一个低频基线,它代表了每天的周期,即使有最好的可能参数(如下所示),它仍然在第4天下降,特别是因为大多数数据点被认为是异常的。

在原始z-score算法的基础上,我们提出了一种通过反向滤波来解决这个问题的方法。改进后的算法及其在电视商业流量归因中的应用详见我们的团队博客。