Source code for apf.producers.csv

from apf.producers.generic import GenericProducer
from pandas import json_normalize


class CSVProducer(GenericProducer):
    """CSV file producer.

    Appends each produced message to the CSV file named by the
    ``FILE_PATH`` configuration key.

    .. warning::
        `CSVProducer` only works for a **single process** step; running it
        distributed or with multiprocessing can result in issues.

    Parameters
    ----------
    FILE_PATH: :class:`str`
        Output CSV file path.
    """

    def __init__(self, config):
        # No extra state of its own: configuration handling is delegated
        # entirely to the parent class.
        super().__init__(config=config)

    def produce(self, message=None, **kwargs):
        """Append a message to the CSV file. Doesn't add the header.

        The message is flattened into a table with
        :func:`pandas.json_normalize` and written in append mode, so
        successive calls add rows to the same file.

        Because no header row is ever written, the file itself does not
        record column names; readers must already know the column layout.

        Parameters
        ----------
        message
            Data accepted by :func:`pandas.json_normalize`
            (presumably a dict or a list of dicts -- confirm against the
            steps that call this producer).
        **kwargs
            Accepted for interface compatibility; not used here.
        """
        serialized_message = json_normalize(message)
        # NOTE(review): ``self.config`` is assumed to be populated by
        # ``GenericProducer.__init__`` -- it is not assigned in this class.
        serialized_message.to_csv(
            self.config["FILE_PATH"], mode="a+", index=False, header=False
        )