..
    Licensed under the MIT License.
    For details on the licensing terms, see the LICENSE file.
    SPDX-License-Identifier: MIT

    Copyright 2023-2024 (c) Fraunhofer IOSB (Author: Florian Düwel)

.. _Custom Dispatchers:

=============================
Custom Dispatcher Integration
=============================
The following sections are supposed as a manual on how to link the Execution Engine with custom dispatchers. The provided code explains the linking of the :ref:`PFDL Scheduler`
and must be adjusted correspondingly.

.. _DispatcherConfig Configuration:

DispatcherConfig Configuration
================================
As stated in :ref:`Dispatcher Configuration`, the :ref:`DispatcherConfig Class` provides an interface for custom dispatcher configurations, where the so called
:ref:`DispatcherInterface Setter functions` configure the Dispatcher Interface and thus, link the dispatcher with the Execution Engine. The following code snippets show,
how each of these setter functions are called to configure the Execution Engine with the :ref:`PFDL Scheduler`.
The objective is to configure the config_dispatcher function and by doing so, add the required functionality to the :ref:`DispatcherConfig Class`:

.. code-block:: python

        from pfdl_scheduler.model.array import Array
        from pfdl_scheduler.scheduler import Scheduler, Event
        from pfdl_scheduler.api.task_api import TaskAPI
        from pfdl_scheduler.api.service_api import ServiceAPI
        from pfdl_scheduler.model.struct import Struct

        from execution_engine_logic.data_types.types import EngineArray, EngineStruct
        from dispatcher.dispatcher_interface import DispatcherInterface

        class DispatcherConfig:

            def __init__(self):
                self.dispatcher_object = DispatcherInterface()
                self.structs = []
                self.config_dispatcher()

            def config_dispatcher(self):
                #create and set the instance that will be configured as dispatcher
                self.dispatcher_object.set_dispatcher(...)
                for key,value in ...:
                    self.structs.append(...)
                self.dispatcher_object.set_process_parameter(self.structs)
                self.dispatcher_object.set_start_dispatcher(...)
                self.dispatcher_object.set_running(...)
                self.dispatcher_object.set_fire_event_method(...)
                self.dispatcher_object.set_interfaces(...)
                self.dispatcher_object.set_register_dispatcher_callbacks(...)




Create Custom Dispatcher Object
---------------------------------

First, a dispatcher has to be created, in our case the :ref:`PFDL Scheduler`.
To create the corresponding Scheduler Object, three new class variables are added to the DispatcherConfig: self.scheduler
will be the PFDL-Scheduler object. The self.filepath is a path to a pfdl file that should be executed with the scheduler and the
self.dashboard_host_address specifies an URL to connect the :ref:`PFDL Scheduler` with the
`SWAP-IT Dashboard <https://github.com/iml130/swap-it-dashboard>`_. The scheduler is then created with:

.. code-block:: python

    class DispatcherConfig:

            def __init__(self, filepath, dashboard_host_address = None):
                self.dispatcher_object = DispatcherInterface()
                self.filepath = filepath
                self.dashboard_host_address = dashboard_host_address
                self.scheduler = self.scheduler = Scheduler(self.filepath,dashboard_host_address=self.dashboard_host_address) if self.dashboard_host_address else Scheduler(self.filepath)
                self.structs = []
                self.config_dispatcher()

The created self.scheduler variable is the added to the config_dispatcher function:

.. code-block:: python

    .. code-block:: python

    class DispatcherConfig:

            def __init__(self, filepath, dashboard_host_address = None):
                self.dispatcher_object = DispatcherInterface()
                self.filepath = filepath
                self.dashboard_host_address = dashboard_host_address
                self.scheduler = self.scheduler = Scheduler(self.filepath,dashboard_host_address=self.dashboard_host_address) if self.dashboard_host_address else Scheduler(self.filepath)
                self.structs = []
                self.config_dispatcher()

            def config_dispatcher(self):
                self.dispatcher_object.set_dispatcher(self.scheduler)
                ...

Hand over custom type definitions
---------------------------------
Next, the custom data types that are required for the process execution and thus, the creation of the :ref:`Data Object`. However, since the
PFDL-Scheduler has a custom type format, which is different to the Execution Engine's data format,
each of the custom types must be mapped to the Execution Engine's format. Here, two functions are defined to map the corresponding data types.
These functions are presented in section :ref:`Type Mapping`. Consequently, each custom type of the PFDL description
is first mapped to the Execution Engine format, and then added to the self.structs array:

.. code-block:: python

    class DispatcherConfig:

            def __init__(self, filepath, dashboard_host_address = None):
                self.dispatcher_object = DispatcherInterface()
                self.filepath = filepath
                self.dashboard_host_address = dashboard_host_address
                self.scheduler = self.scheduler = Scheduler(self.filepath,dashboard_host_address=self.dashboard_host_address) if self.dashboard_host_address else Scheduler(self.filepath)
                self.structs = []
                self.config_dispatcher()

            def config_dispatcher(self):
                self.dispatcher_object.set_dispatcher(self.scheduler)
                for key,value in self.scheduler.process.structs.items():
                    self.structs.append(PfdlEeDataconverter().create_ee_format(value))
                self.dispatcher_object.set_process_parameter(self.structs)




