1. Collaborative Learning

1.1. Background

This demo shows a Collaborative Learning Scenario and the AML-IP nodes involved: Model Manager Receiver Node and Model Manager Sender Node. With these 2 nodes implemented, the user can deploy as many nodes of each kind as desired and check the behavior of a simulated AML-IP network running. They are implemented in Python to prove the communication between the 2 implementations.

The purpose of the demo is to show how a Sender and a Receiver node can communicate. The Receiver node awaits model statistics from the Sender. Since the Sender doesn’t have a real AML Engine, it sends the model statistics as a string. Upon receiving the statistics, the Receiver sends a model request, also as a string since it doesn’t have an AML Engine. Then, the Sender converts the received model request to uppercase and sends it back as a model reply.

1.2. Prerequisites

Before running this demo, ensure that AML-IP is correctly installed using one of the following installation methods:

1.3. Building the demo

If the demo package is not compiled, please refer to Build demos or run the command below.

colcon build --packages-up-to amlip_demo_nodes

Once AML-IP packages are installed and built, import the libraries using the following command.

source install/setup.bash

1.4. Explaining the demo

In this section, we will delve into the details of the demo and how it works.

1.4.1. Model Manager Receiver Node

This is the Python code for the Model Manager Receiver Node application. It does not use real AML Models, but strings. It is implemented in Python using amlip_py API.

This code can be found here.

The next block includes the Python header files that allow the use of the AML-IP Python API.

from amlip_py.node.ModelManagerReceiverNode import ModelManagerReceiverNode, ModelListener
from amlip_py.types.AmlipIdDataType import AmlipIdDataType
from amlip_py.types.ModelReplyDataType import ModelReplyDataType
from amlip_py.types.ModelRequestDataType import ModelRequestDataType
from amlip_py.types.ModelStatisticsDataType import ModelStatisticsDataType

Let’s continue explaining the global variables.

DOMAIN_ID variable isolates the execution within a specific domain. Nodes with the same domain ID can communicate with each other.

DOMAIN_ID = 166

waiter is a WaitHandler that waits on a boolean value. Whenever this value is True, threads awake. Whenever it is False, threads wait.

waiter = BooleanWaitHandler(True, False)

The CustomModelListener class listens to Model Statistics Data Type and Model Reply Data Type messages received from a Model Manager Sender Node. This class is supposed to be implemented by the user in order to process the messages received from other nodes in the network.

class CustomModelListener(ModelListener):

    def statistics_received(
            self,
            statistics: ModelStatisticsDataType):

        print(f'Statistics received: {statistics.to_string()}')

        # Store the server id of the statistics
        self.server_id = statistics.server_id()

        waiter.open()

    def model_received(
            self,
            model: ModelReplyDataType) -> bool:

        print(f'Model reply received from server\n'
              f' solution: {model.to_string()}')

        return True

The main function orchestrates the execution of the Model Manager Receiver node. It creates an instance of the ModelManagerReceiverNode and starts its execution with the specified listener.

def main():
    """Execute main routine."""

    # Create request
    data = ModelRequestDataType('MobileNet V1')

    id = AmlipIdDataType('ModelManagerReceiver')
    id.set_id([15, 25, 35, 45])

    # Create node
    print('Starting Manual Test Model Manager Receiver Node Py execution. Creating Node...')
    model_receiver_node = ModelManagerReceiverNode(
        id=id,
        data=data,
        domain=DOMAIN_ID)

    print(f'Node created: {model_receiver_node.get_id()}. '
          'Already processing models.')

    model_receiver_node.start(
        listener=CustomModelListener())

After starting the node, it waits for statistics to arrive from the Model Manager Sender Node.

    # Wait statistics
    waiter.wait()

Then, it requests a model from the Model Manager Sender Node using the received server ID.

    # Request model
    model_receiver_node.request_model(model_receiver_node.listener_.server_id)

Finally, the node stops.

    model_receiver_node.stop()

1.4.2. Model Manager Sender Node

