我需要一个滚动窗口(又名滑动窗口)可迭代的序列/迭代器/生成器。(默认的Python迭代可以被认为是一种特殊情况,其中窗口长度为1。)我目前正在使用以下代码。我怎样才能做得更优雅和/或更有效?

def rolling_window(seq, window_size):
    it = iter(seq)
    win = [it.next() for cnt in xrange(window_size)] # First window
    yield win
    for e in it: # Subsequent windows
        win[:-1] = win[1:]
        win[-1] = e
        yield win

if __name__=="__main__":
    for w in rolling_window(xrange(6), 3):
        print w

"""Example output:   
   [0, 1, 2]
   [1, 2, 3]
   [2, 3, 4]
   [3, 4, 5]
"""

对于window_size == 2的特定情况(即,在序列中迭代相邻的重叠对),请参见如何从列表中迭代重叠(当前,下一个)值对?


这似乎是为collections.deque定制的,因为您实际上有一个FIFO(添加到一端,从另一端删除)。然而,即使你使用列表,你也不应该切片两次;相反,您应该只从列表中弹出(0)并追加()新项。

下面是一个基于deque的优化实现:

from collections import deque

def window(seq, n=2):
    it = iter(seq)
    win = deque((next(it, None) for _ in xrange(n)), maxlen=n)
    yield win
    append = win.append
    for e in it:
        append(e)
        yield win

在我的测试中,它在大多数时候都轻松击败了这里发布的其他所有东西,尽管pillmuncher的tee版本在大可迭代对象和小窗口方面击败了它。在较大的窗口上,deque再次以原始速度领先。

Access to individual items in the deque may be faster or slower than with lists or tuples. (Items near the beginning are faster, or items near the end if you use a negative index.) I put a sum(w) in the body of my loop; this plays to the deque's strength (iterating from one item to the next is fast, so this loop ran a a full 20% faster than the next fastest method, pillmuncher's). When I changed it to individually look up and add items in a window of ten, the tables turned and the tee method was 20% faster. I was able to recover some speed by using negative indexes for the last five terms in the addition, but tee was still a little faster. Overall I would estimate that either one is plenty fast for most uses and if you need a little more performance, profile and pick the one that works best.


在旧版本的Python文档中有一个itertools示例:

from itertools import islice

def window(seq, n=2):
    "Returns a sliding window (of width n) over data from the iterable"
    "   s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...                   "
    it = iter(seq)
    result = tuple(islice(it, n))
    if len(result) == n:
        yield result
    for elem in it:
        result = result[1:] + (elem,)
        yield result

文档中的那个更简洁一点,我想它使用了itertools来达到更好的效果。


如果你的迭代器是一个简单的列表/元组,用指定的窗口大小滑动它的简单方法是:

seq = [0, 1, 2, 3, 4, 5]
window_size = 3

for i in range(len(seq) - window_size + 1):
    print(seq[i: i + window_size])

输出:

[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]

我喜欢t ():

from itertools import tee, izip

def window(iterable, size):
    iters = tee(iterable, size)
    for i in xrange(1, size):
        for each in iters[i:]:
            next(each, None)
    return izip(*iters)

for each in window(xrange(6), 3):
    print list(each)

给:

[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]

我使用下面的代码作为一个简单的滑动窗口,它使用生成器来大幅提高可读性。根据我的经验,到目前为止,它的速度足以用于生物信息学序列分析。

我把它包括在这里是因为我还没有看到这种方法被使用过。同样,我对它的比较性能没有任何评价。

def slidingWindow(sequence,winSize,step=1):
"""Returns a generator that will iterate through
the defined chunks of input sequence. Input sequence
must be sliceable."""

    # Verify the inputs
    if not ((type(winSize) == type(0)) and (type(step) == type(0))):
        raise Exception("**ERROR** type(winSize) and type(step) must be int.")
    if step > winSize:
        raise Exception("**ERROR** step must not be larger than winSize.")
    if winSize > len(sequence):
        raise Exception("**ERROR** winSize must not be larger than sequence length.")

    # Pre-compute number of chunks to emit
    numOfChunks = ((len(sequence)-winSize)/step)+1

    # Do the work
    for i in range(0,numOfChunks*step,step):
        yield sequence[i:i+winSize]

