Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
603 views
in Technique[技术] by (71.8m points)

multiprocessing - Python JoinableQueue call task_done in other process need twice

I have implemented a WorkerManager based on multiprocessing.Process and JoinableQueue. While I try to handle the process exceptions like timeout or un-handle exceptions after proc.join(timeout), and evaluate proc.exitcode to determine how to handle, and then call in_queue.task_done() to notify the job has done with the exception-handle logic. However it need to invoke twice. I have no idea why it should be called twice. Is there anyone could figure it out the reason here.

The whole code snippet:

# -*- coding=utf-8 -*-

import time
import threading
from queue import Empty
from multiprocessing import Event, Process, JoinableQueue, cpu_count, current_process

TIMEOUT = 3


class WorkersManager(object):

    def __init__(self, jobs, processes_num):
        self._processes_num = processes_num if processes_num else cpu_count()
        self._workers_num = processes_num
        self._in_queue, self._run_queue, self._out_queue = JoinableQueue(), JoinableQueue(), JoinableQueue()
        self._spawned_procs = []
        self._total = 0
        self._stop_event = Event()
        self._jobs_on_procs = {}

        self._wk_kwargs = dict(
            in_queue=self._in_queue, run_queue=self._run_queue, out_queue=self._out_queue,
            stop_event=self._stop_event
        )

        self._in_stream = [j for j in jobs]
        self._out_stream = []
        self._total = len(self._in_stream)

    def run(self):
        # Spawn Worker
        worker_processes = [
            WorkerProcess(i, **self._wk_kwargs) for i in range(self._processes_num)
        ]
        self._spawned_procs = [
            Process(target=process.run, args=tuple())
            for process in worker_processes
        ]

        for p in self._spawned_procs:
            p.start()

        self._serve()

        monitor = threading.Thread(target=self._monitor, args=tuple())
        monitor.start()

        collector = threading.Thread(target=self._collect, args=tuple())
        collector.start()

        self._join_workers()
        # TODO: Terminiate threads
        monitor.join(TIMEOUT)
        collector.join(TIMEOUT)

        self._in_queue.join()
        self._out_queue.join()
        return self._out_stream

    def _join_workers(self):
        for p in self._spawned_procs:
            p.join(TIMEOUT)

            if p.is_alive():
                p.terminate()
                job = self._jobs_on_procs.get(p.name)
                print('Process TIMEOUT: {0} {1}'.format(p.name, job))
                result = {
                    "status": "failed"
                }

                self._out_queue.put(result)
                for _ in range(2):
                    # NOTE: Call task_done twice
                    # Guessing:
                    # 1st time to swtich process?
                    # 2nd time to notify task has done?
                    # TODO: figure it out why?
                    self._in_queue.task_done()
            else:
                if p.exitcode == 0:
                    print("{} exit with code:{}".format(p, p.exitcode))
                else:
                    job = self._jobs_on_procs.get(p.name)
                    if p.exitcode > 0:
                        print("{} with code:{} {}".format(p, p.exitcode, job))
                    else:
                        print("{} been killed with code:{} {}".format(p, p.exitcode, job))

                    result = {
                        "status": "failed"
                    }

                    self._out_queue.put(result)
                    for _ in range(2):
                        # NOTE: Call task_done twice
                        # Guessing:
                        # 1st time to swtich process?
                        # 2nd time to notify task has done?
                        # TODO: figure it out why?
                        self._in_queue.task_done()

    def _collect(self):
        # TODO: Spawn a collector proc
        while True:
            try:
                r = self._out_queue.get()
                self._out_stream.append(r)
                self._out_queue.task_done()

                if len(self._out_stream) >= self._total:
                    print("Total {} jobs done.".format(len(self._out_stream)))
                    self._stop_event.set()
                    break
            except Empty:
                continue

    def _serve(self):
        for job in self._in_stream:
            self._in_queue.put(job)

        for _ in range(self._workers_num):
            self._in_queue.put(None)

    def _monitor(self):
        running = 0
        while True:
            proc_name, job = self._run_queue.get()
            running += 1
            self._jobs_on_procs.update({proc_name: job})
            self._run_queue.task_done()
            if running == self._total:
                break


class WorkerProcess(object):

    def __init__(self, worker_id, in_queue, run_queue, out_queue, stop_event):
        self._worker_id = worker_id
        self._in_queue = in_queue
        self._run_queue = run_queue
        self._out_queue = out_queue
        self._stop_event = stop_event

    def run(self):
        self._work()
        print('worker - {} quit'.format(self._worker_id))

    def _work(self):
        print("worker - {0} start to work".format(self._worker_id))
        job = {}
        while not self._stop_event.is_set():
            try:
                job = self._in_queue.get(timeout=.01)
            except Empty:
                continue

            if not job:
                self._in_queue.task_done()
                break

            try:
                proc = current_process()
                self._run_queue.put((proc.name, job))
                r = self._run_job(job)
                self._out_queue.put(r)
            except Exception as err:
                print('Unhandle exception: {0}'.format(err), exc_info=True)
                result = {"status": 'failed'}
                self._out_queue.put(result)
            finally:
                self._in_queue.task_done()

    def _run_job(self, job):
        time.sleep(job)
        return {
            'status': 'succeed'
        }


