# Source code for apf.metrics.kafka

from apf.metrics import GenericMetricsProducer
from apf.metrics import DateTimeEncoder
from confluent_kafka import Producer
from apf.core import get_class

import json


class KafkaMetricsProducer(GenericMetricsProducer):
    """Write metrics to a Kafka topic.

    Useful for high-throughput distributed metrics: a complex architecture
    can be built using Apache Kafka as a queue, writing the metrics into a
    time-series data store such as Prometheus, InfluxDB or Elasticsearch.

    Parameters
    ----------
    config : dict
        Parameters passed to the producer.

        - PARAMS: Parameters passed to :class:`confluent_kafka.Producer`.
        - TOPIC: Topic name, or list of topic names, to produce to,
          for example ``["metrics"]``.
        - TOPIC_STRATEGY: Alternative to TOPIC; a dict with ``CLASS`` and
          ``PARAMS`` used to resolve the topic list dynamically on each send.
        - TIME_ENCODER_CLASS: Optional :class:`json.JSONEncoder` subclass used
          when serializing metrics (default :class:`DateTimeEncoder`).
    producer : confluent_kafka.Producer, optional
        Pre-built producer instance (eases testing); when omitted, one is
        created from ``config["PARAMS"]``.
    """

    def __init__(self, config, producer=None):
        # NOTE(review): assumes the base class sets self.logger — confirm.
        super().__init__(config)
        self.config = config
        # Allow dependency injection of a pre-configured producer.
        if producer is not None:
            self.producer = producer
        else:
            self.producer = Producer(self.config["PARAMS"])
        self.time_encoder = self.config.get("TIME_ENCODER_CLASS", DateTimeEncoder)
        self.dynamic_topic = False
        if self.config.get("TOPIC"):
            self.logger.info(f'Producing metrics to {self.config["TOPIC"]}')
            topic = self.config["TOPIC"]
            # Accept either a single topic name or a list of names. The
            # previous code wrapped the value unconditionally, which produced
            # a nested list when TOPIC was already a list (the documented
            # form) and made produce() fail with a non-string topic.
            self.topic = topic if isinstance(topic, list) else [topic]
        elif self.config.get("TOPIC_STRATEGY"):
            self.dynamic_topic = True
            TopicStrategy = get_class(self.config["TOPIC_STRATEGY"]["CLASS"])
            self.topic_strategy = TopicStrategy(
                **self.config["TOPIC_STRATEGY"]["PARAMS"]
            )
            self.topic = self.topic_strategy.get_topics()
            self.logger.info(f'Using {self.config["TOPIC_STRATEGY"]}')
            self.logger.info(f"Producing to {self.topic}")

    def send_metrics(self, metrics):
        """Serialize ``metrics`` to JSON and produce it to every configured topic.

        Parameters
        ----------
        metrics : dict
            JSON-serializable metrics payload; datetime values are handled by
            the configured time encoder.
        """
        payload = json.dumps(metrics, cls=self.time_encoder).encode("utf-8")
        # A dynamic topic strategy may change its topic list between calls,
        # so refresh it on every send.
        if self.config.get("TOPIC_STRATEGY"):
            self.topic = self.topic_strategy.get_topics()
        for topic in self.topic:
            try:
                self.producer.produce(topic, payload)
            except BufferError as e:
                # Local produce queue is full: drain it with poll() and
                # retry once.
                self.logger.info(f"Error producing metrics: {e}")
                self.logger.info("Calling poll to empty queue and producing again")
                self.producer.poll(1)
                self.producer.produce(topic, payload)

    def __del__(self):
        # Guard against a partially-initialized instance (__init__ may have
        # raised before self.producer/self.logger were assigned); __del__
        # must never raise.
        producer = getattr(self, "producer", None)
        if producer is None:
            return
        logger = getattr(self, "logger", None)
        if logger is not None:
            logger.info("Waiting to produce last messages")
        # Block until all queued messages are delivered before teardown.
        producer.flush()