只是一个简短的贡献。

由于当前的python文档在itertool示例中没有“window”(即,在http://docs.python.org/library/itertools.html的底部),这里有一个基于 石斑鱼的代码,这是给出的例子之一:

import itertools as it
def window(iterable, size):
    shiftedStarts = [it.islice(iterable, s, None) for s in xrange(size)]
    return it.izip(*shiftedStarts)

基本上,我们创建了一系列切片迭代器,每个迭代器的起点都在前面一个位置。然后,我们把它们拉在一起。注意,这个函数返回一个生成器(它本身不是直接的生成器)。

就像上面的appendingelement和advingiterator版本一样,性能(即,哪个是最好的)随列表大小和窗口大小而变化。我喜欢这个,因为它是一个两行代码(它也可以是一行代码,但我更喜欢命名概念)。

事实证明上面的代码是错误的。如果传递给iterable的参数是一个序列则有效,但如果它是一个迭代器则无效。如果它是一个迭代器,那么在islice调用之间共享相同的迭代器(但不是tee - d),这将严重破坏事情。

下面是一些固定的代码:

import itertools as it
def window(iterable, size):
    itrs = it.tee(iterable, size)
    shiftedStarts = [it.islice(anItr, s, None) for s, anItr in enumerate(itrs)]
    return it.izip(*shiftedStarts)

另外,书里还有一个版本。这个版本不是复制一个迭代器,然后多次向前复制,而是在开始位置向前移动时成对复制每个迭代器。因此,迭代器t既提供了起点为t的“完整”迭代器,也提供了创建迭代器t + 1的基础:

import itertools as it
def window4(iterable, size):
    complete_itr, incomplete_itr = it.tee(iterable, 2)
    iters = [complete_itr]
    for i in xrange(1, size):
        incomplete_itr.next()
        complete_itr, incomplete_itr = it.tee(incomplete_itr, 2)
        iters.append(complete_itr)
    return it.izip(*iters)

这里是一个泛化,增加了对step, fillvalue参数的支持:

from collections import deque
from itertools import islice

def sliding_window(iterable, size=2, step=1, fillvalue=None):
    if size < 0 or step < 1:
        raise ValueError
    it = iter(iterable)
    q = deque(islice(it, size), maxlen=size)
    if not q:
        return  # empty iterable or size == 0
    q.extend(fillvalue for _ in range(size - len(q)))  # pad to size
    while True:
        yield iter(q)  # iter() to avoid accidental outside modifications
        try:
            q.append(next(it))
        except StopIteration: # Python 3.5 pep 479 support
            return
        q.extend(next(it, fillvalue) for _ in range(step - 1))

它每次产生块大小的项目,每次迭代滚动步骤位置,在必要时用fillvalue填充每个块。示例:size=4, step=3, fillvalue='*':

 [a b c d]e f g h i j k l m n o p q r s t u v w x y z
  a b c[d e f g]h i j k l m n o p q r s t u v w x y z
  a b c d e f[g h i j]k l m n o p q r s t u v w x y z
  a b c d e f g h i[j k l m]n o p q r s t u v w x y z
  a b c d e f g h i j k l[m n o p]q r s t u v w x y z
  a b c d e f g h i j k l m n o[p q r s]t u v w x y z
  a b c d e f g h i j k l m n o p q r[s t u v]w x y z
  a b c d e f g h i j k l m n o p q r s t u[v w x y]z
  a b c d e f g h i j k l m n o p q r s t u v w x[y z * *]

有关step参数的用例示例,请参见用python有效地处理大型.txt文件。


deque窗口的一个轻微修改版本,使其成为一个真正的滚动窗口。因此,它开始只填充一个元素,然后增长到它的最大窗口大小,然后缩小,因为它的左边缘接近结束:

from collections import deque
def window(seq, n=2):
    it = iter(seq)
    win = deque((next(it, None) for _ in xrange(1)), maxlen=n)
    yield win
    append = win.append
    for e in it:
        append(e)
        yield win
    for _ in xrange(len(win)-1):
        win.popleft()
        yield win

for wnd in window(range(5), n=3):
    print(list(wnd))

这给了

[0]
[0, 1]
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4]
[4]