Start and Running
------------------
Next, the Execution Engine requires methods to start a process execution with the scheduler and besides, get the running variable
from the scheduler that indicates whether the process execution is completed or not. While the previously configured setter functions
only receives variables and objects as arguments, the Start and Running configuration hand over functions from the scheduler to the
Execution Engine. Here, the function return_running is added to the DispatcherConfig Class:

.. code-block:: python

    class DispatcherConfig:

            def __init__(self, filepath, dashboard_host_address = None):
                self.dispatcher_object = DispatcherInterface()
                self.filepath = filepath
                self.dashboard_host_address = dashboard_host_address
                self.scheduler = self.scheduler = Scheduler(self.filepath,dashboard_host_address=self.dashboard_host_address) if self.dashboard_host_address else Scheduler(self.filepath)
                self.structs = []
                self.config_dispatcher()

            def config_dispatcher(self):
                ....

            def return_running(self, dispatcher):
                return dispatcher.running

Next the set_start_dispatcher and the set_running functions are configured:

.. code-block:: python

    class DispatcherConfig:

            def __init__(self, filepath, dashboard_host_address = None):
                self.dispatcher_object = DispatcherInterface()
                self.filepath = filepath
                self.dashboard_host_address = dashboard_host_address
                self.scheduler = self.scheduler = Scheduler(self.filepath,dashboard_host_address=self.dashboard_host_address) if self.dashboard_host_address else Scheduler(self.filepath)
                self.structs = []
                self.config_dispatcher()

            def config_dispatcher(self):
                self.dispatcher_object.set_dispatcher(self.scheduler)
                for key,value in self.scheduler.process.structs.items():
                    self.structs.append(PfdlEeDataconverter().create_ee_format(value))
                self.dispatcher_object.set_process_parameter(self.structs)
                self.dispatcher_object.set_start_dispatcher(self.dispatcher_object.dispatcher.start)
                self.dispatcher_object.set_running(self.return_running)


.. _Callbacks:

Callbacks
=========
The PFDL Scheduler provides callbacks for :ref:`Tasks and Services`, as well as da Data Callback. Consequently,
the PFDL-Scheduler deploys all offered callback functions, however, several steps are required
to make the Execution Engine callbacks available to the PFDL Scheduler. First, each callback requires a wrapper function that
ensures that the received input from the scheduler is transformed to an Execution Engine compatible format and in addition,
that the input arguments for the callback functions are extracted from the scheduler objects:



.. code-block:: python

    def fire_dispatcher_event(self, service_uuid):
        self.scheduler.fire_event(Event(event_type="service_finished",
                                   data={"service_id": service_uuid}))

    def task_started_interface(self, task_api: TaskAPI):
        task_context_uuid = task_api.uuid if task_api.task.name == "productionTask" else task_api.task_context.uuid
        input_parameters = self.map_input_parameters_to_EE(task_api.input_parameters)
        self.dispatcher_object.task_started_callback_wrapper(task_api.task.name, task_api.uuid, task_context_uuid,
                                                        task_api.task.input_parameters, input_parameters)

    def task_finished_interface(self, task_api: TaskAPI):
        task_context_uuid = task_api.uuid if task_api.task.name == "productionTask" else task_api.task_context.uuid
        self.dispatcher_object.task_finished_callback_wrapper(task_api.task.name, task_api.uuid, task_context_uuid,
                                                         task_api.task.output_parameters)

    def service_finished_interface(self, service_api: ServiceAPI):
        self.dispatcher_object.service_finished_callback_wrapper(service_api.service.name,
                                                            service_api.uuid, service_api.task_context.uuid)

    def service_started_interface(self, service_api: ServiceAPI):
        input_parameters = self.map_input_parameters_to_EE(service_api.input_parameters)
        self.dispatcher_object.service_started_callback_wrapper(service_api.service.name, service_api.uuid, input_parameters,
                                                           service_api.service.output_parameters,
                                                           service_api.task_context.uuid)

    def data_provider_interface(self, variable_name, task_id):
        variable_name, struct = self.dispatcher_object.provide_parameter_wrapper(variable_name, task_id.uuid)
        return EePfdlConverter().convert_ee_to_pfdl(variable_name, struct)

    def map_input_parameters_to_EE(self, input_parameter_values):
        input_parameters = []
        if len(input_parameter_values) > 0:
            for i in range(len(input_parameter_values)):
                if isinstance(input_parameter_values[i], Struct):
                    input_parameters.append(PfdlEeDataconverter().create_ee_format(input_parameter_values[i]))
                else:
                    input_parameters.append(input_parameter_values[i])
        return input_parameters

