Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Python multiprocessing Pool: apply_async without returning an AsyncResult

I am seeing my main process's memory grow until it hits the system limit while using a multiprocessing Pool with apply_async. The main process pulls data points from a queue and appends them to a list; once the list reaches the batch size, it calls apply_async to hand the batch off to a worker process, which processes the points and uploads them to a time-series database (TSDB). The queue in the main process always drains, but I see the pool's cache growing whenever the uploads take longer than the batches are produced.

    from copy import deepcopy
    from multiprocessing import Pool
    from queue import Empty, Queue
    from typing import List, Optional, Tuple

    with Pool(processes=args.worker_pool_size) as worker_pool:
        data_queue: Queue = Queue()
        # Do some stuff
        batch_list: List[Tuple[str, str, Optional[str], Optional[str], str]] = []
        while all(client.is_alive() for client in client_conns):
            try:
                logger.info(data_queue.qsize())  # Increases but always drains
                # worker_pool._cache = {}  # Tried to manually clear the pool
                # cache, as it was getting filled up by the AsyncResults
                data: Tuple[str, str, Optional[str], Optional[str], str] = data_queue.get(timeout=1)
                if data is not None:
                    batch_list.append(data)
                    if len(batch_list) >= args.batch_size:
                        result = worker_pool.apply_async(process_and_upload_data,
                                                         (deepcopy(batch_list), log_name, output))
                        del result
                        batch_list.clear()
            except Empty:
                if len(batch_list) != 0:
                    result = worker_pool.apply_async(process_and_upload_data,
                                                     (deepcopy(batch_list), log_name, output))
                    del result
                    batch_list.clear()

Is there a way to make apply_async not return an AsyncResult object while still being non-blocking? I can see the problem in the pool's cache, but even when I manually clear it by setting it to {}, memory keeps increasing. I have gc enabled, and all other processes are working without any issues.

Thanks



1 Answer

Awaiting an answer from an expert.
