我知道pydispatcher,但是Python周围一定有其他与事件相关的包。
哪些库是可用的?
我对作为大型框架一部分的事件管理器不感兴趣,我宁愿使用可以轻松扩展的小型骨架解决方案。
我知道pydispatcher,但是Python周围一定有其他与事件相关的包。
哪些库是可用的?
我对作为大型框架一部分的事件管理器不感兴趣,我宁愿使用可以轻松扩展的小型骨架解决方案。
当前回答
你可以试试公交模块。
这个库使基于消息的系统的实现更容易。它支持命令(单个处理程序)和事件(0或多个处理程序)方法。Buslane使用Python类型注释来正确注册处理程序。
简单的例子:
from dataclasses import dataclass
from buslane.commands import Command, CommandHandler, CommandBus
@dataclass(frozen=True)
class RegisterUserCommand(Command):
email: str
password: str
class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):
def handle(self, command: RegisterUserCommand) -> None:
assert command == RegisterUserCommand(
email='john@lennon.com',
password='secret',
)
command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
email='john@lennon.com',
password='secret',
))
安装巴士道,只需使用pip:
$ pip install buslane
其他回答
我对Longpoke的极简主义方法做了一个变化,也确保了呼叫者和呼叫者的签名:
class EventHook(object):
'''
A simple implementation of the Observer-Pattern.
The user can specify an event signature upon inizializazion,
defined by kwargs in the form of argumentname=class (e.g. id=int).
The arguments' types are not checked in this implementation though.
Callables with a fitting signature can be added with += or removed with -=.
All listeners can be notified by calling the EventHook class with fitting
arguments.
>>> event = EventHook(id=int, data=dict)
>>> event += lambda id, data: print("%d %s" % (id, data))
>>> event(id=5, data={"foo": "bar"})
5 {'foo': 'bar'}
>>> event = EventHook(id=int)
>>> event += lambda wrong_name: None
Traceback (most recent call last):
...
ValueError: Listener must have these arguments: (id=int)
>>> event = EventHook(id=int)
>>> event += lambda id: None
>>> event(wrong_name=0)
Traceback (most recent call last):
...
ValueError: This EventHook must be called with these arguments: (id=int)
'''
def __init__(self, **signature):
self._signature = signature
self._argnames = set(signature.keys())
self._handlers = []
def _kwargs_str(self):
return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())
def __iadd__(self, handler):
params = inspect.signature(handler).parameters
valid = True
argnames = set(n for n in params.keys())
if argnames != self._argnames:
valid = False
for p in params.values():
if p.kind == p.VAR_KEYWORD:
valid = True
break
if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
valid = False
break
if not valid:
raise ValueError("Listener must have these arguments: (%s)"
% self._kwargs_str())
self._handlers.append(handler)
return self
def __isub__(self, handler):
self._handlers.remove(handler)
return self
def __call__(self, *args, **kwargs):
if args or set(kwargs.keys()) != self._argnames:
raise ValueError("This EventHook must be called with these " +
"keyword arguments: (%s)" % self._kwargs_str())
for handler in self._handlers[:]:
handler(**kwargs)
def __repr__(self):
return "EventHook(%s)" % self._kwargs_str()
你可以试试公交模块。
这个库使基于消息的系统的实现更容易。它支持命令(单个处理程序)和事件(0或多个处理程序)方法。Buslane使用Python类型注释来正确注册处理程序。
简单的例子:
from dataclasses import dataclass
from buslane.commands import Command, CommandHandler, CommandBus
@dataclass(frozen=True)
class RegisterUserCommand(Command):
email: str
password: str
class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):
def handle(self, command: RegisterUserCommand) -> None:
assert command == RegisterUserCommand(
email='john@lennon.com',
password='secret',
)
command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
email='john@lennon.com',
password='secret',
))
安装巴士道,只需使用pip:
$ pip install buslane
这里是一个最小的设计,应该工作得很好。你要做的就是简单地在一个类中继承Observer,然后使用observe(event_name, callback_fn)来监听特定的事件。当特定事件在代码中的任何地方被触发时(例如。事件('USB connected')),则将触发相应的回调。
class Observer():
_observers = []
def __init__(self):
self._observers.append(self)
self._observed_events = []
def observe(self, event_name, callback_fn):
self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})
class Event():
def __init__(self, event_name, *callback_args):
for observer in Observer._observers:
for observable in observer._observed_events:
if observable['event_name'] == event_name:
observable['callback_fn'](*callback_args)
例子:
class Room(Observer):
def __init__(self):
print("Room is ready.")
Observer.__init__(self) # DON'T FORGET THIS
def someone_arrived(self, who):
print(who + " has arrived!")
# Observe for specific event
room = Room()
room.observe('someone arrived', room.someone_arrived)
# Fire some events
Event('someone left', 'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted', 'Lenard')
前段时间我写了一个库,可能对你有用。 它允许您拥有本地和全局侦听器、多种不同的注册方式、执行优先级等等。
from pyeventdispatcher import register
register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)
dispatch(Event("foo.bar", {"id": 1}))
# first second
看看调度员
你可以看看pymitter (pypi)。它是一个小的单文件(~250 loc)方法 提供名称空间、通配符和TTL。
这里有一个基本的例子:
from pymitter import EventEmitter
ee = EventEmitter()
# decorator usage
@ee.on("myevent")
def handler1(arg):
print "handler1 called with", arg
# callback usage
def handler2(arg):
print "handler2 called with", arg
ee.on("myotherevent", handler2)
# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"
ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"