def main():

    jobs = [3, 4, 5, 6, 7]
    procs_num = 3
    m = WorkersManager(jobs, procs_num)
    m.run()


if __name__ == "__main__":
    main()

And the issue code as following:

   self._out_queue.put(result)
                    for _ in range(2):
                        # ISSUE HERE !!!
                        # NOTE: Call task_done twice
                        # Guessing:
                        # 1st time to swtich process?
                        # 2nd time to notify task has done?
                        # TODO: figure it out why?
                        self._in_queue.task_done()

I need to invoke the self._in_queue.task_done() twice to notify the JoinableQueue the job has done by the exception-handle logic.

I guess whether task_done() call 1st time was to switch process context? or anything else. according to the testing. the 2nd task_done() has effect.

worker - 0 start to work
worker - 1 start to work
worker - 2 start to work

Process TIMEOUT: Process-1 5
Process TIMEOUT: Process-2 6
Process TIMEOUT: Process-3 7
Total 5 jobs done.

If you call task_done() once, and it will block forever and not to finish.

question from:https://stackoverflow.com/questions/65878199/python-joinablequeue-call-task-done-in-other-process-need-twice

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

The problem is that you have a race condition, defined as:

A race condition arises in software when a computer program, to operate properly, depends on the sequence or timing of the program's processes or threads.

In method WorkerProcess._work, your main loop begins:

    while not self._stop_event.is_set():
        try:
            job = self._in_queue.get(timeout=.01)
        except Empty:
            continue

        if not job:
            self._in_queue.task_done()
            break

self._stop_event is being set by the _collect thread. Depending on where WorkerProcess._work is in the loop when this occurs, it can exit the loop leaving the None that has been place on the _in_queue signifying no more jobs. Clearly, this occurs twice for two processes. It could happen even for 0, 1, 2 or 3 processes.

The fix is to replace while not self._stop_event.is_set(): with while True: and to just rely on finding None on the _in_queue to signify termination. This enables you to remove those extra calls to task_done for those processes that have completed normally (you actually only needed one extra call per successfully completed process instead of the two you have).

But that is half of the problem. The other half is you have in your code:

def _join_workers(self):
    for p in self._spawned_procs:
        p.join(TIMEOUT)
        ...
            p.terminate()

Therefore, you are not allowing your workers enough time to deplete the _in_queue and thus there is the possibility of an arbitrary number of messages being left on it (in the example you have, of course, there would be just the current "job" being processed and the None sentinel for a total of 2).

But this is the problem in general with the code: it has been over-engineered. As an example, referring back to the first code snippet above. It can be further simplified to:

    while True:
        job = self._in_queue.get() # blocking get
        if not job:
            break

Moreover, there is no reason to even be using a JoinableQueue or Event instance since the use of a None sentinel placed on the _in_queue is sufficient to signify that the worker processes should terminate, especially if you are going to be prematurely terminating the workers. The simplified, working code is:

import time
import threading
from multiprocessing import Process, Queue, cpu_count, current_process

TIMEOUT = 3


