现在我在框架中有一个中心模块,它使用Python 2.6 multiprocessing模块生成多个进程。因为它使用多处理,所以有一个模块级的多处理感知日志,log = multiprocessing.get_logger()。根据文档,这个日志记录器(EDIT)没有进程共享锁,所以你不会在sys. exe中弄乱东西。Stderr(或任何文件句柄),让多个进程同时写入它。

我现在遇到的问题是框架中的其他模块不支持多处理。在我看来,我需要让这个中心模块上的所有依赖都使用多处理感知日志。这在框架内很烦人,更不用说对框架的所有客户端了。还有我想不到的选择吗?


当前回答

我喜欢zzzeek的回答。我只会用管道代替队列,因为如果多个线程/进程使用相同的管道端来生成日志消息,它们将被混淆。

其他回答

有一个很棒的套餐

包: https://pypi.python.org/pypi/multiprocessing-logging/

代码: https://github.com/jruere/multiprocessing-logging

安装:

pip install multiprocessing-logging

然后添加:

import multiprocessing_logging

# This enables logs inside process
multiprocessing_logging.install_mp_handler()

最简单的想法是:

获取当前进程的文件名和进程id。 设置一个[WatchedFileHandler][1]。这里将详细讨论此处理程序的原因,但简而言之,其他日志处理程序存在某些更糟糕的竞争条件。这个有最短的竞态条件窗口。 选择日志保存路径,例如“/var/log/…”

下面是另一个简单的解决方案,适用于从谷歌到这里的其他人(比如我)。日志记录应该很简单!仅适用于3.2或更高版本。

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()

其中一个替代方案是将多处理日志写入一个已知文件,并注册一个atexit处理程序来加入这些进程,并在stderr上读取它;但是,您无法通过这种方式获得stderr上输出消息的实时流。

我有一个解决方案,类似于ironhacker的,除了我使用日志。在我的一些代码中,我发现我需要在将异常传递回队列之前格式化它,因为回溯是不能pickle的:

class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)
    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s