>>> n, m = 6, 3
>>> k = n - m+1
>>> print ('{}\n'*(k)).format(*[range(i, i+m) for i in xrange(k)])
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]

多个迭代器!

def window(seq, size, step=1):
    # initialize iterators
    iters = [iter(seq) for i in range(size)]
    # stagger iterators (without yielding)
    [next(iters[i]) for j in range(size) for i in range(-1, -j-1, -1)]
    while(True):
        yield [next(i) for i in iters]
        # next line does nothing for step = 1 (skips iterations for step > 1)
        [next(i) for i in iters for j in range(step-1)]

next(it)在序列结束时引发StopIteration,出于一些我无法理解的很酷的原因,yield语句在这里除外它,函数返回,忽略没有形成完整窗口的剩余值。

无论如何,这是目前为止行数最少的解决方案,它唯一的要求是seq实现__iter__或__getitem__,并且除了@dansalmo的解决方案之外,不依赖于itertools或集合:)


def rolling_window(list, degree):
    for i in range(len(list)-degree+1):
        yield [list[i+o] for o in range(degree)]

这是一个滚动平均函数


为什么不

def pairwise(iterable):
    "s -> (s0,s1), (s1,s2), (s2, s3), ..."
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)

它被记录在Python文档中。 您可以轻松地将其扩展到更宽的窗口。


如何使用以下方法:

mylist = [1, 2, 3, 4, 5, 6, 7]

def sliding_window(l, window_size=2):
    if window_size > len(l):
        raise ValueError("Window size must be smaller or equal to the number of elements in the list.")

    t = []
    for i in xrange(0, window_size):
        t.append(l[i:])

    return zip(*t)

print sliding_window(mylist, 3)

输出:

[(1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6), (5, 6, 7)]

def GetShiftingWindows(thelist, size):
    return [ thelist[x:x+size] for x in range( len(thelist) - size + 1 ) ]

>> a = [1, 2, 3, 4, 5]
>> GetShiftingWindows(a, 3)
[ [1, 2, 3], [2, 3, 4], [3, 4, 5] ]

为了演示如何组合itertools recipe,我将使用consume recipe尽可能直接地将成对的recipe扩展回窗口recipe:

def consume(iterator, n):
    "Advance the iterator n-steps ahead. If n is none, consume entirely."
    # Use functions that consume iterators at C speed.
    if n is None:
        # feed the entire iterator into a zero-length deque
        collections.deque(iterator, maxlen=0)
    else:
        # advance to the empty slice starting at position n
        next(islice(iterator, n, n), None)

def window(iterable, n=2):
    "s -> (s0, ...,s(n-1)), (s1, ...,sn), (s2, ..., s(n+1)), ..."
    iters = tee(iterable, n)
    # Could use enumerate(islice(iters, 1, None), 1) to avoid consume(it, 0), but that's
    # slower for larger window sizes, while saving only small fixed "noop" cost
    for i, it in enumerate(iters):
        consume(it, i)
    return zip(*iters)

窗口的配方与成对的相同,它只是将第二个tee-ed迭代器上的单个元素“consume”替换为逐步增加n - 1个迭代器上的consume。使用consume而不是在islice中包装每个迭代器稍微快一些(对于足够大的可迭代对象),因为您只在消费阶段支付islice包装开销,而不是在提取每个窗口值的过程中(因此它以n为界,而不是iterable中项目的数量)。

