我需要一个滚动窗口(又名滑动窗口)可迭代的序列/迭代器/生成器。(默认的Python迭代可以被认为是一种特殊情况,其中窗口长度为1。)我目前正在使用以下代码。我怎样才能做得更优雅和/或更有效?

def rolling_window(seq, window_size):
    it = iter(seq)
    win = [it.next() for cnt in xrange(window_size)] # First window
    yield win
    for e in it: # Subsequent windows
        win[:-1] = win[1:]
        win[-1] = e
        yield win

if __name__=="__main__":
    for w in rolling_window(xrange(6), 3):
        print w

"""Example output:   
   [0, 1, 2]
   [1, 2, 3]
   [2, 3, 4]
   [3, 4, 5]
"""

对于window_size == 2的特定情况(即,在序列中迭代相邻的重叠对),请参见如何从列表中迭代重叠(当前,下一个)值对?


当前回答

这里有一行。我对它进行了计时,它与顶部答案的性能相当,并且随着更大的seq逐渐变得更好,len(seq) = 20时慢20%,len(seq) = 10000时慢7%

zip(*[seq[i:(len(seq) - n - 1 + i)] for i in range(n)])

其他回答

为什么不

def pairwise(iterable):
    "s -> (s0,s1), (s1,s2), (s2, s3), ..."
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)

它被记录在Python文档中。 您可以轻松地将其扩展到更宽的窗口。

多个迭代器!

def window(seq, size, step=1):
    # initialize iterators
    iters = [iter(seq) for i in range(size)]
    # stagger iterators (without yielding)
    [next(iters[i]) for j in range(size) for i in range(-1, -j-1, -1)]
    while(True):
        yield [next(i) for i in iters]
        # next line does nothing for step = 1 (skips iterations for step > 1)
        [next(i) for i in iters for j in range(step-1)]

next(it)在序列结束时引发StopIteration,出于一些我无法理解的很酷的原因,yield语句在这里除外它,函数返回,忽略没有形成完整窗口的剩余值。

无论如何,这是目前为止行数最少的解决方案,它唯一的要求是seq实现__iter__或__getitem__,并且除了@dansalmo的解决方案之外,不依赖于itertools或集合:)

深度学习中滑动窗口数据的优化函数

def SlidingWindow(X, window_length, stride):
    indexer = np.arange(window_length)[None, :] + stride*np.arange(int(len(X)/stride)-window_length+4)[:, None]
    return X.take(indexer)

应用于多维数组

import numpy as np
def SlidingWindow(X, window_length, stride1):
    stride=  X.shape[1]*stride1
    window_length = window_length*X.shape[1]
    indexer = np.arange(window_length)[None, :] + stride1*np.arange(int(len(X)/stride1)-window_length-1)[:, None]
    return X.take(indexer)

我的两个版本的窗口实现

from typing import Sized, Iterable

def window(seq: Sized, n: int, strid: int = 1, drop_last: bool = False):
    for i in range(0, len(seq), strid):
        res = seq[i:i + n]
        if drop_last and len(res) < n:
            break
        yield res


def window2(seq: Iterable, n: int, strid: int = 1, drop_last: bool = False):
    it = iter(seq)
    result = []
    step = 0
    for i, ele in enumerate(it):
        result.append(ele)
        result = result[-n:]
        if len(result) == n:
            if step % strid == 0:
                yield result
            step += 1
    if not drop_last:
        yield result

这里有一行。我对它进行了计时,它与顶部答案的性能相当,并且随着更大的seq逐渐变得更好,len(seq) = 20时慢20%,len(seq) = 10000时慢7%

zip(*[seq[i:(len(seq) - n - 1 + i)] for i in range(n)])