Python version of the MPI _ IoC idea

The python version runs on python3 and requires the following packages:

MPI from mpi4py
for all the MPI
psutil
for getting the amount of virtual memory available
socket
for getting the hostname

The code defines a class called MP_MachineInfo for transporting the details about the node on which a copy of the script runs. This includes the hostname, the virtual memory, and the rank of the process. This can be used by the Control node (program copy known as rank 0) for load-balancing or directing a copy of the program to alter its functionality. For instance, if the same hostname is returned by 16 copies of the program, it's likely that those copies are all running on the same "blade" and sharing their isolated backplane for their communications, and therefore one of those could be designated as the I/O controller for the others. I used this tactic with an SRME3D application to reduce unnecessary communications off the "blade" - the one node read and buffered data for the others on the backplane, and worked a separate part of the overall problem (that application used the C version of this algorithm).

This class also includes the amount of memory available, and this can be used by the program copies sharing this "blade" to limit the amount of memory used for the buffering of data for use by all processes sharing this hostname. When all these packets are received by the control node, it can make send memory use recommendations to all the processes with that same hostname. And when that group of program copies have finished their work, they can be repurposed to work other areas or do other functions. I also used this sort of logic on the SRME3D application: when the region provided by one input file was fully worked, the nodes would all be assigned as worker nodes getting their information and sending their results to another region file that was taking more time - on a different "blade".

The code also defines a class called MI_Shutdown which is an internal message for telling nodes to do their cleanup and stop. The user application does not need to interact with this message.

The main class is MPI_IoC and is instantiated with the optional function to invoke when the MP_MachineInfo message is received on rank 0, and with an optional message to invoke when all nodes have reported in and ready. Both arguments are optional, but without using at least one of them you'd have no way to determine when the system is ready to begin (additional messages can be registered at later times, provided they're given in the same order). And before beginning the execution in the IoC mode, the nodes register all the messages they'll be using. This is what connects the receipt of a certain type of message with a method that will process messages of that sort.

The framework only needs to supply the following methods:

The C and Fortran versions also require the Init and Finalize methods, but these are implicitly handled by Python. The required methods for the Python version of the framework are implemented in this single file: MPI_IoC.py - the framework itself (with test code).

The Python code is on the order of 120 lines for the framework and another 60 for a test demonstration. The other languages require significantly more boilerplate (but also allow for the inclusion of licensing). The test code defines a class for messages called MI_Test that is sent between the nodes. And when the test application is started, the capabilities of all nodes are received by the control node (rank 0). When all nodes have reported their capabilities, the startup method is called that broadcasts a message a few tiems and then sends a message a couple of times. While this is happening, a timer is running that will send the message to stop the nodes and return control to the main program (exits the IoC on each core - however many were started). At this point, any final cleanup could be done before the application quits.

A typical execution of this test program produces the following print (but if it runs on different cores or a different CPU, the messages may print in slightly different order):

$ mpiexec -n 2 python3 MPI_IoC.py 
[0:2]	->	TestMain::startupMethod
[0:2]	->	TestMain::capabilitiesIn 	0 0,Irigal,134752309248		msgAvailable True
[0:2]	->	TestMain::rxMessage 	sourceNode 0 data 	MI_Test:: {'a': [2, 2, 3], 'b': 'Fred', 'c': 2.718281828}		msgAvailable True
[1:2]	->	TestMain::rxMessage 	sourceNode 0 data 	MI_Test:: {'a': [2, 2, 3], 'b': 'Fred', 'c': 2.718281828}		msgAvailable True
[0:2]	->	TestMain::rxMessage 	msgAvailable[True]
[0:2]	->	TestMain::rxMessage message available - processing locally vvvvv
[0:2]	->	TestMain::rxMessage 	sourceNode 0 data 	MI_Test:: {'a': [2, 2, 3], 'b': 'Fred', 'c': 2.718281828}		msgAvailable True
[1:2]	->	TestMain::rxMessage 	msgAvailable[True]
[1:2]	->	TestMain::rxMessage message available - processing locally vvvvv
[1:2]	->	TestMain::rxMessage 	sourceNode 0 data 	MI_Test:: {'a': [2, 2, 3], 'b': 'Fred', 'c': 2.718281828}		msgAvailable True
[0:2]	->	TestMain::rxMessage 	msgAvailable[True]
[0:2]	->	TestMain::rxMessage message available - processing locally vvvvv
[0:2]	->	TestMain::rxMessage 	sourceNode 0 data 	MI_Test:: {'a': [2, 2, 3], 'b': 'Fred', 'c': 2.718281828}		msgAvailable False
[1:2]	->	TestMain::rxMessage 	msgAvailable[True]
[1:2]	->	TestMain::rxMessage message available - processing locally vvvvv
[1:2]	->	TestMain::rxMessage 	sourceNode 0 data 	MI_Test:: {'a': [2, 2, 3], 'b': 'Fred', 'c': 2.718281828}		msgAvailable True
[0:2]	->	TestMain::rxMessage 	msgAvailable[True]
[0:2]	->	TestMain::rxMessage message available - processing locally vvvvv
[0:2]	->	TestMain::capabilitiesIn 	1 1,Irigal,134752309248		msgAvailable False
[1:2]	->	TestMain::rxMessage 	msgAvailable[True]
[1:2]	->	TestMain::rxMessage message available - processing locally vvvvv
[1:2]	->	TestMain::rxMessage 	sourceNode 0 data 	MI_Test:: {'a': [2, 2, 3], 'b': 'Fred', 'c': 2.718281828}		msgAvailable True
[1:2]	->	TestMain::rxMessage 	msgAvailable[True]
[1:2]	->	TestMain::rxMessage message available - processing locally vvvvv
[1:2]	->	TestMain::rxMessage 	sourceNode 0 data 	MI_Test:: {'a': [2, 2, 3], 'b': 'Fred', 'c': 2.718281828}		msgAvailable False
[1:2]	->	TestMain::rxMessage 	msgAvailable[False]
[1:2]	->	TestMain::rxMessage message available - processed locally  ^^^^^
[1:2]	->	TestMain::rxMessage message available - processed locally  ^^^^^
[1:2]	->	TestMain::rxMessage message available - processed locally  ^^^^^
[1:2]	->	TestMain::rxMessage message available - processed locally  ^^^^^
[0:2]	->	TestMain::capabilitiesIn 		msgAvailable[False]
[0:2]	->	TestMain::rxMessage message available - processed locally  ^^^^^
[0:2]	->	TestMain::rxMessage message available - processed locally  ^^^^^
[0:2]	->	TestMain::rxMessage message available - processed locally  ^^^^^
	

The first part of each line (in this example) shows the node number and the number of nodes, followed by the class and method names followed by the text of the message.

A full application would define a number of classes that will be messages sent between the nodes (messages that request more work, supply completed work, set a node to initially perform a function like I/O buffering, define communications paths, and so on). I've found it easiest to design applications (once the IoC begins) as a state machine: when a message of one type is received while the node is in the single state, the worker processes it (which could take a significant amount of time) and sends out a response (which might be back to the same node that sent the message, or to one of the other nodes about which it knows).

Operating like this the application can be coded in a way that a node only has to respond to received messages, and this has a lot of benefits when it comes to refactoring or even altering the function of nodes during the execution. The worker nodes can also be kept busy as they can ask for additional work when they've completed with their last bit of work. It also allows the Control node (rank 0) to dynamically alter the amount of work being processed on some nodes which might be shared with other applications or on slower processors or that require additional I/O or network communication time.