现在我在框架中有一个中心模块,它使用Python 2.6 multiprocessing模块生成多个进程。因为它使用多处理,所以有一个模块级的多处理感知日志,log = multiprocessing.get_logger()。根据文档,这个日志记录器(EDIT)没有进程共享锁,所以你不会在sys. exe中弄乱东西。Stderr(或任何文件句柄),让多个进程同时写入它。

我现在遇到的问题是框架中的其他模块不支持多处理。在我看来,我需要让这个中心模块上的所有依赖都使用多处理感知日志。这在框架内很烦人,更不用说对框架的所有客户端了。还有我想不到的选择吗?


当前回答

QueueHandler在Python 3.2+中是原生的,并且正是这样做的。它很容易在以前的版本中复制。

Python文档有两个完整的示例:从多个进程记录到单个文件

对于那些使用Python < 3.2的人,只需将QueueHandler从https://gist.github.com/vsajip/591589复制到自己的代码中,或者导入logutils。

每个进程(包括父进程)将其日志记录放在Queue上,然后监听线程或进程(为每个进程提供了一个示例)拾取这些日志并将它们全部写入一个文件—没有损坏或乱码的风险。

其他回答

解决这个问题的唯一方法是非侵入性的:

Spawn each worker process such that its log goes to a different file descriptor (to disk or to pipe.) Ideally, all log entries should be timestamped. Your controller process can then do one of the following: If using disk files: Coalesce the log files at the end of the run, sorted by timestamp If using pipes (recommended): Coalesce log entries on-the-fly from all pipes, into a central log file. (E.g., Periodically select from the pipes' file descriptors, perform merge-sort on the available log entries, and flush to centralized log. Repeat.)

下面是另一个简单的解决方案,适用于从谷歌到这里的其他人(比如我)。日志记录应该很简单!仅适用于3.2或更高版本。

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()

只需将日志记录器的实例发布到某个地方。这样,其他模块和客户端就可以使用您的API来获取记录器,而不必导入multiprocessing。

下面是一个可以在Windows环境下使用的类,需要ActivePython。 您还可以继承其他日志处理程序(StreamHandler等)。

class SyncronizedFileHandler(logging.FileHandler):
    MUTEX_NAME = 'logging_mutex'

    def __init__(self , *args , **kwargs):

        self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME)
        return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs)

    def emit(self, *args , **kwargs):
        try:
            win32event.WaitForSingleObject(self.mutex , win32event.INFINITE)
            ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs)
        finally:
            win32event.ReleaseMutex(self.mutex)
        return ret

下面是一个演示用法的例子:

import logging
import random , time , os , sys , datetime
from string import letters
import win32api , win32event
from multiprocessing import Pool

def f(i):
    time.sleep(random.randint(0,10) * 0.1)
    ch = random.choice(letters)
    logging.info( ch * 30)


def init_logging():
    '''
    initilize the loggers
    '''
    formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    file_handler = SyncronizedFileHandler(sys.argv[1])
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

#must be called in the parent and in every worker process
init_logging() 

if __name__ == '__main__':
    #multiprocessing stuff
    pool = Pool(processes=10)
    imap_result = pool.imap(f , range(30))
    for i , _ in enumerate(imap_result):
        pass

有一个很棒的套餐

包: https://pypi.python.org/pypi/multiprocessing-logging/

代码: https://github.com/jruere/multiprocessing-logging

安装:

pip install multiprocessing-logging

然后添加:

import multiprocessing_logging

# This enables logs inside process
multiprocessing_logging.install_mp_handler()