我需要一个滚动窗口(又名滑动窗口)可迭代的序列/迭代器/生成器。(默认的Python迭代可以被认为是一种特殊情况,其中窗口长度为1。)我目前正在使用以下代码。我怎样才能做得更优雅和/或更有效?

def rolling_window(seq, window_size):
    it = iter(seq)
    win = [it.next() for cnt in xrange(window_size)] # First window
    yield win
    for e in it: # Subsequent windows
        win[:-1] = win[1:]
        win[-1] = e
        yield win

if __name__=="__main__":
    for w in rolling_window(xrange(6), 3):
        print w

"""Example output:   
   [0, 1, 2]
   [1, 2, 3]
   [2, 3, 4]
   [3, 4, 5]
"""

对于window_size == 2的特定情况(即,在序列中迭代相邻的重叠对),请参见如何从列表中迭代重叠(当前,下一个)值对?


当前回答

#Importing the numpy library
import numpy as np
arr = np.arange(6) #Sequence
window_size = 3
np.lib.stride_tricks.as_strided(arr, shape= (len(arr) - window_size +1, window_size), 
strides = arr.strides*2)

"""Example output:

  [0, 1, 2]
  [1, 2, 3]
  [2, 3, 4]
  [3, 4, 5]

"""

其他回答

我使用下面的代码作为一个简单的滑动窗口,它使用生成器来大幅提高可读性。根据我的经验,到目前为止,它的速度足以用于生物信息学序列分析。

我把它包括在这里是因为我还没有看到这种方法被使用过。同样,我对它的比较性能没有任何评价。

def slidingWindow(sequence,winSize,step=1):
"""Returns a generator that will iterate through
the defined chunks of input sequence. Input sequence
must be sliceable."""

    # Verify the inputs
    if not ((type(winSize) == type(0)) and (type(step) == type(0))):
        raise Exception("**ERROR** type(winSize) and type(step) must be int.")
    if step > winSize:
        raise Exception("**ERROR** step must not be larger than winSize.")
    if winSize > len(sequence):
        raise Exception("**ERROR** winSize must not be larger than sequence length.")

    # Pre-compute number of chunks to emit
    numOfChunks = ((len(sequence)-winSize)/step)+1

    # Do the work
    for i in range(0,numOfChunks*step,step):
        yield sequence[i:i+winSize]

更新

Kelly发现这是一个重复的答案。但我在这里留下这个作为反例,因为我包含了一个毫无意义的最小值。

所以如果你想用min来避免IndexError,没有必要,range会帮你处理这种情况。


旧的答案

奇怪的是,当n > len(l)返回[]时,下面的句柄在语义上是正确的。

>>> l = [0, 1, 2, 3, 4]

>>> n = 2
>>> [l[i: i + min(n, len(l)-i)] for i in range(len(l)-n+1)]
>>> [[0, 1], [1, 2], [2, 3], [3, 4]]
>>>
>>> n = 3
>>> [l[i: i + min(n, len(l)-i)] for i in range(len(l)-n+1)]
>>> [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
>>>
>>> n = 4
>>> [l[i: i + min(n, len(l)-i)] for i in range(len(l)-n+1)]
>>> [[0, 1, 2, 3], [1, 2, 3, 4]]
>>>
>>> n = 5
>>> [l[i: i + min(n, len(l)-i)] for i in range(len(l)-n+1)]
>>> [[0, 1, 2, 3, 4]]
>>>
>>> n = 10 # n > len(l)
>>> [l[i: i + min(n, len(l)-i)] for i in range(len(l)-n+1)]
>>> []

这里是一个泛化,增加了对step, fillvalue参数的支持:

from collections import deque
from itertools import islice

def sliding_window(iterable, size=2, step=1, fillvalue=None):
    if size < 0 or step < 1:
        raise ValueError
    it = iter(iterable)
    q = deque(islice(it, size), maxlen=size)
    if not q:
        return  # empty iterable or size == 0
    q.extend(fillvalue for _ in range(size - len(q)))  # pad to size
    while True:
        yield iter(q)  # iter() to avoid accidental outside modifications
        try:
            q.append(next(it))
        except StopIteration: # Python 3.5 pep 479 support
            return
        q.extend(next(it, fillvalue) for _ in range(step - 1))

它每次产生块大小的项目,每次迭代滚动步骤位置,在必要时用fillvalue填充每个块。示例:size=4, step=3, fillvalue='*':

 [a b c d]e f g h i j k l m n o p q r s t u v w x y z
  a b c[d e f g]h i j k l m n o p q r s t u v w x y z
  a b c d e f[g h i j]k l m n o p q r s t u v w x y z
  a b c d e f g h i[j k l m]n o p q r s t u v w x y z
  a b c d e f g h i j k l[m n o p]q r s t u v w x y z
  a b c d e f g h i j k l m n o[p q r s]t u v w x y z
  a b c d e f g h i j k l m n o p q r[s t u v]w x y z
  a b c d e f g h i j k l m n o p q r s t u[v w x y]z
  a b c d e f g h i j k l m n o p q r s t u v w x[y z * *]

有关step参数的用例示例,请参见用python有效地处理大型.txt文件。

另一种从列表生成固定长度窗口的简单方法

from collections import deque

def window(ls,window_size=3):
    window = deque(maxlen=window_size)

    for element in ls:
        
        if len(window)==window_size:
            yield list(window)
        window.append(element)

ls = [0,1,2,3,4,5]

for w in window(ls):
    print(w)

除了我想到的解决方案,我还测试了一些解决方案。我发现我想出的是最快的,所以我想分享我的python3实现。

import itertools
import sys

def windowed(l, stride):
    return zip(*[itertools.islice(l, i, sys.maxsize) for i in range(stride)])