似乎没有函数可以简单地计算numpy/scipy的移动平均值,这导致了复杂的解决方案。

我的问题有两个方面:

用numpy(正确地)实现移动平均的最简单方法是什么? 既然这似乎不是小事,而且容易出错,有没有一个很好的理由不包括电池在这种情况下?


当前回答

您也可以编写自己的Python C扩展。

这当然不是最简单的方法,但与使用np相比,这将使您运行得更快,内存效率更高。堆积:作为建筑块的堆积

// moving_average.c
#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
#include <Python.h>
#include <numpy/arrayobject.h>

static PyObject *moving_average(PyObject *self, PyObject *args) {
    PyObject *input;
    int64_t window_size;
    PyArg_ParseTuple(args, "Ol", &input, &window_size);
    if (PyErr_Occurred()) return NULL;
    if (!PyArray_Check(input) || !PyArray_ISNUMBER((PyArrayObject *)input)) {
        PyErr_SetString(PyExc_TypeError, "First argument must be a numpy array with numeric dtype");
        return NULL;
    }
    
    int64_t input_size = PyObject_Size(input);
    double *input_data;
    if (PyArray_AsCArray(&input, &input_data, (npy_intp[]){ [0] = input_size }, 1, PyArray_DescrFromType(NPY_DOUBLE)) != 0) {
        PyErr_SetString(PyExc_TypeError, "Failed to simulate C array of type double");
        return NULL;
    }
    
    int64_t output_size = input_size - window_size + 1;
    PyObject *output = PyArray_SimpleNew(1, (npy_intp[]){ [0] = output_size }, NPY_DOUBLE);
    double *output_data = PyArray_DATA((PyArrayObject *)output);
    
    double cumsum_before = 0;
    double cumsum_after = 0;
    for (int i = 0; i < window_size; ++i) {
        cumsum_after += input_data[i];
    }
    for (int i = 0; i < output_size - 1; ++i) {
        output_data[i] = (cumsum_after - cumsum_before) / window_size;
        cumsum_after += input_data[i + window_size];
        cumsum_before += input_data[i];
    }
    output_data[output_size - 1] = (cumsum_after - cumsum_before) / window_size;

    return output;
}

static PyMethodDef methods[] = {
    {
        "moving_average", 
        moving_average, 
        METH_VARARGS, 
        "Rolling mean of numpy array with specified window size"
    },
    {NULL, NULL, 0, NULL}
};

static struct PyModuleDef moduledef = {
    PyModuleDef_HEAD_INIT,
    "moving_average",
    "C extension for finding the rolling mean of a numpy array",
    -1,
    methods
};

PyMODINIT_FUNC PyInit_moving_average(void) {
    PyObject *module = PyModule_Create(&moduledef);
    import_array();
    return module;
}

METH_VARARGS specifies that the method only takes positional arguments. PyArg_ParseTuple allows you to parse these positional arguments. By using PyErr_SetString and returning NULL from the method, you can signal that an exception has occurred to the Python interpreter from the C extension. PyArray_AsCArray allows your method to be polymorphic when it comes to input array dtype, alignment, whether the array is C-contiguous (See "Can a numpy 1d array not be contiguous?") etc. without needing to create a copy of the array. If you instead used PyArray_DATA, you'd need to deal with this yourself. PyArray_SimpleNew allows you to create a new numpy array. This is similar to using np.empty. The array will not be initialized, and might contain non-deterministic junk which could surprise you if you forget to overwrite it.

构建C扩展

# setup.py
from setuptools import setup, Extension
import numpy

setup(
  ext_modules=[
    Extension(
      'moving_average',
      ['moving_average.c'],
      include_dirs=[numpy.get_include()]
    )
  ]
)

# python setup.py build_ext --build-lib=.

基准

import numpy as np

# Our compiled C extension:
from moving_average import moving_average as moving_average_c

