Friday, July 11, 2014

Python, Multiprocessing, and Queues

Here's a little bit on what I've been working on lately, using Python's multiprocessing package to use the multiple cores on my machine but also using queues (in the multiprocessing package) to gather results from the processes. I'm writing it up because I found some info on queues that wasn't particularly helpful and in one case was just wrong.

So, basics first:

import multiprocessing

Ok good, got that step out of the way. Some things you will want:

num_cores = multiprocessing.cpu_count()

That gives you an integer of how many cores you have on your machine. If you have a newer machine where the cores are so awesome that they virtualize themselves into two cores each, it will return the virtual number (so, on my 6-core [hardware] Mac Pro it returns 12 [virtual!], which is twice as awesome as 6).

The multiprocessing seems a little weird at first. Here's some code:

proc_list = list()
    # This list is so you can "join" them. Important!
my_queue = multiprocessing.Queue()
   # Queues do q.put() and q.get() and other things. 

for a_range in chunk_ranges: 
      # The number of chunks in my code equals the number of processes.
      # "chunk_ranges" is a list of lists, [[index_a, index_b], ...]
      # indexes in a Pandas DF I am parsing. Easy to parallelize.
      # Your for loop will differ, depending on your data.
  a_proc = multiprocessing.Process(target=your_worker_function,
    args=(a_range, the_data, my_queue))
      # To your worker function, pass the args -- here it's the data range,
      # a pointer to that data, and the queue object. 
      # So you have a list of them and can iterate through the list
      # for join to end them nicely. 
  a_proc.start() # Starts one! 

# Waits for them to end and ends them nicely, cleanup. 
for p in proc_list: 
  print '%s.exitcode = %s' % (, p.exitcode)
    # You don't need this but it's nice to see them end
    # and their exit status number. 

print 'Getting elements from the queue.' 
while not my_queue.empty(): 
  one_procs_data = my_queue.get() 
  # then do something with that, maybe add it to a list, depends on what it is.

Ok so that's commented, but how does it work? How does the worker function ("worker" is the term of art, it appears, for the function that is multi-processed) deal with the queue object? Let's look at code for that. Note that this is a somewhat simple example with one queue, I've seen nice examples with an input queue and an output queue. This example here deals only with an output queue (because my project is chunking a Pandas DF into processable pieces, that is playing the role of the input queue essentially).

def your_worker_function(your_args, your_queue): 
  # Do your processing here! 
  # That's it, no "return" needed! 
# End of your_worker_function 

Not bad! Hand the worker function the queue object like you would any function's argument. You don't need to use the "return" call, since you use ".put" on the queue object and put the data you want into it. What is nice is that Python takes care of the worker functions all putting their data into the queue so you don't have to worry about them all getting smashed up (not a technical term) and your code barfing when/if they all try to access the object at the same time. No worries! Love it.

So how does that previous code work, the code that calls the worker function?

Declare a list object to populate with the processes (their names, I think). This is important, and is so you can end them all nicely and do cleanup. Also declare your queue object.

You use a looping function (here a for loop) to give out the jobs to the right number of processes. "target" is the worker function, and then "args" are the arguments you hand to that function, just as you would normally. Then, add the process that was just made to the list and start it.

The "join" for loop -- I have no idea why it is called "join" btw -- nicely cleans up after all the processes. I'm not discussing errors and such here, that's more advanced. The for loop does indeed loop through them all, and will get to all of them, waiting for them to finish in order (I think). I was a little curious about what if the first process in the list fails or doesn't stop or something, but somehow the looping will get to and join all the ones that have finished, even if they are after an infinitely looping one earlier in the list (yes I speak from experience oops).

Then, you can call your now-populated queue object and ".get" the items from it (whatever data type you put in). Deal with them appropriately -- so, maybe you made lists, and you don't want a list of lists when you're done, you want one list, so ".extend" to the outer main list object. Whatever is appropriate for your job.

There you have it. If you have 12 cores you could make it go to 11 instead of 12, but it's more fun (or at least faster) to say that you could and use 12 anyways.

Edit: Apparently, queue objects have a size limit, and things can grind to a halt and your processes will stop but won't complete (that is, your CPU won't be doing anything but your processes won't join) if you overload your queue. Oddly this doesn't crash, perhaps because exceptions don't get pickled and passed around the levels of multiprocessing. That was a very detail thin explanation. Suffice to say, I have some code that is fine but grinds to a halt even though the worker processes exit, however, the processes don't join. Right now I'm looking at the queue as the culprit.

Edit 2: I took out the my_queue.put(item) call in the worker process and replaced it with a file write (using the process name for unique filenames) and.... It worked! Actually first I took out the my_queue.put(item) and then they all joined, so, not happy with the queue. If you are doing multiprocessing, you might have a lot of data, but I guess the queue can't handle it. And, worse, in OSX you can't get the size of the queue since something isn't implemented, and also queue.full() (or whatever exactly) isn't totally accurate.