Core components¶
- class apf.core.step.GenericStep(consumer: ~typing.Type[~apf.consumers.generic.GenericConsumer] = <class 'apf.core.step.DefaultConsumer'>, producer: ~typing.Type[~apf.producers.generic.GenericProducer] = <class 'apf.core.step.DefaultProducer'>, metrics_sender: ~typing.Type[~apf.metrics.generic.GenericMetricsProducer] = <class 'apf.core.step.DefaultMetricsProducer'>, config: dict = {}, level: int = 20, prometheus_metrics: ~apf.metrics.prometheus.prometheus.PrometheusMetrics = <apf.metrics.prometheus.prometheus.DefaultPrometheusMetrics object>)[source]¶
Generic Step for apf.
- Parameters
- consumer : GenericConsumer
An object of type GenericConsumer.
- level : logging.LEVEL
Logging level; has to be a logging.LEVEL constant.
Setting LOGGING_DEBUG = True in settings.py sets the step’s global logging level to debug.
#settings.py
LOGGING_DEBUG = True
- **step_args : dict
Additional parameters for the step.
- Attributes
- consumer_config
- metrics_config
- metrics_producer_params
- producer_config
Methods
- execute(messages)
Execute the logic of the step.
- get_extra_metrics(message)
Generate extra metrics from the EXTRA_METRICS metrics configuration.
- get_value(message, params)
Get values from a message and process them to create a new metric.
- produce(result)
Produce messages using the configured producer class.
- send_metrics(**metrics)
Send metrics with a metrics producer.
- set_producer_key_field(key_field)
Set the producer key, used in producer.produce(message, key=message[key_field]).
- start()
Start running the step.
- post_execute
- post_produce
- pre_consume
- pre_execute
- pre_produce
- tear_down
- abstract execute(messages: List[dict]) → Union[Iterable[Dict[str, Any]], Dict[str, Any]][source]¶
Execute the logic of the step. This method has to be implemented by the inheriting class.
- Parameters
- messages : dict, list
Dict-like message to be processed, or a list of dict-like messages.
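The execute contract can be sketched as follows. The stand-in base class and the step name below are illustrative only; in practice you would subclass apf.core.step.GenericStep:

```python
from typing import Any, Dict, List


class GenericStep:
    """Stand-in for apf.core.step.GenericStep (illustration only)."""


class SimpleStep(GenericStep):
    def execute(self, messages: List[dict]) -> List[Dict[str, Any]]:
        # Receive a batch of dict-like messages and return the processed batch.
        return [{**msg, "processed": True} for msg in messages]


step = SimpleStep()
result = step.execute([{"oid": "obj1"}, {"oid": "obj2"}])
```

The return value (a dict or an iterable of dicts) is what later gets handed to produce().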
- get_extra_metrics(message)[source]¶
Generate extra metrics from the EXTRA_METRICS metrics configuration.
- Parameters
- message : dict, list
Dict-like message to be processed, or a list of dict-like messages.
- Returns
- dict
Dictionary with extra metrics from the messages.
- get_value(message, params)[source]¶
Get values from a message and process them to create a new metric.
- Parameters
- message : dict
Dict-like message to be processed.
- params : str, dict
String of the value key, or a dict with the following:
- ‘key’: str
Required; has to be a key in the message.
- ‘alias’: str
New key returned; this can be used to standardize some message keys.
- ‘format’: callable
Function to be called on the message value.
- Returns
- new_key, value
Aliased key and processed value.
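The params handling described above can be sketched with an illustrative reimplementation (not the library's code):

```python
def get_value(message, params):
    # params may be a plain string: it is both the lookup key and the
    # returned key.
    if isinstance(params, str):
        return params, message[params]
    key = params["key"]                     # required; must be in the message
    value = message[key]
    if "format" in params:
        value = params["format"](value)     # callable applied to the value
    return params.get("alias", key), value  # alias renames the returned key


msg = {"candid": 123, "magpsf": 17.354}
pair = get_value(msg, {"key": "magpsf", "alias": "magnitude", "format": round})
# pair == ("magnitude", 17)
```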
- produce(result: Union[Iterable[Dict[str, Any]], Dict[str, Any]])[source]¶
Produces messages using the configured producer class.
- Parameters
- result: dict | list[dict]
The result of the step’s execution. This parameter can be an iterable or a single message, where the message should be a dictionary that matches the output schema of the step.
- NOTE: If you want to produce with a key, call the set_producer_key_field(key_field) method somewhere in the lifecycle of the step prior to the produce stage.
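The effect of a configured key field at produce time can be sketched as follows. The recording producer below is a stand-in for illustration, not the apf API:

```python
sent = []


def producer_produce(message, key=None):
    # Stand-in for the real producer; records what would be sent to Kafka.
    sent.append((key, message))


def produce(message, key_field=None):
    if key_field is not None:
        # Matches the documented behavior:
        # producer.produce(message, key=message[key_field])
        producer_produce(message, key=message[key_field])
    else:
        producer_produce(message)


produce({"oid": "obj1", "mag": 17.0}, key_field="oid")
```

After this call, the message is produced with key "obj1", taken from the message's key_field.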
- send_metrics(**metrics)[source]¶
Send Metrics with a metrics producer.
For this method to work, the METRICS_CONFIG variable has to be set inside the STEP_CONFIG variable.
Example:
Send the compute time for an object.
#example_step/step.py
self.send_metrics(compute_time=compute_time, oid=oid)
For this to work we need to declare:
#settings.py
STEP_CONFIG = {
    ...
    "METRICS_CONFIG": {  # Can be an empty dictionary
        "CLASS": "apf.metrics.KafkaMetricsProducer",
        "PARAMS": {  # params for the apf.metrics.KafkaMetricsProducer
            "PARAMS": {
                # this producer uses confluent_kafka.Producer, so here we provide
                # arguments for that class, like bootstrap.servers
                "bootstrap.servers": "kafka1:9092",
            },
            "TOPIC": "metrics_topic",  # the topic to store the metrics
        },
    },
}
- Parameters
- **metrics : dict-like
Parameters sent to the kafka topic as message.
Topic Management¶
- class apf.core.topic_management.DailyTopicStrategy(topic_format='ztf_%s_programid1', date_format='%Y%m%d', change_hour=22, retention_days=8)[source]¶
Gives a KafkaConsumer a new topic every day. (For more information, check apf.consumers.KafkaConsumer.)
- Parameters
- topic_format : str, list
Topic format, with %s as a placeholder for the date field.
- date_format : str
Date format code. Must be a valid 1989 C standard (strftime) format code, e.g. %Y%m%d → YYYYMMDD.
- change_hour : int
UTC hour at which to change the topic.
Methods
Get list of topics updated to the current date.
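The daily rotation can be sketched with a simplified reimplementation (illustration only; the real class also manages retention_days and multiple topic formats, and may differ in details):

```python
from datetime import datetime, timedelta, timezone


def daily_topic(topic_format="ztf_%s_programid1", date_format="%Y%m%d",
                change_hour=22, now=None):
    # At or after change_hour UTC, the strategy already points at the
    # next day's topic.
    now = now or datetime.now(timezone.utc)
    if now.hour >= change_hour:
        now += timedelta(days=1)
    return topic_format % now.strftime(date_format)


early = daily_topic(now=datetime(2023, 5, 1, 10, 0, tzinfo=timezone.utc))
# early == "ztf_20230501_programid1"
late = daily_topic(now=datetime(2023, 5, 1, 23, 0, tzinfo=timezone.utc))
# late == "ztf_20230502_programid1"
```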