orangeqs.juice.service._task_server#

Task handler for services.

Module Contents#

Classes#

  • TaskHandlerRegistry: Registry for task handlers.

  • TaskServer: A server for handling incoming tasks over WebSockets.

  • WebsocketHandler: Websocket handler for deserializing incoming tasks.

API#

class orangeqs.juice.service._task_server.TaskHandlerRegistry#

Registry for task handlers.

Provides methods to register and handle tasks by type.

register_handler(task_type: type[orangeqs.juice.service._task_server._TaskType], func: collections.abc.Callable[[orangeqs.juice.service._task_server._TaskType], Any]) None#

Register a handler for a specific task type.

Parameters#

  • task_type (type[Task]): The type of the task to register the handler for.

  • func (Callable[[Task], Any]): The handler function to register. It should take a single argument of the task type and return any value.

handle_task(task: orangeqs.juice.task.Task) Any#

Handle a task by calling the registered handler.

Parameters#

  • task (Task): The task to handle.

Returns#

  • (Any): The result of the handler function.

Raises#

  • (ValueError): If no handler is registered for the task type.
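The register-then-dispatch flow described above can be sketched without the library installed. This is a minimal illustration, not the actual implementation: `SketchRegistry`, `Task`, `Ping`, and `Unregistered` are hypothetical stand-ins, and the exact-type lookup and the error message text are assumptions.

```python
from collections.abc import Callable
from typing import Any


class Task:
    """Hypothetical stand-in for orangeqs.juice.task.Task."""


class Ping(Task):
    """Hypothetical task type used only for this illustration."""

    def __init__(self, payload: str) -> None:
        self.payload = payload


class Unregistered(Task):
    """Hypothetical task type that never gets a handler."""


class SketchRegistry:
    """Illustrative registry; looking up by exact type is an assumption."""

    def __init__(self) -> None:
        self._handlers: dict[type[Task], Callable[[Any], Any]] = {}

    def register_handler(self, task_type: type[Task], func: Callable[[Any], Any]) -> None:
        # Store one handler per task type; a later registration replaces it.
        self._handlers[task_type] = func

    def handle_task(self, task: Task) -> Any:
        try:
            handler = self._handlers[type(task)]
        except KeyError:
            # Mirrors the documented behavior: no handler means ValueError.
            raise ValueError(
                f"No handler registered for {type(task).__name__}"
            ) from None
        return handler(task)


registry = SketchRegistry()
registry.register_handler(Ping, lambda task: f"pong: {task.payload}")

# Dispatch a task whose type has a handler.
result = registry.handle_task(Ping("hello"))

# Dispatch a task whose type has no handler.
try:
    registry.handle_task(Unregistered())
    error_message = None
except ValueError as exc:
    error_message = str(exc)
```

Since TaskServer derives from TaskHandlerRegistry, the same two methods are available on a server instance.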

class orangeqs.juice.service._task_server.TaskServer(config: orangeqs.juice.schemas.tasks.TaskServerConfig)#

Bases: orangeqs.juice.service._task_server.TaskHandlerRegistry

A server for handling incoming tasks over WebSockets.

start() None#

Start the task server in a separate thread.

property loop: asyncio.AbstractEventLoop#

The event loop used by the service.

If a subclass uses a different event loop, it should override this property.

class orangeqs.juice.service._task_server.WebsocketHandler(application: tornado.web.Application, request: tornado.httputil.HTTPServerRequest, **kwargs: Any)#

Bases: tornado.websocket.WebSocketHandler

Websocket handler for deserializing incoming tasks.

Handles incoming messages by deserializing them and calling the provided handle_message function. The result is then serialized and sent back to the client.

initialize(handle_message: collections.abc.Callable[[dict[str, Any], str | bytes], collections.abc.Coroutine[None, None, Any]], raw: bool = False) None#

Initialize the handler with the message handling function.

Parameters#

  • handle_message (Callable[[dict[str, Any], str | bytes], Coroutine[None, None, Any]]): The function to handle incoming messages. Per the signature, it is called with a dictionary (the deserialized message) and a str | bytes value, and returns a coroutine that resolves to the result to be sent back to the client.

  • raw (bool): If True, the handle_message function is expected to return the raw dictionary response to be sent back to the client.

on_message(message: str | bytes) None#

Handle an incoming message on the websocket.

on_close() None#

Cancel all pending message handler tasks.
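The general asyncio pattern for cancelling in-flight handler tasks when a connection closes can be sketched as follows. This is not the library's code: `slow_handler`, the `pending` set, and the cleanup steps are hypothetical and only show one common way to track and cancel tasks.

```python
import asyncio


async def slow_handler() -> str:
    """Hypothetical message handler that would take a long time to finish."""
    await asyncio.sleep(60)
    return "done"


async def main() -> list[bool]:
    # Track in-flight handler tasks, as a connection handler might.
    pending: set[asyncio.Task] = set()
    for _ in range(3):
        task = asyncio.create_task(slow_handler())
        pending.add(task)
        # Drop finished tasks from the set automatically.
        task.add_done_callback(pending.discard)

    await asyncio.sleep(0)  # give the tasks a chance to start

    # What a close hook might do: cancel everything still pending.
    to_cancel = list(pending)
    for task in to_cancel:
        task.cancel()
    # Wait for the cancellations to be processed without raising.
    await asyncio.gather(*to_cancel, return_exceptions=True)
    return [task.cancelled() for task in to_cancel]


cancelled_flags = asyncio.run(main())
```

Keeping strong references in a set also prevents tasks from being garbage-collected while they are still running.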