我对Python和多线程编程非常陌生。基本上,我有一个脚本,将文件复制到另一个位置。我想把这个放在另一个线程,这样我就可以输出....表示脚本仍在运行。

我遇到的问题是,如果文件不能复制,它将抛出异常。如果在主线程中运行,这是可以的;但是,使用以下代码是无效的:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

在线程类本身中,我试图重新抛出异常,但它不起作用。我在这里看到有人问类似的问题,但他们似乎都在做一些比我试图做的更具体的事情(我不太理解所提供的解决方案)。我看到有人提到sys.exc_info()的用法,但我不知道在哪里或如何使用它。

编辑:线程类的代码如下:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder
    
    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise

当前回答

我知道我在这里有点晚了,但我有一个非常类似的问题,但它包括使用tkinter作为GUI,并且主循环使它不可能使用依赖于.join()的任何解决方案。因此,我调整了原问题EDIT中给出的解决方案,但使其更一般,以便于其他人更容易理解。

下面是运行中的新线程类:

import threading
import traceback
import logging


class ExceptionThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except Exception:
            logging.error(traceback.format_exc())


def test_function_1(input):
    raise IndexError(input)


if __name__ == "__main__":
    input = 'useful'

    t1 = ExceptionThread(target=test_function_1, args=[input])
    t1.start()

当然,您总是可以让它以日志以外的其他方式处理异常,例如将其打印出来,或将其输出到控制台。

这允许您像使用Thread类一样使用ExceptionThread类,无需任何特殊修改。

其他回答

我做的是,简单的覆盖连接和运行线程的方法:

class RaisingThread(threading.Thread):
  def run(self):
    self._exc = None
    try:
      super().run()
    except Exception as e:
      self._exc = e

  def join(self, timeout=None):
    super().join(timeout=timeout)
    if self._exc:
      raise self._exc

用途如下:

def foo():
  time.sleep(2)
  print('hi, from foo!')
  raise Exception('exception from foo')    

t = RaisingThread(target=foo)
t.start()
try:
  t.join()
except Exception as e:
  print(e)

结果:

hi, from foo!
exception from foo!

我使用这个版本,它是最小的,它工作得很好。

class SafeThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        super(SafeThread, self).__init__(*args, **kwargs)
        self.exception = None

    def run(self) -> None:
        try:
            super(SafeThread, self).run()
        except Exception as ex:
            self.exception = ex
            traceback.print_exc()

    def join(self, *args, **kwargs) -> None:
        super(SafeThread, self).join(*args, **kwargs)
        if self.exception:
            raise self.exception

要使用它,只需替换线程。带安全线程的线程

t = SafeThread(target = some_function, args = (some, args,))
t.start()
# do something else here if you want as the thread runs in the background
t.join()

这个问题有很多非常奇怪复杂的答案。我是不是把它简化了,因为对我来说,这似乎对大多数事情都足够了。

from threading import Thread

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self, timeout=None):
        super(PropagatingThread, self).join(timeout)
        if self.exc:
            raise self.exc
        return self.ret

如果你确定你只会在一个或另一个版本的Python上运行,你可以将run()方法减少到只有被破坏的版本(如果你只在3之前的Python版本上运行),或者仅仅是干净的版本(如果你只在3开始的Python版本上运行)。

使用示例:

def f(*args, **kwargs):
    print(args)
    print(kwargs)
    raise Exception('I suck at this')

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()

当您加入时,您将看到在另一个线程上引发异常。

如果您正在使用six或仅在Python 3上使用,则可以改进重新引发异常时获得的堆栈跟踪信息。您可以将内部异常包装在一个新的外部异常中,而不是仅在连接点处使用堆栈,并使用

six.raise_from(RuntimeError('Exception in thread'),self.exc)

or

raise RuntimeError('Exception in thread') from self.exc

并发。Futures模块使得在单独的线程(或进程)中工作并处理任何由此产生的异常变得简单:

import concurrent.futures
import shutil

def copytree_with_dots(src_path, dst_path):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Execute the copy on a separate thread,
        # creating a future object to track progress.
        future = executor.submit(shutil.copytree, src_path, dst_path)

        while future.running():
            # Print pretty dots here.
            pass

        # Return the value returned by shutil.copytree(), None.
        # Raise any exceptions raised during the copy process.
        return future.result()

并发。futures包含在Python 3.2中,并可作为早期版本的反向移植futures模块使用。

这是一个棘手的小问题,我想提出我的解决方案。我发现了一些其他的解决方案(异步。例如IO)看起来很有前途,但也呈现出一些黑盒子。队列/事件循环方法将您与某个实现联系在一起。然而,并发期货的源代码只有大约1000行,很容易理解。它让我很容易地解决了我的问题:创建临时的工作线程,而不需要太多的设置,并且能够在主线程中捕获异常。

我的解决方案使用并发期货API和线程API。它允许你创建一个worker,给你线程和未来。这样,你就可以加入线程来等待结果:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())

...或者你可以让worker在完成时发送一个回调:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))

...或者你可以循环直到事件完成:

worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)           
        break
    time.sleep(0.25)

代码如下:

from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

        if(self._cb):
            self._cb(self.future.result())

...和测试函数:

def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')