在性能方面,与其他一些解决方案相比,这是相当不错的(并且比我测试的任何其他解决方案都要好)。在Python 3.5.0, Linux x86-64上测试,使用ipython %timeit magic。

Kindall是deque解决方案,通过使用islice而不是自制生成器表达式来调整性能/正确性,并测试产生的长度,因此当可迭代对象比窗口短时不会产生结果,以及通过位置传递deque的maxlen而不是通过关键字(对于较小的输入产生惊人的差异):

>>> %timeit -r5 deque(windowkindall(range(10), 3), 0)
100000 loops, best of 5: 1.87 μs per loop
>>> %timeit -r5 deque(windowkindall(range(1000), 3), 0)
10000 loops, best of 5: 72.6 μs per loop
>>> %timeit -r5 deque(windowkindall(range(1000), 30), 0)
1000 loops, best of 5: 71.6 μs per loop

与之前改编的kindall解决方案相同,但每个yield win都更改为yield tuple(win),因此存储来自生成器的结果,而不需要所有存储的结果真正成为最新结果的视图(在这种情况下,所有其他合理的解决方案都是安全的),并将tuple=tuple添加到函数定义中,以将tuple的使用从LEGB中的B移动到L:

>>> %timeit -r5 deque(windowkindalltupled(range(10), 3), 0)
100000 loops, best of 5: 3.05 μs per loop
>>> %timeit -r5 deque(windowkindalltupled(range(1000), 3), 0)
10000 loops, best of 5: 207 μs per loop
>>> %timeit -r5 deque(windowkindalltupled(range(1000), 30), 0)
1000 loops, best of 5: 348 μs per loop

如上所示的基于消费的解决方案:

>>> %timeit -r5 deque(windowconsume(range(10), 3), 0)
100000 loops, best of 5: 3.92 μs per loop
>>> %timeit -r5 deque(windowconsume(range(1000), 3), 0)
10000 loops, best of 5: 42.8 μs per loop
>>> %timeit -r5 deque(windowconsume(range(1000), 30), 0)
1000 loops, best of 5: 232 μs per loop

与consume相同,但是内联了consume的else case以避免函数调用,n是None测试以减少运行时间,特别是对于设置开销是工作中有意义的一部分的小输入:

>>> %timeit -r5 deque(windowinlineconsume(range(10), 3), 0)
100000 loops, best of 5: 3.57 μs per loop
>>> %timeit -r5 deque(windowinlineconsume(range(1000), 3), 0)
10000 loops, best of 5: 40.9 μs per loop
>>> %timeit -r5 deque(windowinlineconsume(range(1000), 30), 0)
1000 loops, best of 5: 211 μs per loop

(旁注:成对的一种变体,重复使用tee和默认参数2来创建嵌套的tee对象,因此任何给定的迭代器只前进一次,而不是独立地消耗越来越多的次数,类似于MrDrFenner的答案类似于非内联消耗,并且在所有测试中比内联消耗更慢,所以为了简洁起见,我省略了这些结果)。

As you can see, if you don't care about the possibility of the caller needing to store results, my optimized version of kindall's solution wins most of the time, except in the "large iterable, small window size case" (where inlined consume wins); it degrades quickly as the iterable size increases, while not degrading at all as the window size increases (every other solution degrades more slowly for iterable size increases, but also degrades for window size increases). It can even be adapted for the "need tuples" case by wrapping in map(tuple, ...), which runs ever so slightly slower than putting the tupling in the function, but it's trivial (takes 1-5% longer) and lets you keep the flexibility of running faster when you can tolerate repeatedly returning the same value.

