Thursday, May 5, 2011

How do I handle exceptions when using threading and Queue?

If I have a program that uses threading and Queue, how do I get exceptions to stop execution? Here is an example program, which is not possible to stop with ctrl-c (basically ripped from the python docs).

from threading import Thread
from Queue import Queue
from time import sleep

def do_work(item):
    sleep(0.5)
    print "working" , item

def worker():
        while True:
            item = q.get()
            do_work(item)
            q.task_done()

q = Queue()

num_worker_threads = 10

for i in range(num_worker_threads):
     t = Thread(target=worker)
    # t.setDaemon(True)
     t.start()

for item in range(1, 10000):
    q.put(item)

q.join()       # block until all tasks are done
From stackoverflow
  • The simplest way is to start all the worker threads as daemon threads, then just have your main loop be

    while True:
        sleep(1)
    

    Hitting Ctrl+C will throw an exception in your main thread, and all of the daemon threads will exit when the interpreter exits. This assumes you don't want to perform cleanup in all of those threads before they exit.

    A more complex way is to have a global keep_going variable:

    keep_going = True
    def worker():
        while keep_going:
            try:
                item = q.get(timeout=1)
                do_work(item)
            except Empty:      # import this from the Queue module
                pas
    

    Then your main loop can set keep_going to False when it gets a KeyboardInterrupt

    try:
        while True:
            sleep(1)
    except:
        keep_going = False
    

    This lets your worker threads finish what they're doing you want instead of just having every worker thread be a daemon and exit in the middle of execution. You can also do whatever cleanup you want.

    Note that this example doesn't make use of q.join() - this makes things more complex, though you can still use it. If you do then your best bet is to use signal handlers instead of exceptions to detect KeyboardInterrupts. For example:

    from signal import signal, SIGINT
    def stop(signum, frame):
        global keep_going
        keep_going = False
    signal(SIGINT, stop)
    

    This lets you define what happens when you hit Ctrl+C without affecting whatever your main loop is in the middle of. So you can keep doing q.join() without worrying about being interrupted by a Ctrl+C. Of course, with my above examples, you don't need to be joining, but you might have some other reason for doing so.

    Joakim Lundborg : So, basically, using q.join() makes it hard to handle exceptions in the threads?
    Ber : Should'nt that read "signal(SIGINT, stop)" ?
    Eli Courtwright : It makes things more complex, but I've added an example with signals to show you how you'd use q.join() if you had a good reason to be using that.
    Eli Courtwright : @Ber: good call; I've fixed that typo.
  • A (possibly) offtopic note:

    (...)
    for item in range(1, 10000):
        q.put(item)
    (...)
    

    You could use xrange instead of range here (unless You use python3000). You will save some cpu and memory by doing so. More on xrange can be found here.

0 comments:

Post a Comment