是否有可能终止一个正在运行的线程而不设置/检查任何标志/信号/等等?
当前回答
Python版本:3.8
使用守护线程来执行我们想要的,如果我们想要终止守护线程,我们只需要让父线程退出,然后系统就会终止父线程创建的守护线程。
还支持协程和协程函数。
def main():
start_time = time.perf_counter()
t1 = ExitThread(time.sleep, (10,), debug=False)
t1.start()
time.sleep(0.5)
t1.exit()
try:
print(t1.result_future.result())
except concurrent.futures.CancelledError:
pass
end_time = time.perf_counter()
print(f"time cost {end_time - start_time:0.2f}")
下面是ExitThread源代码
import concurrent.futures
import threading
import typing
import asyncio
class _WorkItem(object):
""" concurrent\futures\thread.py
"""
def __init__(self, future, fn, args, kwargs, *, debug=None):
self._debug = debug
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
if self._debug:
print("ExitThread._WorkItem run")
if not self.future.set_running_or_notify_cancel():
return
try:
coroutine = None
if asyncio.iscoroutinefunction(self.fn):
coroutine = self.fn(*self.args, **self.kwargs)
elif asyncio.iscoroutine(self.fn):
coroutine = self.fn
if coroutine is None:
result = self.fn(*self.args, **self.kwargs)
else:
result = asyncio.run(coroutine)
if self._debug:
print("_WorkItem done")
except BaseException as exc:
self.future.set_exception(exc)
# Break a reference cycle with the exception 'exc'
self = None
else:
self.future.set_result(result)
class ExitThread:
""" Like a stoppable thread
Using coroutine for target then exit before running may cause RuntimeWarning.
"""
def __init__(self, target: typing.Union[typing.Coroutine, typing.Callable] = None
, args=(), kwargs={}, *, daemon=None, debug=None):
#
self._debug = debug
self._parent_thread = threading.Thread(target=self._parent_thread_run, name="ExitThread_parent_thread"
, daemon=daemon)
self._child_daemon_thread = None
self.result_future = concurrent.futures.Future()
self._workItem = _WorkItem(self.result_future, target, args, kwargs, debug=debug)
self._parent_thread_exit_lock = threading.Lock()
self._parent_thread_exit_lock.acquire()
self._parent_thread_exit_lock_released = False # When done it will be True
self._started = False
self._exited = False
self.result_future.add_done_callback(self._release_parent_thread_exit_lock)
def _parent_thread_run(self):
self._child_daemon_thread = threading.Thread(target=self._child_daemon_thread_run
, name="ExitThread_child_daemon_thread"
, daemon=True)
self._child_daemon_thread.start()
# Block manager thread
self._parent_thread_exit_lock.acquire()
self._parent_thread_exit_lock.release()
if self._debug:
print("ExitThread._parent_thread_run exit")
def _release_parent_thread_exit_lock(self, _future):
if self._debug:
print(f"ExitThread._release_parent_thread_exit_lock {self._parent_thread_exit_lock_released} {_future}")
if not self._parent_thread_exit_lock_released:
self._parent_thread_exit_lock_released = True
self._parent_thread_exit_lock.release()
def _child_daemon_thread_run(self):
self._workItem.run()
def start(self):
if self._debug:
print(f"ExitThread.start {self._started}")
if not self._started:
self._started = True
self._parent_thread.start()
def exit(self):
if self._debug:
print(f"ExitThread.exit exited: {self._exited} lock_released: {self._parent_thread_exit_lock_released}")
if self._parent_thread_exit_lock_released:
return
if not self._exited:
self._exited = True
if not self.result_future.cancel():
if self.result_future.running():
self.result_future.set_exception(concurrent.futures.CancelledError())
其他回答
可以通过在将退出线程的线程中安装trace来终止线程。请参阅所附的链接,了解一种可能的实现。
在Python中杀死一个线程
另一种方法是使用signal.pthread_kill发送一个停止信号。
from signal import pthread_kill, SIGTSTP
from threading import Thread
from itertools import count
from time import sleep
def target():
for num in count():
print(num)
sleep(1)
thread = Thread(target=target)
thread.start()
sleep(5)
pthread_kill(thread.ident, SIGTSTP)
结果
0
1
2
3
4
[14]+ Stopped
多处理。进程可以p.terminate()
如果我想杀死一个线程,但不想使用标志/锁/信号/信号量/事件/任何东西,我就把线程提升到完整的进程。对于只使用几个线程的代码,开销并没有那么糟糕。
例如,这可以方便地终止执行阻塞I/O的助手“线程”
转换很简单:在相关代码中替换所有线程。多线程线程。进程和所有队列。多处理队列。排队并将p.t terminate()所需的调用添加到想要杀死子进程p的父进程中
关于多处理,请参阅Python文档。
例子:
import multiprocessing
proc = multiprocessing.Process(target=your_proc_function, args=())
proc.start()
# Terminate the process
proc.terminate() # sends a SIGTERM
正如其他人所提到的,规范是设置一个停止标志。对于一些轻量级的东西(没有Thread的子类化,没有全局变量),lambda回调是一个选项。(注意if stop()中的括号。)
import threading
import time
def do_work(id, stop):
print("I am thread", id)
while True:
print("I am thread {} doing something".format(id))
if stop():
print(" Exiting loop.")
break
print("Thread {}, signing off".format(id))
def main():
stop_threads = False
workers = []
for id in range(0,3):
tmp = threading.Thread(target=do_work, args=(id, lambda: stop_threads))
workers.append(tmp)
tmp.start()
time.sleep(3)
print('main: done sleeping; time to stop the threads.')
stop_threads = True
for worker in workers:
worker.join()
print('Finis.')
if __name__ == '__main__':
main()
将print()替换为始终刷新的pr()函数(sys.stdout.flush())可以提高shell输出的精度。
(仅在Windows/Eclipse/Python3.3上测试)
如果不杀死线程会更好。 一种方法是在线程的循环中引入一个“try”块,并在想要停止线程时抛出一个异常(例如break/return/…)这会让你停下来/一会儿/…)。 我在我的应用程序上使用了这个,它是有效的…
推荐文章
- 证书验证失败:无法获得本地颁发者证书
- 当使用pip3安装包时,“Python中的ssl模块不可用”
- 无法切换Python与pyenv
- Python if not == vs if !=
- 如何从scikit-learn决策树中提取决策规则?
- 为什么在Mac OS X v10.9 (Mavericks)的终端中apt-get功能不起作用?
- 将旋转的xtick标签与各自的xtick对齐
- 为什么元组可以包含可变项?
- 如何合并字典的字典?
- 如何创建类属性?
- 不区分大小写的“in”
- 在Python中获取迭代器中的元素个数
- 自动化invokerrequired代码模式
- 解析日期字符串并更改格式
- 使用try和。Python中的if