如果需要安全的存储返回值,除了最小的输入大小外,内联消耗在所有输入大小上都占优势(非内联消耗略慢,但扩展相似)。基于deque & tuple的解决方案只对最小的输入获胜,因为更小的设置成本,收益也很小;当可迭代对象变长时,它会严重退化。

为了记录,我使用的kindall的解决方案的改编版本产生元组是:

def windowkindalltupled(iterable, n=2, tuple=tuple):
    it = iter(iterable)
    win = deque(islice(it, n), n)
    if len(win) < n:
        return
    append = win.append
    yield tuple(win)
    for e in it:
        append(e)
        yield tuple(win)

在函数定义行中删除缓存tuple,并在每个yield中使用tuple,以获得更快但不太安全的版本。


这是一个老问题,但是对于那些仍然感兴趣的人来说,在这个页面中有一个使用生成器的窗口滑块的伟大实现(Adrian Rosebrock)。

它是OpenCV的一个实现,但是你可以很容易地将它用于任何其他目的。对于渴望的人,我将粘贴代码在这里,但为了更好地理解它,我建议访问原始页面。

def sliding_window(image, stepSize, windowSize):
    # slide a window across the image
    for y in xrange(0, image.shape[0], stepSize):
        for x in xrange(0, image.shape[1], stepSize):
            # yield the current window
            yield (x, y, image[y:y + windowSize[1], x:x + windowSize[0]])

提示:您可以在迭代生成器时检查窗口的.shape,以丢弃那些不符合您需求的窗口

干杯


有一个库可以完全满足你的需要:

import more_itertools
list(more_itertools.windowed([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],n=3, step=3))

Out: [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12), (13, 14, 15)]

修改了DiPaolo的答案,允许任意填充和可变步长

import itertools
def window(seq, n=2,step=1,fill=None,keep=0):
    "Returns a sliding window (of width n) over data from the iterable"
    "   s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...                   "
    it = iter(seq)
    result = tuple(itertools.islice(it, n))    
    if len(result) == n:
        yield result
    while True:        
#         for elem in it:        
        elem = tuple( next(it, fill) for _ in range(step))
        result = result[step:] + elem        
        if elem[-1] is fill:
            if keep:
                yield result
            break
        yield result

#Importing the numpy library
import numpy as np
arr = np.arange(6) #Sequence
window_size = 3
np.lib.stride_tricks.as_strided(arr, shape= (len(arr) - window_size +1, window_size), 
strides = arr.strides*2)

"""Example output:

  [0, 1, 2]
  [1, 2, 3]
  [2, 3, 4]
  [3, 4, 5]

"""


让我们让它变懒!

from itertools import islice, tee

def window(iterable, size): 
    iterators = tee(iterable, size) 
    iterators = [islice(iterator, i, None) for i, iterator in enumerate(iterators)]  
    yield from zip(*iterators)

list(window(range(5), 3))
# [(0, 1, 2), (1, 2, 3), (2, 3, 4)]

这里有一行。我对它进行了计时,它与顶部答案的性能相当,并且随着更大的seq逐渐变得更好,len(seq) = 20时慢20%,len(seq) = 10000时慢7%

zip(*[seq[i:(len(seq) - n - 1 + i)] for i in range(n)])

尝试我的部分,简单,一行,使用islice的python方式。但是,可能不是最佳效率。

from itertools import islice
array = range(0, 10)
window_size = 4
map(lambda i: list(islice(array, i, i + window_size)), range(0, len(array) - window_size + 1))
# output = [[0, 1, 2, 3], [1, 2, 3, 4], [2, 3, 4, 5], [3, 4, 5, 6], [4, 5, 6, 7], [5, 6, 7, 8], [6, 7, 8, 9]]

解释: 通过使用window_size的islice创建窗口,并在所有数组上使用map迭代此操作。


除了我想到的解决方案,我还测试了一些解决方案。我发现我想出的是最快的,所以我想分享我的python3实现。

import itertools
import sys

