更新:到目前为止表现最好的算法是这个。


这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。

考虑以下示例数据:

这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:

有一种具有一般平均值的基本噪声 有很大的“峰值”或“更高的数据点”明显偏离噪声。

让我们假设以下情况:

峰的宽度不能事先确定 峰的高度明显偏离其他值 算法实时更新(因此每个新数据点都会更新)

对于这种情况,需要构造一个触发信号的边值。但是,边界值不能是静态的,必须通过算法实时确定。


我的问题是:什么是实时计算这些阈值的好算法?有没有针对这种情况的特定算法?最著名的算法是什么?


健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)


当前回答

这种z-scores方法在峰值检测方面非常有效,也有助于异常值的去除。异常值对话经常讨论每个点的统计价值和变化数据的伦理。

但是,在来自易出错的串行通信或易出错的传感器的重复错误传感器值的情况下,错误或虚假读数中没有统计值。它们需要被识别并移除。

从视觉上看,错误是显而易见的。下图中的直线显示了需要删除的内容。但是用算法识别和消除错误是相当具有挑战性的。z分数效果很好。

下图是通过串行通信从传感器获得的值。偶尔的串行通信错误,传感器错误或两者都导致重复的,明显错误的数据点。

z-score峰值检测器能够在虚假数据点上发出信号,并生成一个干净的结果数据集,同时保留正确数据的特征:

其他回答

一种方法是根据以下观察来检测峰:

时间t是一个峰值(y (t) > y (t - 1)) & & ((t) > y (t + 1))

它通过等待上升趋势结束来避免误报。它并不完全是“实时”的,因为它会比峰值差一个dt。灵敏度可以通过要求比较的裕度来控制。在噪声检测和时延检测之间存在一种折衷。 您可以通过添加更多参数来丰富模型:

峰如果y (y (t) - (t-dt) > m) && (y (t) - y (t + dt) > m)

dt和m是控制灵敏度和延时的参数

这是你用上述算法得到的结果:

下面是在python中重现图的代码:

import numpy as np
import matplotlib.pyplot as plt
input = np.array([ 1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1.1,  1. ,  0.8,  0.9,
    1. ,  1.2,  0.9,  1. ,  1. ,  1.1,  1.2,  1. ,  1.5,  1. ,  3. ,
    2. ,  5. ,  3. ,  2. ,  1. ,  1. ,  1. ,  0.9,  1. ,  1. ,  3. ,
    2.6,  4. ,  3. ,  3.2,  2. ,  1. ,  1. ,  1. ,  1. ,  1. ])
signal = (input > np.roll(input,1)) & (input > np.roll(input,-1))
plt.plot(input)
plt.plot(signal.nonzero()[0], input[signal], 'ro')
plt.show()

通过设置m = 0.5,你可以得到一个更清晰的信号,只有一个假阳性:

我们尝试在我们的数据集上使用平滑的z-score算法,这导致了过度敏感或不敏感(取决于参数如何调整),几乎没有中间地带。在我们站点的交通信号中,我们观察到一个低频基线,它代表了每天的周期,即使有最好的可能参数(如下所示),它仍然在第4天下降,特别是因为大多数数据点被认为是异常的。

在原始z-score算法的基础上,我们提出了一种通过反向滤波来解决这个问题的方法。改进后的算法及其在电视商业流量归因中的应用详见我们的团队博客。

c++实现

#include <iostream>
#include <vector>
#include <algorithm>
#include <unordered_map>
#include <cmath>
#include <iterator>
#include <numeric>

using namespace std;

typedef long double ld;
typedef unsigned int uint;
typedef std::vector<ld>::iterator vec_iter_ld;

/**
 * Overriding the ostream operator for pretty printing vectors.
 */
template<typename T>
std::ostream &operator<<(std::ostream &os, std::vector<T> vec) {
    os << "[";
    if (vec.size() != 0) {
        std::copy(vec.begin(), vec.end() - 1, std::ostream_iterator<T>(os, " "));
        os << vec.back();
    }
    os << "]";
    return os;
}

/**
 * This class calculates mean and standard deviation of a subvector.
 * This is basically stats computation of a subvector of a window size qual to "lag".
 */
