Consumer plugins

class apf.consumers.GenericConsumer(config=None)[source]

Generic Consumer for Alert Processing Framework.

Parameters are passed through config as a dict of params.

Methods

commit()

Post consume processing.

consume()

Get a message from a data source

commit()[source]

Post-consume processing. Can be a PostgreSQL commit, a Kafka commit, or a custom function to run after an alert is processed.

The committed value has to be stored as an instance attribute in consume so that commit can access it, e.g.:

def consume(self):
    self.message = get_message()

def commit(self):
    commit_logic(self.message)

abstract consume() → Generator[Union[list, dict], None, None][source]

Get a message from a data source

Yields
dict

Dictionary-like message of an alert.
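The consume/commit contract described above can be sketched with a small in-memory consumer. This is a minimal sketch, not apf code: `GenericConsumer` below is a stand-in base class so that the example runs without apf installed, and `ListConsumer`, the `messages` config key, and the `committed` list are hypothetical names chosen for illustration.

```python
from abc import ABC, abstractmethod
from typing import Generator, Union


class GenericConsumer(ABC):
    """Stand-in for apf.consumers.GenericConsumer (assumption, not the real class)."""

    def __init__(self, config=None):
        self.config = config or {}

    @abstractmethod
    def consume(self) -> Generator[Union[list, dict], None, None]:
        """Yield messages from a data source."""

    def commit(self):
        """Post-consume processing; a no-op by default."""


class ListConsumer(GenericConsumer):
    """Hypothetical consumer that reads alerts from a list in its config."""

    def __init__(self, config=None):
        super().__init__(config)
        self.message = None
        self.committed = []

    def consume(self):
        for message in self.config.get("messages", []):
            # Store the message on the instance so commit() can access it.
            self.message = message
            yield message

    def commit(self):
        # Runs after the alert was processed; here it only records the message.
        self.committed.append(self.message)


consumer = ListConsumer({"messages": [{"oid": "a"}, {"oid": "b"}]})
processed = []
for alert in consumer.consume():
    processed.append(alert["oid"])  # stand-in for the step's processing
    consumer.commit()
```

Because consume is a generator, each message is committed before the next one is fetched, which is why the instance attribute always points at the message that was just processed.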

class apf.consumers.KafkaConsumer(config)[source]

Consume from a Kafka Topic.

By default KafkaConsumer uses a manual commit strategy to avoid data loss on errors.

This strategy can be disabled completely by adding "COMMIT": False to the STEP_CONFIG variable in the step's settings.py file. This can be useful for step testing: without commits, Kafka does not record which messages were already processed, so the same messages can be consumed again.

Example:

#settings.py
STEP_CONFIG = { ...
    "COMMIT": False #Disable commit
    #useful for testing/debugging.
}
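How a step might honor this flag can be sketched as follows. This is an illustration under assumptions, not the apf implementation: `run_step`, `process`, and `RecordingConsumer` are hypothetical names, and treating a missing "COMMIT" key as `True` is inferred from the "by default" wording above.

```python
def run_step(consumer, process, step_config):
    """Consume every message, process it, and commit only if enabled."""
    # Assumption: a missing "COMMIT" key means commits stay enabled.
    commit_enabled = step_config.get("COMMIT", True)
    for message in consumer.consume():
        process(message)
        if commit_enabled:
            consumer.commit()


class RecordingConsumer:
    """Hypothetical consumer that counts how often commit() is called."""

    def __init__(self, messages):
        self.messages = messages
        self.commits = 0

    def consume(self):
        yield from self.messages

    def commit(self):
        self.commits += 1


with_commit = RecordingConsumer([{"id": 1}, {"id": 2}])
run_step(with_commit, lambda message: None, {})

without_commit = RecordingConsumer([{"id": 1}, {"id": 2}])
run_step(without_commit, lambda message: None, {"COMMIT": False})
```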
Parameters
TOPICS: list

List of topics to consume.

Example:

Subscribe to a fixed list of topics:

#settings.py
CONSUMER_CONFIG = { ...
    "TOPICS": ["topic1", "topic2"]
}

Using confluent_kafka syntax, we can subscribe to a pattern by starting the topic entry with ^:

#settings.py
CONSUMER_CONFIG = { ...
    "TOPICS": ["^topic.*"]
}

More on pattern subscription can be found in the confluent_kafka documentation.
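To preview which topics a subscription list would select, the matching can be approximated in plain Python. This is a rough sketch: `select_topics` is a hypothetical helper, and Python's `re` module only approximates the regex engine used by the Kafka client, which treats entries starting with `^` as patterns and all other entries as literal topic names.

```python
import re


def select_topics(subscription, available):
    """Return the available topics selected by a list of names and patterns."""
    selected = []
    for topic in available:
        for entry in subscription:
            if entry.startswith("^"):
                # Entries with a leading ^ are treated as regular expressions.
                matched = re.search(entry, topic) is not None
            else:
                # Everything else must match the topic name exactly.
                matched = entry == topic
            if matched:
                selected.append(topic)
                break
    return selected


available = ["topic1", "topic2", "other"]
fixed = select_topics(["topic1", "topic2"], available)
by_pattern = select_topics(["^topic.*"], available)
```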

TOPIC_STRATEGY: dict

Parameters to configure a topic strategy instead of a fixed topic list.

The required parameters are:

  • CLASS: apf.core.topic_management.GenericTopicStrategy class to be used.

  • PARAMS: Parameters passed to CLASS object.

Example:

A topic strategy that updates at 23:00 UTC every day.

#settings.py
CONSUMER_CONFIG = { ...
    "TOPIC_STRATEGY": {
        "CLASS": "apf.core.topic_management.DailyTopicStrategy",
        "PARAMS": {
            "topic_format": [
                "ztf_%s_programid1",
                "ztf_%s_programid3"
            ],
            "date_format": "%Y%m%d",
            "change_hour": 23,
            "retention_days": 8,
        }
    }
}
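One plausible reading of these parameters is sketched below. It is not taken from the apf source: `daily_topics` is a hypothetical function, and both the retention window (today plus the previous `retention_days - 1` days) and the switch at `change_hour` (the next day's topics are added from that hour onward) are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone


def daily_topics(now, topic_format, date_format, change_hour, retention_days):
    """Build topic names for the retained days (assumed behavior)."""
    # Assumption: keep today and the previous retention_days - 1 days.
    dates = [now - timedelta(days=i) for i in range(retention_days - 1, -1, -1)]
    # Assumption: from change_hour UTC onward, the next day's topics are added.
    if now.hour >= change_hour:
        dates.append(now + timedelta(days=1))
    # Each %s placeholder is filled with the date rendered via date_format.
    return [
        fmt % date.strftime(date_format)
        for date in dates
        for fmt in topic_format
    ]


now = datetime(2020, 1, 10, 23, 30, tzinfo=timezone.utc)
topics = daily_topics(now, ["ztf_%s_programid1"], "%Y%m%d", 23, 2)
```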
PARAMS: dict

Parameters passed to confluent_kafka.Consumer

The required parameters are:

  • bootstrap.servers: str with a comma-separated list of broker <host:port> addresses.

  • group.id: str with consumer group name.

Example:

Configure a Kafka consumer to connect to a secure Kafka cluster:

#settings.py
CONSUMER_CONFIG = { ...
    "PARAMS": {
        "bootstrap.servers": "kafka1:9093,kafka2:9093",
        "group.id": "step_group",
        'security.protocol': 'SSL',
        'ssl.ca.location': '<ca-cert path>',
        'ssl.keystore.location': '<keystore path>',
        'ssl.keystore.password': '<keystore password>'
    }
}

All supported confluent_kafka parameters are listed in the librdkafka configuration reference.
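A settings file can be checked for the two required parameters before the consumer is created. This is a small sketch rather than part of apf: `REQUIRED_PARAMS` and `missing_params` are hypothetical names, and the check only looks at key presence, not at whether the brokers are reachable.

```python
REQUIRED_PARAMS = ("bootstrap.servers", "group.id")


def missing_params(consumer_config):
    """Return the required PARAMS keys that are absent from the config."""
    params = consumer_config.get("PARAMS", {})
    return [key for key in REQUIRED_PARAMS if key not in params]


config = {
    "TOPICS": ["topic1"],
    "PARAMS": {"bootstrap.servers": "kafka1:9093,kafka2:9093"},
}
missing = missing_params(config)
```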

Methods

consume([num_messages, timeout])

Consumes num_messages messages from the specified topic.

consume(num_messages=1, timeout=60)[source]

Consumes num_messages messages from the specified topic.

Will return a dictionary or a list, depending on the number of messages consumed.

If num_messages > 1, it returns a list.

If num_messages = 1, it returns a dict.

Parameters
num_messages: int

Number of messages to be consumed

timeout: int

Seconds to wait when consuming messages. Raises an exception if the messages are not received within the specified time.
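Because the result is a dict for a single message and a list for several, downstream code may want to normalize it. A minimal sketch, assuming only the two return shapes described above; `as_message_list` is a hypothetical helper, not an apf function.

```python
def as_message_list(result):
    """Wrap a single dict message in a list; pass lists through as lists."""
    if isinstance(result, dict):
        return [result]
    return list(result)


single = as_message_list({"oid": "a"})
batch = as_message_list([{"oid": "a"}, {"oid": "b"}])
```

With this in place, processing code can always iterate over messages regardless of the num_messages setting.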

class apf.consumers.CSVConsumer(config)[source]

CSV Consumer.

Example:

CSV Consumer configuration example

#settings.py
CONSUMER_CONFIG = { ...
    "FILE_PATH": "csv_file_path",
    "OTHER_ARGS": {
        "index_col": "id",
        "sep": ";",
        "header": 0
    }
}
Parameters
FILE_PATH: path

CSV path location

OTHER_ARGS: dict

Parameters passed to pandas.read_csv() (see the pandas documentation for the full list of options)

Methods

commit()

Post consume processing.

consume()

class apf.consumers.JSONConsumer(config)[source]

JSON Consumer.

Example:

JSON Consumer configuration example

#settings.py
CONSUMER_CONFIG = { ...
    "FILE_PATH": "json_file_path"
}
Parameters
FILE_PATH: path

JSON path location
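What reading messages from such a file could look like is sketched below with the standard library only. This is not the JSONConsumer implementation: `read_json_messages` is a hypothetical function, and the assumption that a top-level JSON list holds one message per element is made for illustration.

```python
import json
import os
import tempfile


def read_json_messages(file_path):
    """Yield dict messages from a JSON file holding one object or a list."""
    with open(file_path, encoding="utf-8") as handle:
        data = json.load(handle)
    # Assumption: a top-level list holds one message per element.
    if isinstance(data, list):
        yield from data
    else:
        yield data


# Write a small sample file so the sketch is self-contained.
with tempfile.TemporaryDirectory() as tmp_dir:
    sample_path = os.path.join(tmp_dir, "alerts.json")
    with open(sample_path, "w", encoding="utf-8") as handle:
        json.dump([{"oid": "a"}, {"oid": "b"}], handle)
    config = {"FILE_PATH": sample_path}
    messages = list(read_json_messages(config["FILE_PATH"]))
```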

Methods

commit()

Post consume processing.

consume()

class apf.consumers.AVROFileConsumer(config)[source]

Consume from a directory of AVRO files.

Example:

#settings.py
CONSUMER_CONFIG = { ...
    "DIRECTORY_PATH": "path/to/avro/directory"
}
Parameters
DIRECTORY_PATH: path

AVRO files Directory path location

Methods

commit()

Post consume processing.

consume()