MPI (Message Passing Interface) is a standard for communication between multiple processes in parallel computing. These processes can run on different cores, different CPUs, or entirely different computers in a grid. Because MPI is only a standard, many implementations are available, and many of them are open source. MPI can be accessed from Python with the packages pypar and mpi4py. Here I present a Python script that implements a "process pool" design pattern using pypar. I have run into this pattern often enough in my own work that I wrote this framework to avoid reinventing the wheel each time.
This pattern is useful for any embarrassingly parallel problem: a computing task that can easily be accelerated by running multiple parallel processes that do not need to interact with one another. For example, I have large scientific data sets from several runs of an experiment that need to be analyzed. Since the data from each run can be analyzed independently of the other runs, I can analyze all the data sets at once on a parallel machine. The code below implements a "master-worker" paradigm and requires at least three processes to accelerate the calculation. The first process becomes the master, which does no calculation itself but hands out tasks to the workers. The remaining processes are workers, each of which receives a chunk of work, finishes it, returns the result to the master process, and then waits for more work.
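For comparison, the same pool pattern is available on a single multi-core machine through Python's standard multiprocessing module. This is a minimal sketch (in modern Python 3 syntax; analyze and the dummy data sets are stand-ins for real per-run analysis code), not part of the pypar framework itself:

```python
from multiprocessing import Pool

def analyze(run_data):
    # Stand-in for a real per-run analysis; here we just sum the values.
    return sum(run_data)

if __name__ == "__main__":
    # One dummy "data set" per experimental run
    data_sets = [range(i, i + 10) for i in range(4)]
    # Four worker processes pull data sets from the pool as they go idle
    with Pool(processes=4) as pool:
        results = pool.map(analyze, data_sets)
    print(results)  # [45, 55, 65, 75]
```

multiprocessing only spans the cores of one machine, whereas MPI can span many machines; the master-worker bookkeeping in the script below is essentially what Pool.map does for you.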
#!/usr/bin/env python
from numpy import *
import pypar
import time

# Constants
MASTER_PROCESS = 0
WORK_TAG = 1
DIE_TAG = 2

MPI_myID = pypar.rank()

### Master Process ###
if MPI_myID == MASTER_PROCESS:
    num_processors = pypar.size()
    print "Master process found " + str(num_processors-1) + " worker processors."

    # Create a list of dummy arrays to pass to the worker processes
    work_size = 10
    work_array = [arange(0.0, 10.0) for i in range(work_size)]

    # Dispatch jobs to worker processes
    work_index = 0
    num_completed = 0

    # Start all worker processes with an initial work unit
    for i in range(1, min(num_processors, work_size)):
        pypar.send(work_index, i, tag=WORK_TAG)
        pypar.send(work_array[work_index], i)
        print "Sent work index " + str(work_index) + " to processor " + str(i)
        work_index += 1

    # Receive a result from any worker and send it new data,
    # until every work unit has been dispatched
    while work_index < work_size:
        results, status = pypar.receive(source=pypar.any_source,
                                        tag=pypar.any_tag,
                                        return_status=True)
        index = status.tag    # the work index comes back as the message tag
        proc = status.source  # the worker that sent this result
        num_completed += 1
        pypar.send(work_index, proc, tag=WORK_TAG)
        pypar.send(work_array[work_index], proc)
        print "Sent work index " + str(work_index) + " to processor " + str(proc)
        work_index += 1

    # Get results from the remaining worker processes
    while num_completed < work_size:
        results, status = pypar.receive(source=pypar.any_source,
                                        tag=pypar.any_tag,
                                        return_status=True)
        num_completed += 1

    # Shut down worker processes
    for proc in range(1, num_processors):
        print "Stopping worker process " + str(proc)
        pypar.send(-1, proc, tag=DIE_TAG)

### Worker Processes ###
else:
    continue_working = True
    while continue_working:
        # The first message carries the work index; its tag tells the
        # worker whether to keep working or shut down
        work_index, status = pypar.receive(source=MASTER_PROCESS,
                                           tag=pypar.any_tag,
                                           return_status=True)

        if status.tag == DIE_TAG:
            continue_working = False
        else:
            # The second message carries the actual work unit
            work_array, status = pypar.receive(source=MASTER_PROCESS,
                                               tag=pypar.any_tag,
                                               return_status=True)

            # Code below simulates a task running
            time.sleep(random.random_integers(low=0, high=5))

            # Return the result, tagged with the work index so the master
            # knows which work unit it belongs to
            result_array = work_array.copy()
            pypar.send(result_array, destination=MASTER_PROCESS,
                       tag=work_index)
    #### end while
#### end if worker

pypar.finalize()
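To run the script, launch it with your MPI implementation's launcher, requesting at least three processes (one master plus at least two workers) to see any speedup. Assuming an implementation such as Open MPI or MPICH is installed, and with pool.py as a placeholder name for the script above, the command looks something like:

```shell
# Start 4 MPI processes: rank 0 becomes the master, ranks 1-3 become workers.
# "pool.py" is a placeholder for whatever the script above is saved as.
mpirun -np 4 python pool.py
```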