class WorkersManager(object):

    def __init__(self, jobs, processes_num):
        self._processes_num = processes_num if processes_num else cpu_count()
        self._workers_num = processes_num
        self._in_queue, self._run_queue, self._out_queue = Queue(), Queue(), Queue()
        self._spawned_procs = []
        self._total = 0
        self._jobs_on_procs = {}

        self._wk_kwargs = dict(
            in_queue=self._in_queue, run_queue=self._run_queue, out_queue=self._out_queue
        )

        self._in_stream = [j for j in jobs]
        self._out_stream = []
        self._total = len(self._in_stream)

    def run(self):
        # Spawn Worker
        worker_processes = [
            WorkerProcess(i, **self._wk_kwargs) for i in range(self._processes_num)
        ]
        self._spawned_procs = [
            Process(target=process.run, args=tuple())
            for process in worker_processes
        ]

        for p in self._spawned_procs:
            p.start()

        self._serve()

        monitor = threading.Thread(target=self._monitor, args=tuple())
        monitor.start()

        collector = threading.Thread(target=self._collect, args=tuple())
        collector.start()

        self._join_workers()
        # TODO: Terminiate threads
        monitor.join()
        collector.join()

        return self._out_stream

    def _join_workers(self):
        for p in self._spawned_procs:
            p.join(TIMEOUT)

            if p.is_alive():
                p.terminate()
                job = self._jobs_on_procs.get(p.name)
                print('Process TIMEOUT: {0} {1}'.format(p.name, job))
                result = {
                    "status": "failed"
                }

                self._out_queue.put(result)
            else:
                if p.exitcode == 0:
                    print("{} exit with code:{}".format(p, p.exitcode))
                else:
                    job = self._jobs_on_procs.get(p.name)
                    if p.exitcode > 0:
                        print("{} with code:{} {}".format(p, p.exitcode, job))
                    else:
                        print("{} been killed with code:{} {}".format(p, p.exitcode, job))

                    result = {
                        "status": "failed"
                    }

                    self._out_queue.put(result)

    def _collect(self):
        # TODO: Spawn a collector proc
        while True:
            r = self._out_queue.get()
            self._out_stream.append(r)
            if len(self._out_stream) >= self._total:
                print("Total {} jobs done.".format(len(self._out_stream)))
                break

    def _serve(self):
        for job in self._in_stream:
            self._in_queue.put(job)

        for _ in range(self._workers_num):
            self._in_queue.put(None)

    def _monitor(self):
        running = 0
        while True:
            proc_name, job = self._run_queue.get()
            running += 1
            self._jobs_on_procs.update({proc_name: job})
            if running == self._total:
                break


class WorkerProcess(object):

    def __init__(self, worker_id, in_queue, run_queue, out_queue):
        self._worker_id = worker_id
        self._in_queue = in_queue
        self._run_queue = run_queue
        self._out_queue = out_queue

    def run(self):
        self._work()
        print('worker - {} quit'.format(self._worker_id))

    def _work(self):
        print("worker - {0} start to work".format(self._worker_id))
        job = {}
        while True:
            job = self._in_queue.get()
            if not job:
                break

            try:
                proc = current_process()
                self._run_queue.put((proc.name, job))
                r = self._run_job(job)
                self._out_queue.put(r)
            except Exception as err:
                print('Unhandle exception: {0}'.format(err), exc_info=True)
                result = {"status": 'failed'}
                self._out_queue.put(result)

    def _run_job(self, job):
        time.sleep(job)
        return {
            'status': 'succeed'
        }


def main():

    jobs = [3, 4, 5, 6, 7]
    procs_num = 3
    m = WorkersManager(jobs, procs_num)
    m.run()


if __name__ == "__main__":
    main()

Prints:

worker - 0 start to work
worker - 1 start to work
worker - 2 start to work
Process TIMEOUT: Process-1 3
Process TIMEOUT: Process-2 6
Process TIMEOUT: Process-3 7
Total 5 jobs done.

You are probably aware of this, but due diligence requires that I mention that there are two excellent classes multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor for doing what you want to accomplish. See this for some comparisons.

Further Explanation

What is the point of using a JoinableQueue, which supports calls to task_done? Usually, it is so that you can be sure that all of the messages that you have placed on the queue have been taken off the queue and processed and the main process will not be terminating prematurely before that has occurred. But this could not work correctly in the code as you had it because you were giving your processes only TIMEOUT seconds to process its messages and then terminating the process if it was still alive with the possibility that messages were still left on its queue. This is what forced you to artificially issue extra calls to task_done just so your calls to join on the queues in the main process would not hang and why you had to post this question to begin with.

So there are two ways you could have proceeded differently. One way would have allowed you to continue using JoinableQueue instances and calling join on these instances to know when to terminate. But (1) you would not then be able to prematurely terminate your message processes and (2) your message processes must handle exceptions correctly so that they do not prematurely terminate without emptying their queues.

The other way is what I proposed, which is much simpler. The main process simply places on the input queue a special sentinel message, in this case None. This is just a message that cannot be mistaken for an actual message to be processed and instead signifies end of file or, in other words, a signal to the message process that there are no more messages that will be placed on the queue and it may now terminate. Thus, the main process just has to place in addition to the "real" messages to be processed on the queues, the additional sentinel message and then instead of doing a join call on the message queues (which are now only regular, non-joinable queues), it does join(TIMEOUT) on each process instance, which you will either find to be no longer alive because it has seen the sentinel and therefore you know that it has processed all of its messages or you can call terminate on the process if you are willing to leave messages on its input queue.

Of course, to be really sure that processes that terminated on their own really emptied their queue might require you to check their queues to see that they are indeed empty. But I assume that you should be able to code your processes to handle exceptions correctly, at least those that can be handled, so that they do not terminate prematurely and do something "reasonable" with every message.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

2.1m questions

2.1m answers

60 comments

57.0k users

...