Source code for apf.producers.generic

from abc import ABC, abstractmethod
import logging
from typing import Union


[docs]class GenericProducer(ABC): """Generic Producer for Alert Processing Framework.""" def __init__(self, config=None): self.logger = logging.getLogger(self.__class__.__name__) self.logger.info(f"Creating {self.__class__.__name__}") self.config = config self._key_field = None @property def key_field(self): return self._key_field def set_key_field(self, key): self._key_field = key
[docs] @abstractmethod def produce(self, message=None, **kwargs): """Send a message after processing. Parameters ---------- message : dict-like Message to be sended. """ pass