class VectorStats {
public:
    /**
     * Constructor for VectorStats class.
     *
     * @param start - This is the iterator position of the start of the window,
     * @param end   - This is the iterator position of the end of the window,
     */
    VectorStats(vec_iter_ld start, vec_iter_ld end) {
        this->start = start;
        this->end = end;
        this->compute();
    }

    /**
     * This method calculates the mean and standard deviation using STL function.
     * This is the Two-Pass implementation of the Mean & Variance calculation.
     */
    void compute() {
        ld sum = std::accumulate(start, end, 0.0);
        uint slice_size = std::distance(start, end);
        ld mean = sum / slice_size;
        std::vector<ld> diff(slice_size);
        std::transform(start, end, diff.begin(), [mean](ld x) { return x - mean; });
        ld sq_sum = std::inner_product(diff.begin(), diff.end(), diff.begin(), 0.0);
        ld std_dev = std::sqrt(sq_sum / slice_size);

        this->m1 = mean;
        this->m2 = std_dev;
    }

    ld mean() {
        return m1;
    }

    ld standard_deviation() {
        return m2;
    }

private:
    vec_iter_ld start;
    vec_iter_ld end;
    ld m1;
    ld m2;
};

/**
 * This is the implementation of the Smoothed Z-Score Algorithm.
 * This is direction translation of https://stackoverflow.com/a/22640362/1461896.
 *
 * @param input - input signal
 * @param lag - the lag of the moving window
 * @param threshold - the z-score at which the algorithm signals
 * @param influence - the influence (between 0 and 1) of new signals on the mean and standard deviation
 * @return a hashmap containing the filtered signal and corresponding mean and standard deviation.
 */
unordered_map<string, vector<ld>> z_score_thresholding(vector<ld> input, int lag, ld threshold, ld influence) {
    unordered_map<string, vector<ld>> output;

    uint n = (uint) input.size();
    vector<ld> signals(input.size());
    vector<ld> filtered_input(input.begin(), input.end());
    vector<ld> filtered_mean(input.size());
    vector<ld> filtered_stddev(input.size());

    VectorStats lag_subvector_stats(input.begin(), input.begin() + lag);
    filtered_mean[lag - 1] = lag_subvector_stats.mean();
    filtered_stddev[lag - 1] = lag_subvector_stats.standard_deviation();

    for (int i = lag; i < n; i++) {
        if (abs(input[i] - filtered_mean[i - 1]) > threshold * filtered_stddev[i - 1]) {
            signals[i] = (input[i] > filtered_mean[i - 1]) ? 1.0 : -1.0;
            filtered_input[i] = influence * input[i] + (1 - influence) * filtered_input[i - 1];
        } else {
            signals[i] = 0.0;
            filtered_input[i] = input[i];
        }
        VectorStats lag_subvector_stats(filtered_input.begin() + (i - lag), filtered_input.begin() + i);
        filtered_mean[i] = lag_subvector_stats.mean();
        filtered_stddev[i] = lag_subvector_stats.standard_deviation();
    }

    output["signals"] = signals;
    output["filtered_mean"] = filtered_mean;
    output["filtered_stddev"] = filtered_stddev;

    return output;
};

int main() {
    vector<ld> input = {1.0, 1.0, 1.1, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0, 0.9, 1.0, 1.1, 1.0, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0,
                        1.0, 1.0, 1.0, 1.1, 0.9, 1.0, 1.1, 1.0, 1.0, 0.9, 1.0, 1.1, 1.0, 1.0, 1.1, 1.0, 0.8, 0.9, 1.0,
                        1.2, 0.9, 1.0, 1.0, 1.1, 1.2, 1.0, 1.5, 1.0, 3.0, 2.0, 5.0, 3.0, 2.0, 1.0, 1.0, 1.0, 0.9, 1.0,
                        1.0, 3.0, 2.6, 4.0, 3.0, 3.2, 2.0, 1.0, 1.0, 0.8, 4.0, 4.0, 2.0, 2.5, 1.0, 1.0, 1.0};

    int lag = 30;
    ld threshold = 5.0;
    ld influence = 0.0;
    unordered_map<string, vector<ld>> output = z_score_thresholding(input, lag, threshold, influence);
    cout << output["signals"] << endl;
}

如果边界值或其他标准取决于未来值,那么唯一的解决方案(没有时间机器,或其他关于未来值的知识)是推迟任何决定,直到有足够的未来值。如果你想要一个高于均值的水平,例如,20点,那么你必须等到你至少有19点才能做出任何峰值决策,否则下一个新点可能会完全超过你19点之前的阈值。

