Tasks#

This guide shows how to define tasks for your own services and execute these tasks from another context. Tasks are pieces of work that are executed on a remote OrangeQS Juice service, which makes OrangeQS Juice tasks a form of RPCs. By defining your own task types and handlers you can let others execute actions on your service.

This guide is split into the following sections:

  • Task flow explains how tasks are routed through the task manager.

  • Defining tasks shows how to define a task schema and register a handler on a service.

  • Executing tasks finally shows how to execute the task you just defined.

Not fully implemented yet!

This section is part of documentation-driven development. Nonetheless, most features documented here are functional, with the exception of:

  • The queues. Tasks are not queued.

  • The payload.parallel option. Tasks are always executed in parallel.

Task flow#

Tasks are routed through the task manager, the OrangeQS Juice service responsible for collecting and scheduling tasks. The task manager is responsible for:

  • Collecting tasks from clients.

  • Routing tasks to the correct service.

  • Queueing the task if it is synchronous (see below)

  • Communicating the result back to the client.

Tasks are executed using a request and reply model. Clients submit tasks to the task manager. The task manager schedules the submitted task to be executed on the corresponding OrangeQS Juice service. The OrangeQS Juice service executes the task by calling the registered handler. A task handler is a function that is registered to execute all tasks of a type. The OrangeQS Juice service sends the reply back to the task manager. The task manager sends the reply back to the client.

The full request and reply flow is shown the diagram below.

Request and reply task flow.

Request and reply task flow.#

  1. A client sends an execute request with a target service and payload to the task manager.

  2. The task manager creates a unique task identifier and sends this back to the client.

  3. The task manager checks whether the task should be executed in parallel or synchronously. A task is parallel if it has a truthy payload.parallel.

    • If the task should be executed in parallel, send it directly to the target service.

    • If the task should be executed synchronously append it to the queue of that service. If the previous synchronous task is done or no synchronous task is being executed, pop the top-most task from the queue and send it to the target service.

  4. The target service receives the payload and calls the configured handler. A task handler is a function that is registered to execute all tasks of a type.

  5. The target service returns the result of the handler as reply to the task manager.

  6. The task manager returns the reply to the client.

Defining tasks#

A task definition consists of the task schema and the task handler. The task schema contains the task payload (arguments) and whether the task is parallel or synchronous. The task handler is the function that executes the task on the target service. The task payload will be passed to the handler on the target service.

We start by defining the task schema.

from typing import ClassVar
from orangeqs.juice.task import Task

# The task schema
class Greeting(Task):
    # The task payload
    greeting: str = "Hello"
    name: str

    # Whether the task is allowed be executed in parallel
    parallel: ClassVar[bool] = True

Secondly, we configure the handler on the service instance.

from orangeqs.juice.service import Service

# Option 1: Define the handler as a function
def _handle_greeting(payload: Greeting)
    return f"{payload.greeting} {payload.name}"

class MyService(Service):

    # Option 2: Define the handler as a method
    def _handle_greeting(self, payload: Greeting):
        return f"{payload.greeting} {payload.name} from {self.service_name}"

    def __init__(service_name: str) -> None:
        super().__init__(service_name)

        self.register_handler(Greeting, _handle_greeting)  # Option 1
        self.register_handler(Greeting, self._handle_greeting)  # Option 2

IPython tasks

If the service is an IPythonService, a handler for any subclass of IPythonTask is already registered. This handler executes the code field of the task payload on the IPython kernel. Thus, for defining a task that executes code in the IPython kernel you only have to configured the task schema. For example:

from pydantic import computed_field

from orangeqs.juice.schemas.tasks import IPythonTask

class MaintainNode(IPythonTask):
    node_id: str

    @computed_field
    @property
    def code(self) -> str:
        return f"calibration_graph.nodes['{node_id}'].maintain()"

Note that the order of the decorators matters here!

Executing tasks#

Executing task is done using the Client.request or Client.execute method. See the respective docstrings for more detailed information. Below is a minimal example.

from orangeqs.juice import Client

client = Client()
payload = Greeting(name="Extension Developer")

# Option 1. Schedule the task and do not wait for the result.
request = await client.request("my-service", payload)
task_id = request.task_id
result = await request

# Option 2. Schedule the task, wait for it finish and return the result.
result = await client.execute("my-service", payload)