现在我在框架中有一个中心模块,它使用Python 2.6 multiprocessing模块生成多个进程。因为它使用多处理,所以有一个模块级的多处理感知日志,log = multiprocessing.get_logger()。根据文档,这个日志记录器(EDIT)没有进程共享锁,所以你不会在sys. exe中弄乱东西。Stderr(或任何文件句柄),让多个进程同时写入它。
我现在遇到的问题是框架中的其他模块不支持多处理。在我看来,我需要让这个中心模块上的所有依赖都使用多处理感知日志。这在框架内很烦人,更不用说对框架的所有客户端了。还有我想不到的选择吗?
我有一个解决方案,类似于ironhacker的,除了我使用日志。在我的一些代码中,我发现我需要在将异常传递回队列之前格式化它,因为回溯是不能pickle的:
class QueueHandler(logging.Handler):
def __init__(self, queue):
logging.Handler.__init__(self)
self.queue = queue
def emit(self, record):
if record.exc_info:
# can't pass exc_info across processes so just format now
record.exc_text = self.formatException(record.exc_info)
record.exc_info = None
self.queue.put(record)
def formatException(self, ei):
sio = cStringIO.StringIO()
traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
s = sio.getvalue()
sio.close()
if s[-1] == "\n":
s = s[:-1]
return s
解决这个问题的唯一方法是非侵入性的:
Spawn each worker process such that its log goes to a different file descriptor (to disk or to pipe.) Ideally, all log entries should be timestamped.
Your controller process can then do one of the following:
If using disk files: Coalesce the log files at the end of the run, sorted by timestamp
If using pipes (recommended): Coalesce log entries on-the-fly from all pipes, into a central log file. (E.g., Periodically select from the pipes' file descriptors, perform merge-sort on the available log entries, and flush to centralized log. Repeat.)
对于可能需要这个的人,我为multiprocessing_logging包写了一个装饰器,它将当前进程名添加到日志中,这样就可以清楚地看到谁记录了什么。
它还运行install_mp_handler(),因此在创建池之前运行它是没有用的。
这让我可以看到哪个工作人员创建了哪些日志消息。
下面是蓝图和示例:
import sys
import logging
from functools import wraps
import multiprocessing
import multiprocessing_logging
# Setup basic console logger as 'logger'
logger = logging.getLogger()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(u'%(asctime)s :: %(levelname)s :: %(message)s'))
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)
# Create a decorator for functions that are called via multiprocessing pools
def logs_mp_process_names(fn):
class MultiProcessLogFilter(logging.Filter):
def filter(self, record):
try:
process_name = multiprocessing.current_process().name
except BaseException:
process_name = __name__
record.msg = f'{process_name} :: {record.msg}'
return True
multiprocessing_logging.install_mp_handler()
f = MultiProcessLogFilter()
# Wraps is needed here so apply / apply_async know the function name
@wraps(fn)
def wrapper(*args, **kwargs):
logger.removeFilter(f)
logger.addFilter(f)
return fn(*args, **kwargs)
return wrapper
# Create a test function and decorate it
@logs_mp_process_names
def test(argument):
logger.info(f'test function called via: {argument}')
# You can also redefine undecored functions
def undecorated_function():
logger.info('I am not decorated')
@logs_mp_process_names
def redecorated(*args, **kwargs):
return undecorated_function(*args, **kwargs)
# Enjoy
if __name__ == '__main__':
with multiprocessing.Pool() as mp_pool:
# Also works with apply_async
mp_pool.apply(test, ('mp pool',))
mp_pool.apply(redecorated)
logger.info('some main logs')
test('main program')
下面是我简单的破解/变通方法…不是最全面的,但很容易修改,比我在写这篇文章之前找到的任何其他答案都更容易阅读和理解:
import logging
import multiprocessing
class FakeLogger(object):
def __init__(self, q):
self.q = q
def info(self, item):
self.q.put('INFO - {}'.format(item))
def debug(self, item):
self.q.put('DEBUG - {}'.format(item))
def critical(self, item):
self.q.put('CRITICAL - {}'.format(item))
def warning(self, item):
self.q.put('WARNING - {}'.format(item))
def some_other_func_that_gets_logger_and_logs(num):
# notice the name get's discarded
# of course you can easily add this to your FakeLogger class
local_logger = logging.getLogger('local')
local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
local_logger.debug('hmm, something may need debugging here')
return num*2
def func_to_parallelize(data_chunk):
# unpack our args
the_num, logger_q = data_chunk
# since we're now in a new process, let's monkeypatch the logging module
logging.getLogger = lambda name=None: FakeLogger(logger_q)
# now do the actual work that happens to log stuff too
new_num = some_other_func_that_gets_logger_and_logs(the_num)
return (the_num, new_num)
if __name__ == '__main__':
multiprocessing.freeze_support()
m = multiprocessing.Manager()
logger_q = m.Queue()
# we have to pass our data to be parallel-processed
# we also need to pass the Queue object so we can retrieve the logs
parallelable_data = [(1, logger_q), (2, logger_q)]
# set up a pool of processes so we can take advantage of multiple CPU cores
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
worker_output = pool.map(func_to_parallelize, parallelable_data)
pool.close() # no more tasks
pool.join() # wrap up current tasks
# get the contents of our FakeLogger object
while not logger_q.empty():
print logger_q.get()
print 'worker output contained: {}'.format(worker_output)