Added: If the statistical distribution of the peak heights could be heavy tailed, instead of Uniform or Gaussian, then you may need to wait until you see several thousand peaks before it starts to become unlikely that a hidden Pareto distribution won't produce a peak many times larger than any you currently have seen before or have in your current plot. Unless you somehow know in advance that the very next point can't be 1e20, it could appear, which after rescaling your plot's Y dimension, would be flat up until that point.

原文的附录1:Matlab和R翻译

Matlab代码

function [signals,avgFilter,stdFilter] = ThresholdingAlgo(y,lag,threshold,influence)
% Initialise signal results
signals = zeros(length(y),1);
% Initialise filtered series
filteredY = y(1:lag+1);
% Initialise filters
avgFilter(lag+1,1) = mean(y(1:lag+1));
stdFilter(lag+1,1) = std(y(1:lag+1));
% Loop over all datapoints y(lag+2),...,y(t)
for i=lag+2:length(y)
    % If new value is a specified number of deviations away
    if abs(y(i)-avgFilter(i-1)) > threshold*stdFilter(i-1)
        if y(i) > avgFilter(i-1)
            % Positive signal
            signals(i) = 1;
        else
            % Negative signal
            signals(i) = -1;
        end
        % Make influence lower
        filteredY(i) = influence*y(i)+(1-influence)*filteredY(i-1);
    else
        % No signal
        signals(i) = 0;
        filteredY(i) = y(i);
    end
    % Adjust the filters
    avgFilter(i) = mean(filteredY(i-lag:i));
    stdFilter(i) = std(filteredY(i-lag:i));
end
% Done, now return results
end

例子:

% Data
y = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1,...
    1 1 1.1 0.9 1 1.1 1 1 0.9 1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1,...
    1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1,...
    1 3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

% Settings
lag = 30;
threshold = 5;
influence = 0;

% Get results
[signals,avg,dev] = ThresholdingAlgo(y,lag,threshold,influence);

figure; subplot(2,1,1); hold on;
x = 1:length(y); ix = lag+1:length(y);
area(x(ix),avg(ix)+threshold*dev(ix),'FaceColor',[0.9 0.9 0.9],'EdgeColor','none');
area(x(ix),avg(ix)-threshold*dev(ix),'FaceColor',[1 1 1],'EdgeColor','none');
plot(x(ix),avg(ix),'LineWidth',1,'Color','cyan','LineWidth',1.5);
plot(x(ix),avg(ix)+threshold*dev(ix),'LineWidth',1,'Color','green','LineWidth',1.5);
plot(x(ix),avg(ix)-threshold*dev(ix),'LineWidth',1,'Color','green','LineWidth',1.5);
plot(1:length(y),y,'b');
subplot(2,1,2);
stairs(signals,'r','LineWidth',1.5); ylim([-1.5 1.5]);

R代码

ThresholdingAlgo <- function(y,lag,threshold,influence) {
  signals <- rep(0,length(y))
  filteredY <- y[0:lag]
  avgFilter <- NULL
  stdFilter <- NULL
  avgFilter[lag] <- mean(y[0:lag], na.rm=TRUE)
  stdFilter[lag] <- sd(y[0:lag], na.rm=TRUE)
  for (i in (lag+1):length(y)){
    if (abs(y[i]-avgFilter[i-1]) > threshold*stdFilter[i-1]) {
      if (y[i] > avgFilter[i-1]) {
        signals[i] <- 1;
      } else {
        signals[i] <- -1;
      }
      filteredY[i] <- influence*y[i]+(1-influence)*filteredY[i-1]
    } else {
      signals[i] <- 0
      filteredY[i] <- y[i]
    }
    avgFilter[i] <- mean(filteredY[(i-lag):i], na.rm=TRUE)
    stdFilter[i] <- sd(filteredY[(i-lag):i], na.rm=TRUE)
  }
  return(list("signals"=signals,"avgFilter"=avgFilter,"stdFilter"=stdFilter))
}

例子:

# Data
y <- c(1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1)

lag       <- 30
threshold <- 5
influence <- 0

# Run algo with lag = 30, threshold = 5, influence = 0
result <- ThresholdingAlgo(y,lag,threshold,influence)

