Source code for precept.events._dispatcher
import collections
from ._event import Event
[docs]class EventDispatcher:
"""Dispatch events to subscribers functions."""
[docs] def __init__(self):
self._subscribers = collections.defaultdict(list)
[docs] def subscribe(self, event: str, func):
"""
Subscribe func to execute every time event is dispatched.
:param event: The event to subscribe to.
:param func: The func to call when event is dispathed.
:return:
"""
self._subscribers[event].append(func)
[docs] async def dispatch(self, event: str, **payload):
"""
Dispatch an event with optional payload data.
:param event: Name of the event.
:param payload: Data of the event.
:return:
"""
action = Event(event, payload=payload)
for subscriber in self._subscribers[event]:
await subscriber(action)
if action.stop: # pragma: no cover
break
action.num += 1