Core components

class apf.core.step.GenericStep(consumer: ~typing.Type[~apf.consumers.generic.GenericConsumer] = <class 'apf.core.step.DefaultConsumer'>, producer: ~typing.Type[~apf.producers.generic.GenericProducer] = <class 'apf.core.step.DefaultProducer'>, metrics_sender: ~typing.Type[~apf.metrics.generic.GenericMetricsProducer] = <class 'apf.core.step.DefaultMetricsProducer'>, config: dict = {}, level: int = 20, prometheus_metrics: ~apf.metrics.prometheus.prometheus.PrometheusMetrics = <apf.metrics.prometheus.prometheus.DefaultPrometheusMetrics object>)[source]

Generic Step for apf.

Parameters
consumer : GenericConsumer

An object of type GenericConsumer.

level : logging.LEVEL

Logging level; must be a logging.LEVEL constant.

Adding LOGGING_DEBUG to settings.py sets the step's global logging level to debug.

#settings.py
LOGGING_DEBUG = True

**step_args : dict

Additional parameters for the step.

Attributes
consumer_config
metrics_config
metrics_producer_params
producer_config

Methods

execute(messages)

Execute the logic of the step.

get_extra_metrics(message)

Generate extra metrics from the EXTRA_METRICS metrics configuration.

get_value(message, params)

Get values from a message and process them to create a new metric.

produce(result)

Produces messages using the configured producer class.

send_metrics(**metrics)

Send Metrics with a metrics producer.

set_producer_key_field(key_field)

Set the producer key, used in producer.produce(message, key=message[key_field])

start()

Start running the step.

post_execute

post_produce

pre_consume

pre_execute

pre_produce

tear_down

abstract execute(messages: List[dict]) → Union[Iterable[Dict[str, Any]], Dict[str, Any]][source]

Execute the logic of the step. This method has to be implemented by the instanced class.

Parameters
messages : dict, list

Dict-like message to be processed or list of dict-like messages
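
A minimal sketch of a concrete step, assuming a hypothetical MyStep class and hypothetical "oid" and "value" fields; it subclasses GenericStep and implements execute as described above.

#example_step/step.py
from apf.core.step import GenericStep

class MyStep(GenericStep):
    def execute(self, messages):
        # messages is a list of dict-like messages delivered by the consumer
        results = []
        for message in messages:
            # "oid" and "value" are hypothetical fields of the input schema
            results.append({"oid": message["oid"], "value": message["value"] * 2})
        # the returned dict(s) are the result later passed to produce() by the step lifecycle
        return results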

get_extra_metrics(message)[source]

Generate extra metrics from the EXTRA_METRICS metrics configuration.

Parameters
message : dict, list

Dict-like message to be processed or list of dict-like messages

Returns
dict

Dictionary with extra metrics from the messages.
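
As a hedged sketch, EXTRA_METRICS is assumed here to be a list inside METRICS_CONFIG whose entries follow the str/dict form accepted by get_value below; the field names "candid" and "oid" are illustrative only.

#settings.py
STEP_CONFIG = {
    ...
    "METRICS_CONFIG": {
        ...
        # each entry is a message key, or a dict with "key", "alias" and/or "format"
        "EXTRA_METRICS": [
            "candid",
            {"key": "oid", "alias": "object_id"},
        ],
    },
}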

get_value(message, params)[source]

Get values from a message and process them to create a new metric.

Parameters
message : dict

Dict-like message to be processed

params : str, dict

String of the value key or dict with the following:

  • ‘key’: str

Required parameter; must be a key present in the message.

  • ‘alias’: str

New key to return; this can be used to standardize some message keys.

  • ‘format’: callable

Function to be called on the message value.

Returns
new_key, value

Aliased key and processed value.
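
For illustration, a call with the dict form of params; the "magpsf" field and the "magnitude" alias are assumptions.

#example_step/step.py
# returns ("magnitude", 17.3) for a message like {"magpsf": "17.3", ...}
new_key, value = self.get_value(
    message,
    {"key": "magpsf", "alias": "magnitude", "format": float},
)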

produce(result: Union[Iterable[Dict[str, Any]], Dict[str, Any]])[source]

Produces messages using the configured producer class.

Parameters
result: dict | list[dict]

The result of the step’s execution. This parameter can be an iterable or a single message, where the message should be a dictionary that matches the output schema of the step.

NOTE: If you want to produce with a key, use the set_producer_key_field(key_field)
method somewhere in the lifecycle of the step prior to the produce stage.

send_metrics(**metrics)[source]

Send Metrics with a metrics producer.

For this method to work, the METRICS_CONFIG key has to be set in the STEP_CONFIG variable.

Example:

Send the compute time for an object.

#example_step/step.py
self.send_metrics(compute_time=compute_time, oid=oid)

For this to work we need to declare:

#settings.py
STEP_CONFIG = {
    ...
    "METRICS_CONFIG": {  # Can be an empty dictionary
        "CLASS": "apf.metrics.KafkaMetricsProducer",
        "PARAMS": {  # params for the apf.metrics.KafkaMetricsProducer
            "PARAMS": {
                # this producer uses confluent_kafka.Producer, so here we provide
                # arguments for that class, like bootstrap.servers
                "bootstrap.servers": "kafka1:9092",
            },
            "TOPIC": "metrics_topic",  # the topic to store the metrics
        },
    },
}
Parameters
**metrics : dict-like

Parameters sent to the kafka topic as message.

set_producer_key_field(key_field: str)[source]

Set the producer key, used in producer.produce(message, key=message[key_field])

Parameters
key_field : str

The key of the message whose value will be used as the producer key.
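
A small sketch of setting the key before the produce stage, e.g. from within execute; the "oid" field and the process helper are hypothetical.

#example_step/step.py
def execute(self, messages):
    results = [self.process(message) for message in messages]  # hypothetical processing
    # "oid" is assumed to be a field of the output messages; the producer will
    # then be called as produce(message, key=message["oid"])
    self.set_producer_key_field("oid")
    return results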

start()[source]

Start running the step.
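
A minimal run sketch, assuming a hypothetical MyStep subclass and a settings module exposing STEP_CONFIG; the exact configuration keys depend on the consumer, producer and metrics classes used.

#example_step/run.py
from settings import STEP_CONFIG      # hypothetical settings module
from example_step.step import MyStep  # hypothetical step class

step = MyStep(config=STEP_CONFIG)
step.start()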

Topic Management

class apf.core.topic_management.DailyTopicStrategy(topic_format='ztf_%s_programid1', date_format='%Y%m%d', change_hour=22, retention_days=8)[source]

Gives a KafkaConsumer a new topic every day. (For more information, see apf.consumers.KafkaConsumer.)

Parameters
topic_format : str or list

Topic format with %s for the date field.

date_format : str

Date format code. Must be a valid 1989 C standard format code (as used by strftime).

e.g. %Y%m%d = YYYYMMDD

change_hour : int

UTC hour to change the topic.

Methods

get_topics()

Get list of topics updated to the current date.

get_topics()[source]

Get list of topics updated to the current date.

Returns
list

List of updated topics.
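
For illustration, instantiating the strategy directly with its documented defaults; the returned topics depend on the current UTC date.

#example_step/topics.py
from apf.core.topic_management import DailyTopicStrategy

strategy = DailyTopicStrategy(
    topic_format="ztf_%s_programid1",
    date_format="%Y%m%d",
    change_hour=22,
)
# list of topic names updated to the current UTC date
print(strategy.get_topics())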