I am seeing my main process's memory grow until it hits the system limit while using a multiprocessing Pool with apply_async. The main process pulls data points from a queue and appends them to a list; once the list reaches the batch size, it calls apply_async to hand the batch to a pool worker, which processes it and uploads it to a TSDB. The queue in the main process always drains, but I see the pool's cache growing whenever the uploads take longer.
from copy import deepcopy
from multiprocessing import Pool, Queue
from queue import Empty
from typing import List, Optional, Tuple

with Pool(processes=args.worker_pool_size) as worker_pool:
    data_queue: Queue = Queue()
    # Do some stuff
    batch_list: List[Tuple[str, str, Optional[str], Optional[str], str]] = []
    while all(client.is_alive() for client in client_conns):
        try:
            logger.info(data_queue.qsize())  # increases but always drains
            # worker_pool._cache = {}  # tried manually clearing the pool cache,
            # which was getting filled up by the AsyncResults
            data: Tuple[str, str, Optional[str], Optional[str], str] = data_queue.get(timeout=1)
            if data is not None:
                batch_list.append(data)
            if len(batch_list) >= args.batch_size:
                result = worker_pool.apply_async(process_and_upload_data,
                                                 (deepcopy(batch_list), log_name, output))
                del result
                batch_list.clear()
        except Empty:
            if len(batch_list) != 0:
                result = worker_pool.apply_async(process_and_upload_data,
                                                 (deepcopy(batch_list), log_name, output))
                del result
                batch_list.clear()
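To illustrate what I mean by the cache growing: the pool's private _cache dict (the same attribute I tried to reset above) holds the pending AsyncResults, and logging its size shows it climbing whenever uploads are slow. This is purely a diagnostic, and _cache is an internal attribute, so it may differ between Python versions:

# rough diagnostic only: number of AsyncResults the pool is still tracking
logger.info("pool cache size: %d", len(worker_pool._cache))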
Is there a way to make apply_async not return an AsyncResult while remaining non-blocking? The pool's cache looks like the source of the problem, but even when I clear it manually by setting it to {}, memory keeps increasing. GC is enabled and all the other processes are running without issues.
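To make the question concrete, what I am picturing is something along these lines, using apply_async's callback and error_callback parameters so that I never bind the AsyncResult in the main loop (just a sketch, I have not verified that this actually stops the cache/memory growth):

# sketch: fire-and-forget submission; the returned AsyncResult is never bound,
# and the callbacks consume the result or exception in the main process
worker_pool.apply_async(
    process_and_upload_data,
    (deepcopy(batch_list), log_name, output),
    callback=lambda _: None,        # discard whatever the worker returns
    error_callback=logger.error,    # at least log worker-side exceptions
)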
Thanks