似乎没有函数可以简单地计算numpy/scipy的移动平均值,这导致了复杂的解决方案。

我的问题有两个方面:

用numpy(正确地)实现移动平均的最简单方法是什么? 既然这似乎不是小事,而且容易出错,有没有一个很好的理由不包括电池在这种情况下?


当前回答

下面是一个使用numba的快速实现(注意类型)。注意它确实包含移位的nan。

import numpy as np
import numba as nb

@nb.jit(nb.float64[:](nb.float64[:],nb.int64),
        fastmath=True,nopython=True)
def moving_average( array, window ):    
    ret = np.cumsum(array)
    ret[window:] = ret[window:] - ret[:-window]
    ma = ret[window - 1:] / window
    n = np.empty(window-1); n.fill(np.nan)
    return np.concatenate((n.ravel(), ma.ravel())) 

其他回答

从Numpy 1.20开始,sliding_window_view提供了一种在元素窗口中滑动/滚动的方法。然后你可以分别取平均值。

例如,对于一个4元素的窗口:

from numpy.lib.stride_tricks import sliding_window_view

# values = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
np.average(sliding_window_view(values, window_shape = 4), axis=1)
# array([6.5, 5.75, 5.25, 4.5, 2.25, 1.75, 2])

注意sliding_window_view的中间结果:

# values = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
sliding_window_view(values, window_shape = 4)
# array([[ 5,  3,  8, 10],
#        [ 3,  8, 10,  2],
#        [ 8, 10,  2,  1],
#        [10,  2,  1,  5],
#        [ 2,  1,  5,  1],
#        [ 1,  5,  1,  0],
#        [ 5,  1,  0,  2]])

如果有人需要一个简单的解决方案,这里有一个

def moving_average(a,n):
    N=len(a)
    return np.array([np.mean(a[i:i+n]) for i in np.arange(0,N-n+1)])

你可以通过在np.arange(0,N-n+1,step)中添加step参数来改变窗口之间的重叠

如果你只想要一个简单的非加权移动平均,你可以很容易地用np实现它。cumsum,可能比基于FFT的方法更快:

修正了Bean在代码中发现的偏离一的错误索引。编辑

def moving_average(a, n=3) :
    ret = np.cumsum(a, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return ret[n - 1:] / n

>>> a = np.arange(20)
>>> moving_average(a)
array([  1.,   2.,   3.,   4.,   5.,   6.,   7.,   8.,   9.,  10.,  11.,
        12.,  13.,  14.,  15.,  16.,  17.,  18.])
>>> moving_average(a, n=4)
array([  1.5,   2.5,   3.5,   4.5,   5.5,   6.5,   7.5,   8.5,   9.5,
        10.5,  11.5,  12.5,  13.5,  14.5,  15.5,  16.5,  17.5])

所以我猜答案是:它真的很容易实现,也许numpy已经有了一些专门的功能。

移动平均线 迭代器方法 在i处反转数组,简单地求i到n的均值。 使用列表推导式在运行中生成迷你数组。

x = np.random.randint(10, size=20)

def moving_average(arr, n):
    return [ (arr[:i+1][::-1][:n]).mean() for i, ele in enumerate(arr) ]
d = 5

moving_average(x, d)

张量卷积

moving_average = np.convolve(x, np.ones(d)/d, mode='valid')

所有的答案似乎都集中在预先计算的列表的情况下。对于实际运行的用例,数字一个接一个地进来,这里有一个简单的类,它提供了对最后N个值求平均的服务:

import numpy as np
class RunningAverage():
    def __init__(self, stack_size):
        self.stack = [0 for _ in range(stack_size)]
        self.ptr = 0
        self.full_cycle = False
    def add(self,value):
        self.stack[self.ptr] = value
        self.ptr += 1
        if self.ptr == len(self.stack):
            self.full_cycle = True
            self.ptr = 0
    def get_avg(self):
        if self.full_cycle:
            return np.mean(self.stack)
        else:
            return np.mean(self.stack[:self.ptr])

用法:

N = 50  # size of the averaging window
run_avg = RunningAverage(N)
for i in range(1000):
    value = <my computation>
    run_avg.add(value)
    if i % 20 ==0: # print once in 20 iters:
        print(f'the average value is {run_avg.get_avg()}')