Source code for quantlaw.utils.pipeline

import multiprocessing


class PipelineStep:
    """Base class for a pipeline step that processes a list of items,
    optionally in parallel using ``multiprocessing``.

    Subclasses must implement :meth:`execute_item` (and :meth:`get_items` if
    they rely on it). They may override :meth:`finish_execution` to
    post-process the collected results.
    """

    # Default worker count: leave two CPUs free, but always use at least one.
    max_number_of_processes = max(multiprocessing.cpu_count() - 2, 1)
    # Forwarded to ``Pool.starmap``; ``None`` lets multiprocessing choose.
    chunksize = None

    def __init__(self, processes=None, execute_args=None):
        """Configure the step.

        Args:
            processes: Number of worker processes. A falsy value (``None`` or
                ``0``) falls back to ``max_number_of_processes``. A value of
                ``1`` (or less) runs the items sequentially in this process.
            execute_args: Extra positional arguments appended after the item
                in every ``execute_item`` call. Defaults to an empty list that
                is not shared with other instances.
        """
        self.processes = processes
        # A shared ``[]`` default would leak mutations of ``execute_args``
        # into every other instance, so create a fresh list per instance.
        self.execute_args = [] if execute_args is None else execute_args

    def get_items(self) -> list:
        """Return the items this step should process.

        Raises:
            NotImplementedError: always; subclasses must override this.
        """
        raise NotImplementedError("This function must be implemented in the subclass")

    def execute_item(self, item):
        """Process a single item and return its result.

        Subclasses receive ``item`` followed by ``*self.execute_args``.

        Raises:
            NotImplementedError: always; subclasses must override this.
        """
        raise NotImplementedError("This function must be implemented in the subclass")

    def execute_items(self, items):
        """Run :meth:`execute_item` on every item and return
        ``finish_execution(results)``.

        With more than one process, the calls are distributed over a
        ``multiprocessing`` pool via ``starmap``. In that case ``self``, the
        items and ``execute_args`` must be picklable. Otherwise the items are
        processed sequentially. In both cases the results keep the order of
        ``items``.
        """
        processes = self.processes or self.__class__.max_number_of_processes
        if processes > 1:
            # Only fetch the context when a pool is really needed. Calling
            # get_context() fixes the default start method for the process.
            ctx = multiprocessing.get_context()
            call_args = [(item, *self.execute_args) for item in items]
            with ctx.Pool(processes) as pool:
                results = pool.starmap(
                    self.execute_item,
                    call_args,
                    self.__class__.chunksize,
                )
        else:
            results = [self.execute_item(item, *self.execute_args) for item in items]
        return self.finish_execution(results)

    def execute_filtered_items(self, items, filters=None, *args, **kwargs):
        """Execute only the items that contain at least one filter string.

        Args:
            items: Iterable of items. Membership is tested with ``in``, so for
                string items this is a substring match.
            filters: Iterable of filter values. If falsy, all items are
                executed unfiltered.
            *args, **kwargs: Forwarded to :meth:`execute_items`. The base
                implementation accepts no extra arguments; presumably these
                are meant for subclasses that override it.

        Returns:
            Whatever :meth:`execute_items` returns.
        """
        if filters:
            filtered_items = [
                item for item in items if any(f in item for f in filters)
            ]
        else:
            filtered_items = items
        return self.execute_items(filtered_items, *args, **kwargs)

    def finish_execution(self, results):
        """Hook to post-process the list of results; returns them unchanged."""
        return results