# Answer by Jaime using npcumsum
def moving_average_cumsum(a, n) :
    ret = np.cumsum(a, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return ret[n - 1:] / n

# Answer by yatu using np.convolve
def moving_average_convolve(a, n):
    return np.convolve(a, np.ones(n), 'valid') / n

a = np.random.rand(1_000_000)
print('window_size = 3')
%timeit moving_average_c(a, 3)
%timeit moving_average_cumsum(a, 3)
%timeit moving_average_convolve(a, 3)

print('\nwindow_size = 100')
%timeit moving_average_c(a, 100)
%timeit moving_average_cumsum(a, 100)
%timeit moving_average_convolve(a, 100)
window_size = 3
958 µs ± 4.68 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
4.52 ms ± 15.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
809 µs ± 463 ns per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

window_size = 100
977 µs ± 937 ns per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
6.16 ms ± 19.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
14.2 ms ± 12.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

其他回答

Talib包含一个简单的移动平均工具,以及其他类似的平均工具(即指数移动平均)。下面将该方法与其他一些解决方案进行比较。


%timeit pd.Series(np.arange(100000)).rolling(3).mean()
2.53 ms ± 40.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

%timeit talib.SMA(real = np.arange(100000.), timeperiod = 3)
348 µs ± 3.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

%timeit moving_average(np.arange(100000))
638 µs ± 45.1 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

需要注意的是,real必须有dtype = float的元素。否则将引发以下错误

例外:实不是双的

我觉得使用瓶颈可以很容易地解决这个问题

参见下面的基本示例:

import numpy as np
import bottleneck as bn

a = np.random.randint(4, 1000, size=(5, 7))
mm = bn.move_mean(a, window=2, min_count=1)

这就给出了每个轴上的移动平均值。

“mm”是“a”的移动平均值。 “窗口”是考虑移动均值的最大条目数。 "min_count"是考虑移动平均值的最小条目数(例如,对于第一个元素或如果数组有nan值)。

好在瓶颈有助于处理nan值,而且非常高效。

如果你已经有一个已知大小的数组

import numpy as np                                         
M=np.arange(12)
                                                               
avg=[]                                                         
i=0
while i<len(M)-2: #for n point average len(M) - (n-1)
        avg.append((M[i]+M[i+1]+M[i+2])/3) #n is denominator                       
        i+=1     
                                                                                                    
print(avg)

通过比较下面的解决方案与使用cumsum of numpy的解决方案,这个解决方案几乎花费了一半的时间。这是因为它不需要遍历整个数组来做cumsum,然后做所有的减法。此外,如果数组很大且数量很大(可能溢出),cumsum可能是“危险的”。当然,这里也存在危险,但至少我们只把重要的数字加在一起。

def moving_average(array_numbers, n):
    if n > len(array_numbers):
      return []
    temp_sum = sum(array_numbers[:n])
    averages = [temp_sum / float(n)]
    for first_index, item in enumerate(array_numbers[n:]):
        temp_sum += item - array_numbers[first_index]
        averages.append(temp_sum / float(n))
    return averages

实际上,我想要一个稍微不同于公认答案的行为。我正在为sklearn管道构建一个移动平均特征提取器,因此我要求移动平均的输出与输入具有相同的维数。我想要的是让移动平均假设级数保持不变,即[1,2,3,4,5]与窗口2的移动平均将得到[1.5,2.5,3.5,4.5,5.0]。

对于列向量(我的用例)我们得到

def moving_average_col(X, n):
  z2 = np.cumsum(np.pad(X, ((n,0),(0,0)), 'constant', constant_values=0), axis=0)
  z1 = np.cumsum(np.pad(X, ((0,n),(0,0)), 'constant', constant_values=X[-1]), axis=0)
  return (z1-z2)[(n-1):-1]/n

对于数组

def moving_average_array(X, n):
  z2 = np.cumsum(np.pad(X, (n,0), 'constant', constant_values=0))
  z1 = np.cumsum(np.pad(X, (0,n), 'constant', constant_values=X[-1]))
  return (z1-z2)[(n-1):-1]/n

当然,不必假设填充值为常数,但在大多数情况下这样做应该足够了。