def windowed(l, stride):
    return zip(*[itertools.islice(l, i, sys.maxsize) for i in range(stride)])

深度学习中滑动窗口数据的优化函数

def SlidingWindow(X, window_length, stride):
    indexer = np.arange(window_length)[None, :] + stride*np.arange(int(len(X)/stride)-window_length+4)[:, None]
    return X.take(indexer)

应用于多维数组

import numpy as np
def SlidingWindow(X, window_length, stride1):
    stride=  X.shape[1]*stride1
    window_length = window_length*X.shape[1]
    indexer = np.arange(window_length)[None, :] + stride1*np.arange(int(len(X)/stride1)-window_length-1)[:, None]
    return X.take(indexer)

我的两个版本的窗口实现

from typing import Sized, Iterable

def window(seq: Sized, n: int, strid: int = 1, drop_last: bool = False):
    for i in range(0, len(seq), strid):
        res = seq[i:i + n]
        if drop_last and len(res) < n:
            break
        yield res


def window2(seq: Iterable, n: int, strid: int = 1, drop_last: bool = False):
    it = iter(seq)
    result = []
    step = 0
    for i, ele in enumerate(it):
        result.append(ele)
        result = result[-n:]
        if len(result) == n:
            if step % strid == 0:
                yield result
            step += 1
    if not drop_last:
        yield result


另一种从列表生成固定长度窗口的简单方法

from collections import deque

def window(ls,window_size=3):
    window = deque(maxlen=window_size)

    for element in ls:
        
        if len(window)==window_size:
            yield list(window)
        window.append(element)

ls = [0,1,2,3,4,5]

for w in window(ls):
    print(w)

我最终使用的解决方案(保持简单):

def sliding_window(items, size):
    return [items[start:end] for start, end
            in zip(range(0, len(items) - size + 1), range(size, len(items) + 1))]

不用说,项目序列需要是可切片的。使用索引并不理想,但考虑到其他选项,这似乎是最不坏的选择……这也可以很容易地更改为生成器:只需替换[…]和……


toolz/cytoolz包有一个sliding_window函数。

>>> from cytoolz import sliding_window
>>> list(sliding_window(3, range(6))) # returns [(0, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, 5)]

在Python 3.10中,我们有itertools.pairwise(iterable)函数来滑动包含两个元素的窗口:

医生是这样说的:

返回从输入可迭代对象中获取的连续重叠对。 输出迭代器中2元组的数量将比输入的数量少1。如果输入可迭代对象的值少于两个,则返回空值。 大致相当于: def成对(iterable): # pairwise('ABCDEFG')——> AB BC CD DE EF FG A, b = tee(可迭代) 下一(b,没有) 返回zip(a, b)


更新

Kelly发现这是一个重复的答案。但我在这里留下这个作为反例,因为我包含了一个毫无意义的最小值。

所以如果你想用min来避免IndexError,没有必要,range会帮你处理这种情况。


旧的答案

奇怪的是,当n > len(l)返回[]时,下面的句柄在语义上是正确的。

>>> l = [0, 1, 2, 3, 4]

>>> n = 2
>>> [l[i: i + min(n, len(l)-i)] for i in range(len(l)-n+1)]
>>> [[0, 1], [1, 2], [2, 3], [3, 4]]
>>>
>>> n = 3
>>> [l[i: i + min(n, len(l)-i)] for i in range(len(l)-n+1)]
>>> [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
>>>
>>> n = 4
>>> [l[i: i + min(n, len(l)-i)] for i in range(len(l)-n+1)]
>>> [[0, 1, 2, 3], [1, 2, 3, 4]]
>>>
>>> n = 5
>>> [l[i: i + min(n, len(l)-i)] for i in range(len(l)-n+1)]
>>> [[0, 1, 2, 3, 4]]
>>>
>>> n = 10 # n > len(l)
>>> [l[i: i + min(n, len(l)-i)] for i in range(len(l)-n+1)]
>>> []