The function map_input_parameters_to_EE() is not a callback wrapper,
it only transforms input data structures from the PFDL-scheduler format to the Execution Engine format.

In the second step, a function is required that invokes the register_callback functions of the PFDL-Scheduler and thus,
registers the callback wrappers within the PFDL-Scheduler:

.. code-block:: python

    def register_dispatcher_callbacks(self):
        self.dispatcher_object.dispatcher.register_callback_service_started(self.service_started_interface)
        self.dispatcher_object.dispatcher.register_callback_service_finished(self.service_finished_interface)
        self.dispatcher_object.dispatcher.register_callback_task_started(self.task_started_interface)
        self.dispatcher_object.dispatcher.register_callback_task_finished(self.task_finished_interface)
        self.dispatcher_object.dispatcher.register_variable_access_function(self.data_provider_interface)

As last step, the previously defined functions must be added to the config_dispatcher function, resulting in a
complete integration of the PFDL-Scheduler as Execution Engine Dispatcher:

.. code-block:: python

    class DispatcherConfig:

        def __init__(self, filepath, dashboard_host_address = None):
            self.dispatcher_object = DispatcherInterface()
            self.filepath = filepath
            self.dashboard_host_address = dashboard_host_address
            self.scheduler = Scheduler(self.filepath,dashboard_host_address=self.dashboard_host_address) if self.dashboard_host_address else Scheduler(self.filepath)
            self.structs = []
            self.config_dispatcher()

        def config_dispatcher(self):
            self.dispatcher_object.set_dispatcher(self.scheduler)
            for key,value in self.scheduler.process.structs.items():
                self.structs.append(PfdlEeDataconverter().create_ee_format(value))
            self.dispatcher_object.set_process_parameter(self.structs)
            self.dispatcher_object.set_start_dispatcher(self.dispatcher_object.dispatcher.start)
            self.dispatcher_object.set_running(self.return_running)
            self.dispatcher_object.set_register_dispatcher_callbacks(self.register_dispatcher_callbacks)
            self.dispatcher_object.set_fire_event_method(self.fire_dispatcher_event)

.. _Type Mapping:

Type Mapping
============

As stated before, the PFDL-Scheduler and the Execution Engine define different internal data structures,
so that corresponding mapping functions have to be defined. Here, one maps Execution Engine types to PFDL-Scheduler
types and the second maps the opposite direction.

PFDL-Scheduler <-> Execution Engine
------------------------------------

.. code-block:: python

    class PfdlEeDataconverter:

        def create_ee_format(self, initial_type):
            item = EngineStruct(initial_type.name)
            item = self.convert_to_EE_struct(initial_type, item)
            item.set_struct_type(initial_type.name)
            return item

        def convert_array(self, array, item):
            for i in range(len(array.values)):
                if isinstance(array.values[i], Struct):
                    val = EngineStruct(array.values[i].name)
                    val = self.convert_to_EE_struct(array.values[i], val)
                    val.set_struct_type(array.values[i].name)
                    item.add_value(val)
                elif isinstance(array.values[i], Array):
                    val = EngineArray(array.values[i].name, array.values[i].length)
                    val.set_array_type(array.values[0].data_type)
                    val = self.convert_array(array.values[i], val)
                    item.add_value(val)
                else:
                    item.add_value(array.values[i].values)
            return item

        def convert_to_EE_struct(self, struct, item):
            for(key, value) in struct.attributes.items():
                if isinstance(value, Struct):
                    val = EngineStruct(key)
                    val.set_struct_type(value.name)
                    val = self.convert_to_EE_struct(value, val)
                    item.add_attribute(key, val)
                elif isinstance(value, Array):
                    val = EngineArray(key, value.length)
                    val.set_array_type(value.type_of_elements)
                    if val.data_type == '':
                        val.set_array_type(value.values[0].name)
                    val = self.convert_array(value, val)
                    item.add_attribute(key, val)
                else:
                    item.add_attribute(key, value)
            return item

Execution Engine <-> PFDL-Scheduler
------------------------------------

.. code-block:: python

    class EePfdlConverter:

        def convert_ee_to_pfdl(self, variable_name, server_struct):
            variable = Struct()
            variable.name = variable_name
            if isinstance(server_struct, EngineStruct):
                for (name, value) in server_struct.attributes.items():
                    if isinstance(value, str) or isinstance(value, bool) or isinstance(value, int) or isinstance(value, float):
                        variable.attributes[name] = value
                    elif isinstance(value, EngineArray):
                        vals = Array()
                        vals.length = value.length
                        vals.name = name
                        for i in range(value.length):
                            vals.values.append(self.convert_ee_to_pfdl(name, value.values[i]))
                            variable.attributes[name] = vals
                    else:
                        variable.attributes[name] = self.convert_ee_to_pfdl(name, value)
            return variable