我有一个由多重yield返回的生成器对象。准备调用这个生成器是相当耗时的操作。这就是为什么我想重复使用发电机几次。
y = FunctionWithYield()
for x in y: print(x)
#here must be something to reset 'y'
for x in y: print(x)
当然,我正在考虑将内容复制到简单的列表中。有办法重置我的发电机吗?
请参见:如何提前查看Python生成器中的一个元素?
我有一个由多重yield返回的生成器对象。准备调用这个生成器是相当耗时的操作。这就是为什么我想重复使用发电机几次。
y = FunctionWithYield()
for x in y: print(x)
#here must be something to reset 'y'
for x in y: print(x)
当然,我正在考虑将内容复制到简单的列表中。有办法重置我的发电机吗?
请参见:如何提前查看Python生成器中的一个元素?
当前回答
我的答案解决了稍微不同的问题:如果初始化生成器的开销很大,生成每个生成的对象的开销也很大。但是我们需要在多个函数中多次使用生成器。为了只调用一次生成器和每个生成的对象,我们可以使用线程并在不同的线程中运行每个消费方法。由于GIL,我们可能无法实现真正的并行,但我们将实现我们的目标。
这种方法在以下情况下做得很好:深度学习模型处理了大量图像。结果是图像上的很多物体都有很多遮罩。每个掩码都会消耗内存。我们有大约10种方法来进行不同的统计和度量,但它们都是一次性拍摄所有图像。所有的图像都装不下内存。方法可以很容易地重写为接受迭代器。
class GeneratorSplitter:
'''
Split a generator object into multiple generators which will be sincronised. Each call to each of the sub generators will cause only one call in the input generator. This way multiple methods on threads can iterate the input generator , and the generator will cycled only once.
'''
def __init__(self, gen):
self.gen = gen
self.consumers: List[GeneratorSplitter.InnerGen] = []
self.thread: threading.Thread = None
self.value = None
self.finished = False
self.exception = None
def GetConsumer(self):
# Returns a generator object.
cons = self.InnerGen(self)
self.consumers.append(cons)
return cons
def _Work(self):
try:
for d in self.gen:
for cons in self.consumers:
cons.consumed.wait()
cons.consumed.clear()
self.value = d
for cons in self.consumers:
cons.readyToRead.set()
for cons in self.consumers:
cons.consumed.wait()
self.finished = True
for cons in self.consumers:
cons.readyToRead.set()
except Exception as ex:
self.exception = ex
for cons in self.consumers:
cons.readyToRead.set()
def Start(self):
self.thread = threading.Thread(target=self._Work)
self.thread.start()
class InnerGen:
def __init__(self, parent: "GeneratorSplitter"):
self.parent: "GeneratorSplitter" = parent
self.readyToRead: threading.Event = threading.Event()
self.consumed: threading.Event = threading.Event()
self.consumed.set()
def __iter__(self):
return self
def __next__(self):
self.readyToRead.wait()
self.readyToRead.clear()
if self.parent.finished:
raise StopIteration()
if self.parent.exception:
raise self.parent.exception
val = self.parent.value
self.consumed.set()
return val
Ussage:
genSplitter = GeneratorSplitter(expensiveGenerator)
metrics={}
executor = ThreadPoolExecutor(max_workers=3)
f1 = executor.submit(mean,genSplitter.GetConsumer())
f2 = executor.submit(max,genSplitter.GetConsumer())
f3 = executor.submit(someFancyMetric,genSplitter.GetConsumer())
genSplitter.Start()
metrics.update(f1.result())
metrics.update(f2.result())
metrics.update(f3.result())
其他回答
您可以定义一个返回生成器的函数
def f():
def FunctionWithYield(generator_args):
code here...
return FunctionWithYield
现在你可以想做多少次就做多少次:
for x in f()(generator_args): print(x)
for x in f()(generator_args): print(x)
另一种选择是使用itertools.tee()函数创建生成器的第二个版本:
import itertools
y = FunctionWithYield()
y, y_backup = itertools.tee(y)
for x in y:
print(x)
for x in y_backup:
print(x)
从内存使用的角度来看,如果原始迭代可能不处理所有的项,这可能是有益的。
如果你的生成器在某种意义上是纯的,它的输出只依赖于传递的参数和步长,并且你希望生成的生成器是可重新启动的,这里有一个排序代码片段可能很方便:
import copy
def generator(i):
yield from range(i)
g = generator(10)
print(list(g))
print(list(g))
class GeneratorRestartHandler(object):
def __init__(self, gen_func, argv, kwargv):
self.gen_func = gen_func
self.argv = copy.copy(argv)
self.kwargv = copy.copy(kwargv)
self.local_copy = iter(self)
def __iter__(self):
return self.gen_func(*self.argv, **self.kwargv)
def __next__(self):
return next(self.local_copy)
def restartable(g_func: callable) -> callable:
def tmp(*argv, **kwargv):
return GeneratorRestartHandler(g_func, argv, kwargv)
return tmp
@restartable
def generator2(i):
yield from range(i)
g = generator2(10)
print(next(g))
print(list(g))
print(list(g))
print(next(g))
输出:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[]
0
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
1
使用包装器函数来处理StopIteration
您可以为生成器生成函数编写一个简单的包装器函数,用于跟踪生成器耗尽的时间。它将使用生成器在迭代结束时抛出的StopIteration异常来完成此操作。
import types
def generator_wrapper(function=None, **kwargs):
assert function is not None, "Please supply a function"
def inner_func(function=function, **kwargs):
generator = function(**kwargs)
assert isinstance(generator, types.GeneratorType), "Invalid function"
try:
yield next(generator)
except StopIteration:
generator = function(**kwargs)
yield next(generator)
return inner_func
如上所述,当包装器函数捕获到StopIteration异常时,它只是重新初始化生成器对象(使用函数调用的另一个实例)。
然后,假设你定义了如下所示的生成器提供函数,你可以使用Python函数装饰器语法来隐式包装它:
@generator_wrapper
def generator_generating_function(**kwargs):
for item in ["a value", "another value"]
yield item
可能最简单的解决方案是将昂贵的部分包装在一个对象中,并将其传递给生成器:
data = ExpensiveSetup()
for x in FunctionWithYield(data): pass
for x in FunctionWithYield(data): pass
这样,就可以缓存昂贵的计算。
如果您可以同时将所有结果保存在RAM中,那么可以使用list()将生成器的结果物化到一个普通列表中并使用该列表。