Pub/Sub Communication

For dashboards, services and notebooks to work seamlessly together, OrangeQS Juice provides a framework for publishing messages and logging them to the database.

This tutorial shows how to publish and subscribe to events, and how to write them to and query them from the database, using the orangeqs.juice.messaging and orangeqs.juice.database frameworks. See protocol and database for the full specification; the tutorial links to it where relevant.

This tutorial assumes you have set up a Juice instance with a task manager service and a service where you can write code. For this tutorial, we suggest opening a Proxy Kernel in an OrangeQS service to act as a publisher. We will create two clients, one for publishing events from a service and one for subscribing/querying these events from a personal notebook.

Step 1: Start the publisher

Start by opening a new Kernel in your service. Next, you can start publishing events from this notebook using the following code snippet:

import asyncio
import random

from typing import Annotated, ClassVar, Self

from orangeqs.juice import Client
from orangeqs.juice.messaging import Event

class QuantityUpdate(Event):
    """Dummy event representing a value of a physical quantity."""

    measurement: ClassVar[str] = "dummy_quantity"

    def topic(self) -> str:
        """Topic of the quantity event, equal to `quantity`."""
        return self.quantity

    quantity: Annotated[str, "tag"]
    value: int | float
    unit: str | None

    @classmethod
    def generate_random(cls) -> Self:
        """Generate a random `QuantityUpdate` event"""
        quantity, unit = random.choice([("voltage", "V"), ("current", "A"), ("power", "W")])
        value = round(random.uniform(0.0, 100.0), 2)
        return cls(quantity=quantity, value=value, unit=unit)

client = Client()
publisher = client.publisher_async()
write_api = client.influxdb2_write_api()
bucket = "dummy_bucket"  # Make sure to set this up

while True:
    event = QuantityUpdate.generate_random()
    print(f"Publishing: {event.quantity} = {event.value} {event.unit}")

    await client.publish_event(
        event=event,
        publisher=publisher,
        write_api=write_api,
        bucket=bucket,
    )
    await asyncio.sleep(1)

This snippet instantiates a PublisherAsync and starts publishing events, using QuantityUpdate as an example event type. Every second it prints the randomly generated event that was published. Each event is published for other services to receive and is also logged in the database. As there are no subscribers yet, the published events are silently discarded on the messaging side; they are still written to the database.
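The "silently discarded" behaviour can be illustrated with a toy in-memory broker. This is a minimal sketch using only the standard library, not the Juice implementation: `InMemoryBroker`, its methods and `demo` are hypothetical names introduced here for illustration. It shows that a message published before anyone subscribes reaches no one, while later messages are delivered.

```python
import asyncio
from collections import defaultdict


class InMemoryBroker:
    """Toy stand-in for a pub/sub proxy; hypothetical, not part of OrangeQS Juice."""

    def __init__(self) -> None:
        # Maps a topic to the queues of all subscribers listening on it.
        self._queues: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, topic: str) -> asyncio.Queue:
        """Register a new subscriber queue for `topic` and return it."""
        queue: asyncio.Queue = asyncio.Queue()
        self._queues[topic].append(queue)
        return queue

    def publish(self, topic: str, payload: float) -> int:
        """Deliver to current subscribers and return how many received it."""
        subscribers = self._queues.get(topic, [])
        for queue in subscribers:
            queue.put_nowait(payload)
        return len(subscribers)


async def demo() -> tuple[int, int, float]:
    broker = InMemoryBroker()
    # Nobody is subscribed yet, so this message reaches no one.
    delivered_before = broker.publish("voltage", 1.5)
    queue = broker.subscribe("voltage")
    # Only messages published after subscribing are received.
    delivered_after = broker.publish("voltage", 2.5)
    received = await queue.get()
    return delivered_before, delivered_after, received


result = asyncio.run(demo())
```

When run as a script this uses `asyncio.run`; inside a notebook, where an event loop is already running, `await demo()` would be used instead.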

The first notebook is now done and we should leave it running in the background.

Step 2: Prepare the subscriber

Next, open a second notebook within the same Juice instance. Let’s create a subscriber inside that notebook that listens for QuantityUpdate events with the topic "voltage" or "current". In QuantityUpdate.topic() we can see that the topic of the event is set to the name of the quantity.

import asyncio
from typing import Annotated, ClassVar

from orangeqs.juice import Client
from orangeqs.juice.messaging import Event

# Usually you would import this from a common location. For demonstration purposes
# we copy the class definition here.
class QuantityUpdate(Event):
    measurement: ClassVar[str] = "dummy_quantity"

    def topic(self) -> str:
        """Topic of the quantity event, equal to `quantity`."""
        return self.quantity

    quantity: Annotated[str, "tag"]
    value: int | float
    unit: str | None

client = Client()
subscriber = client.subscriber_async()
subscriber.subscribe(QuantityUpdate, topic="voltage")
subscriber.subscribe(QuantityUpdate, topic="current")

This will instantiate a SubscriberAsync that connects to the central proxy to receive events it is subscribed to. See SubscriberAsync.subscribe() to learn more about subscribing to specific event types and topics.
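To make the effect of the two subscriptions concrete, the filtering can be sketched in plain Python. This sketch assumes exact matching on the pair (event type, topic); the actual matching rules are those documented for SubscriberAsync.subscribe() and may differ. `Subscription` and `matches` are hypothetical helpers introduced here, not Juice APIs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Subscription:
    """Hypothetical record of one subscription: an event type name plus a topic."""

    event_type: str
    topic: str


def matches(subscriptions: set[Subscription], event_type: str, topic: str) -> bool:
    """Return True if some subscription covers this exact event type and topic."""
    return Subscription(event_type, topic) in subscriptions


# Mirrors the two subscribe() calls in the tutorial.
subscriptions = {
    Subscription("QuantityUpdate", "voltage"),
    Subscription("QuantityUpdate", "current"),
}

# The publisher emits three topics; only two of them are subscribed to.
incoming = ["voltage", "current", "power"]
accepted = [t for t in incoming if matches(subscriptions, "QuantityUpdate", t)]
```

Under this assumption, `accepted` holds the "voltage" and "current" topics, and "power" events are filtered out.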

Step 3: Start listening for events

With the subscriber instantiated, we can now read incoming events. The coroutine SubscriberAsync.get() can be used to retrieve messages from the subscriber.

while True:
    event = await subscriber.get()
    print(f"Received: {event.quantity} = {event.value} {event.unit}")

Messages published from the first notebook that match the subscriptions should now be printed. Events with the topic "power" are not printed, because the subscriber did not subscribe to that topic.
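The loop above runs until the kernel is interrupted. If a bounded loop is preferred, one general asyncio pattern is to wrap each read in `asyncio.wait_for` and stop when nothing arrives in time. The sketch below uses an `asyncio.Queue` as a stand-in for the subscriber, since both expose an awaitable `get()`. Whether cancelling SubscriberAsync.get() on timeout is safe is an assumption not confirmed by this tutorial; `collect` and `demo` are hypothetical names.

```python
import asyncio


async def collect(queue: asyncio.Queue, timeout: float) -> list[str]:
    """Read items until no new one arrives within `timeout` seconds."""
    received: list[str] = []
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=timeout)
        except asyncio.TimeoutError:
            # Nothing arrived in time, so stop listening.
            break
        received.append(item)
    return received


async def demo() -> list[str]:
    # The queue stands in for a subscriber with two pending events.
    queue: asyncio.Queue = asyncio.Queue()
    for name in ("voltage", "current"):
        queue.put_nowait(name)
    return await collect(queue, timeout=0.1)


received = asyncio.run(demo())
```

In a notebook, `await demo()` replaces the `asyncio.run` call, because an event loop is already running there.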