# Plot result
par(mfrow = c(2,1),oma = c(2,2,0,0) + 0.1,mar = c(0,0,2,1) + 0.2)
plot(1:length(y),y,type="l",ylab="",xlab="") 
lines(1:length(y),result$avgFilter,type="l",col="cyan",lwd=2)
lines(1:length(y),result$avgFilter+threshold*result$stdFilter,type="l",col="green",lwd=2)
lines(1:length(y),result$avgFilter-threshold*result$stdFilter,type="l",col="green",lwd=2)
plot(result$signals,type="S",col="red",ylab="",xlab="",ylim=c(-1.5,1.5),lwd=2)

这段代码(两种语言)将为原始问题的数据产生以下结果:


附录2原答案:Matlab演示代码

(点击创建数据)

function [] = RobustThresholdingDemo()

%% SPECIFICATIONS
lag         = 5;       % lag for the smoothing
threshold   = 3.5;     % number of st.dev. away from the mean to signal
influence   = 0.3;     % when signal: how much influence for new data? (between 0 and 1)
                       % 1 is normal influence, 0.5 is half      
%% START DEMO
DemoScreen(30,lag,threshold,influence);

end

function [signals,avgFilter,stdFilter] = ThresholdingAlgo(y,lag,threshold,influence)
signals = zeros(length(y),1);
filteredY = y(1:lag+1);
avgFilter(lag+1,1) = mean(y(1:lag+1));
stdFilter(lag+1,1) = std(y(1:lag+1));
for i=lag+2:length(y)
    if abs(y(i)-avgFilter(i-1)) > threshold*stdFilter(i-1)
        if y(i) > avgFilter(i-1)
            signals(i) = 1;
        else
            signals(i) = -1;
        end
        filteredY(i) = influence*y(i)+(1-influence)*filteredY(i-1);
    else
        signals(i) = 0;
        filteredY(i) = y(i);
    end
    avgFilter(i) = mean(filteredY(i-lag:i));
    stdFilter(i) = std(filteredY(i-lag:i));
end
end

% Demo screen function
function [] = DemoScreen(n,lag,threshold,influence)
figure('Position',[200 100,1000,500]);
subplot(2,1,1);
title(sprintf(['Draw data points (%.0f max)      [settings: lag = %.0f, '...
    'threshold = %.2f, influence = %.2f]'],n,lag,threshold,influence));
ylim([0 5]); xlim([0 50]);
H = gca; subplot(2,1,1);
set(H, 'YLimMode', 'manual'); set(H, 'XLimMode', 'manual');
set(H, 'YLim', get(H,'YLim')); set(H, 'XLim', get(H,'XLim'));
xg = []; yg = [];
for i=1:n
    try
        [xi,yi] = ginput(1);
    catch
        return;
    end
    xg = [xg xi]; yg = [yg yi];
    if i == 1
        subplot(2,1,1); hold on;
        plot(H, xg(i),yg(i),'r.'); 
        text(xg(i),yg(i),num2str(i),'FontSize',7);
    end
    if length(xg) > lag
        [signals,avg,dev] = ...
            ThresholdingAlgo(yg,lag,threshold,influence);
        area(xg(lag+1:end),avg(lag+1:end)+threshold*dev(lag+1:end),...
            'FaceColor',[0.9 0.9 0.9],'EdgeColor','none');
        area(xg(lag+1:end),avg(lag+1:end)-threshold*dev(lag+1:end),...
            'FaceColor',[1 1 1],'EdgeColor','none');
        plot(xg(lag+1:end),avg(lag+1:end),'LineWidth',1,'Color','cyan');
        plot(xg(lag+1:end),avg(lag+1:end)+threshold*dev(lag+1:end),...
            'LineWidth',1,'Color','green');
        plot(xg(lag+1:end),avg(lag+1:end)-threshold*dev(lag+1:end),...
            'LineWidth',1,'Color','green');
        subplot(2,1,2); hold on; title('Signal output');
        stairs(xg(lag+1:end),signals(lag+1:end),'LineWidth',2,'Color','blue');
        ylim([-2 2]); xlim([0 50]); hold off;
    end
    subplot(2,1,1); hold on;
    for j=2:i
        plot(xg([j-1:j]),yg([j-1:j]),'r'); plot(H,xg(j),yg(j),'r.');
        text(xg(j),yg(j),num2str(j),'FontSize',7);
    end
end
end