This is the Python code for the Model Manager Sender Node application. It does not use real AML Models, but strings. It does not have a real AML Engine but instead the calculation is an upper-case conversion of the string received. It is implemented in Python using amlip_py API.

This code can be found here.

The following block includes the Python header files necessary for using the AML-IP Python API.

from amlip_py.node.ModelManagerSenderNode import ModelManagerSenderNode, ModelReplier
from amlip_py.types.AmlipIdDataType import AmlipIdDataType
from amlip_py.types.ModelReplyDataType import ModelReplyDataType
from amlip_py.types.ModelRequestDataType import ModelRequestDataType

Let’s continue explaining the global variables.

DOMAIN_ID isolates the execution within a specific domain. Nodes with the same domain ID can communicate with each other.

DOMAIN_ID = 166

waiter is a WaitHandler that waits on a boolean value. Whenever this value is True, threads awake. Whenever it is False, threads wait.

waiter = BooleanWaitHandler(True, False)

The CustomModelReplier class listens to Model Request Data Type request messages received from a Model Manager Receiver Node. This class is supposed to be implemented by the user in order to process the messages.

class CustomModelReplier(ModelReplier):

    def fetch_model(
            self,
            request: ModelRequestDataType) -> ModelReplyDataType:

        reply = ModelReplyDataType(request.to_string().upper())

        print(f'Model request received from client\n'
              f' request: {request.to_string()}\n'
              f' reply: {reply.to_string()}')

        waiter.open()

        return reply

The main function orchestrates the execution of the Model Manager Sender node. It creates an instance of ModelManagerSenderNode.

def main():
    """Execute main routine."""

    id = AmlipIdDataType('ModelManagerSender')
    id.set_id([10, 20, 30, 40])

    # Create node
    print('Starting Manual Test Model Manager Sender Node Py execution. Creating Node...')
    model_sender_node = ModelManagerSenderNode(
        id=id,
        domain=DOMAIN_ID)

After starting the node, it publishes statistics using the publish_statistics() function, which fills a Model Statistics Data Type and publishes it.

    model_sender_node.publish_statistics(
        'ModelManagerSenderStatistics',
        'hello world')

Then we start the node execution, passing the previously defined CustomModelReplier() class, which is responsible for managing the request received.

    model_sender_node.start(
        listener=CustomModelReplier())

Waits for the response model to be sent to the Model Manager Receiver Node.

    # Wait for the solution to be sent
    waiter.wait()

Finally, it stops and closes the node.

    model_sender_node.stop()

1.5. Running the demo

This demo runs the implemented nodes in amlip_demo_nodes/amlip_collaborative_learning_demo.

1.5.1. Run Model Manager Receiver Node

Run the following command:

# Source colcon installation
source install/setup.bash

# To execute Model Manager Receiver Node
cd ~/AML-IP-ws/src/AML-IP/amlip_demo_nodes/amlip_collaborative_learning_demo/amlip_collaborative_learning_demo
python3 model_receiver_custom.py

The expected output is the following:

Starting Manual Test Model Manager Receiver Node Py execution. Creating Node...
Node created: ModelManagerReceiver.0f.19.23.2d. Already processing models.
Model reply received from server
solution: MOBILENET V1
Finishing Manual Test Model Manager Receiver Node Py execution.

1.5.2. Run Model Manager Sender Node

Run the following command to answer before closing:

# Source colcon installation
source install/setup.bash

# To execute Model Manager Sender Node
cd ~/AML-IP-ws/src/AML-IP/amlip_demo_nodes/amlip_collaborative_learning_demo/amlip_collaborative_learning_demo
python3 model_sender_custom.py

This execution expects an output similar to the one shown below:

Starting Manual Test Model Manager Sender Node Py execution. Creating Node...
Node created: ModelManagerSender.0a.14.1e.28. Already processing models.
Model request received from client
model: MobileNet V1
solution: MOBILENET V1
Finishing Manual Test Model Manager Sender Node Py execution.