orangeqs.juice.messaging.protocol#

Protocol interfaces for messaging between Juice services.

This module defines generic interfaces that are meant to be implemented on top of a base protocol, e.g. ZMQ. All interfaces in this module are agnostic to both the underlying protocol and Juice itself: the module contains no references to either.

Both the subscriber and the publisher are expected to connect to a central proxy, so that all messages are published to, and subscribed from, a single endpoint.

As of now this module only contains interfaces for pub/sub messaging. In the future it will also include standard interfaces for request/reply.
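The central-proxy arrangement described above can be pictured with a small in-memory sketch. Everything in it (`InMemoryProxy`, `InMemoryPublisher`, `InMemorySubscriber`, and the use of `queue.Queue` as transport) is a hypothetical stand-in chosen for illustration; it is not how Juice or any concrete implementation of these interfaces works.

```python
import queue
from typing import List, Optional


class InMemoryProxy:
    """Hypothetical stand-in for the central proxy: fans out every message."""

    def __init__(self) -> None:
        self._subscriber_queues: List[queue.Queue] = []

    def attach(self) -> queue.Queue:
        """Register a subscriber and return the queue it will read from."""
        subscriber_queue: queue.Queue = queue.Queue()
        self._subscriber_queues.append(subscriber_queue)
        return subscriber_queue

    def forward(self, message: object) -> None:
        """Deliver a published message to every attached subscriber."""
        for subscriber_queue in self._subscriber_queues:
            subscriber_queue.put(message)


class InMemoryPublisher:
    """Publishes through the proxy and never talks to subscribers directly."""

    def __init__(self, proxy: InMemoryProxy) -> None:
        self._proxy = proxy

    def publish(self, event: object) -> None:
        self._proxy.forward(event)


class InMemorySubscriber:
    """Receives from the proxy and never talks to publishers directly."""

    def __init__(self, proxy: InMemoryProxy) -> None:
        self._queue = proxy.attach()

    def get(self, timeout: Optional[float] = None) -> object:
        return self._queue.get(timeout=timeout)


proxy = InMemoryProxy()
first, second = InMemorySubscriber(proxy), InMemorySubscriber(proxy)
InMemoryPublisher(proxy).publish("kernel started")

# Both subscribers see the message although neither knows the publisher.
received = [first.get(timeout=1), second.get(timeout=1)]
print(received)
```

Because publishers and subscribers only know the proxy, either side can be added or removed without reconfiguring the other.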

Module Contents#

Classes#

Event

Message representing an event that is published to Juice services.

SubscriberAsync

Asynchronous client for subscribing to events from other Juice services.

SubscriberBlocking

Synchronous blocking client for subscribing to events from other Juice services.

PublisherAsync

Asynchronous client for publishing events to other Juice services.

PublisherBlocking

Synchronous blocking client for publishing events to other Juice services.

Subscriber

Asynchronous client for subscribing to events from other Juice services.

Publisher

Asynchronous client for publishing events to other Juice services.

Data#

EventType

Type variable for Event.

API#

class orangeqs.juice.messaging.protocol.Event(/, **data: Any)#

Bases: orangeqs.juice.messaging.message.Message, orangeqs.juice.database.Point

Message representing an event that is published to Juice services.

All events sent over pub/sub must inherit from this class. See Message for more information. This class also inherits from Point, which allows these events to be stored in the database.

Examples#

For example, one can add an event that represents a kernel status changing.

class KernelStatusChanged(Event):
    def topic(self) -> str:
        return self.service

    service: str
    status: str

This event has two fields (service and status). It also declares that the topic of the event is the service name. Subscribers can therefore use a topic filter to subscribe only to the kernel status updates of a specific service. A subscriber that does not set a topic filter receives all kernel status updates.

Also see topic().

topic() str | None#

Topic of the message.

Can be used by listeners to filter events of the same type. Can be implemented by subclasses to provide fine control on subscriptions. Defaults to no topic.

See the docstring of Event for more information on how to use topics.

Returns#

  • (str | None): The topic of this event, or None if the event has no topic.
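The default and overridden behaviour of topic() can be sketched as follows. The `Event` class here is a plain-Python stand-in, not the real base class (which also inherits from Message and Point), and `Heartbeat` is a hypothetical event added only to show the default.

```python
from dataclasses import dataclass
from typing import Optional


class Event:
    """Stand-in for the real Event base class, for illustration only."""

    def topic(self) -> Optional[str]:
        # Mirrors the documented default: no topic.
        return None


@dataclass
class Heartbeat(Event):
    """Hypothetical event that keeps the default (no topic)."""

    sequence: int


@dataclass
class KernelStatusChanged(Event):
    """Event whose topic is the name of the service it reports on."""

    service: str
    status: str

    def topic(self) -> Optional[str]:
        return self.service


default_topic = Heartbeat(sequence=1).topic()
custom_topic = KernelStatusChanged(service="kernel-a", status="idle").topic()
print(default_topic, custom_topic)
```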

orangeqs.juice.messaging.protocol.EventType#

‘TypeVar(…)’

Type variable for Event.

class orangeqs.juice.messaging.protocol.SubscriberAsync(uri: str, *, queue: asyncio.Queue[orangeqs.juice.messaging.protocol.EventType] | None = None)#

Bases: typing.Protocol[orangeqs.juice.messaging.protocol.EventType]

Asynchronous client for subscribing to events from other Juice services.

The subscriber does not receive any events until it has subscribed to at least one event type. See SubscriberAsync.subscribe() for subscribing to events.

See PublisherAsync or PublisherBlocking for publishing events.

Parameters#

  • uri (str): URI to connect the subscriber to. Format is implementation dependent.

  • queue (asyncio.Queue, optional): Queue that new messages are put in. A new queue is created if none is provided.

Examples#

Instantiate a subscriber and print new events as they are received.

subscriber = SubscriberAsync("<uri>")
subscriber.subscribe(DummyEvent, topic="something")

while True:
    # Wait for the next event that matches the subscription.
    event = await subscriber.get()
    assert type(event) is DummyEvent
    assert event.topic().startswith("something")
    print(f"Received {type(event)} with data {event}")

abstract property queue: asyncio.Queue[orangeqs.juice.messaging.protocol.EventType]#

Queue in which messages received by listen() are placed.

Deprecated, use the get() method to retrieve messages instead.

subscribe(event_type: type[orangeqs.juice.messaging.protocol.EventType], *, topic: str | None = None) None#

Subscribe to an event.

It is possible to subscribe to multiple event types and/or topics. The subscriber allows filtering by event type and an optional topic. The topic filter checks if the topic of the event starts with the topic filter. This means that a topic filter with value "a.b" will match events with topic "a.b" and "a.b.c", but not "a.c" or "a.bb". Note that each topic filter always targets a specific event type, thus they are not shared between different event types.

Parameters#

  • event_type (type[EventType]): Type of event to subscribe to. Only this exact type is subscribed to; types that inherit from it are not included.

  • topic (str, optional): Topic used to filter events. See above for an explanation. If not provided, subscribes to all events of the specified type.
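The four documented outcomes for the filter "a.b" can be checked against two possible readings of the prefix rule. Both functions below are hypothetical helpers, not part of this module, and which reading a concrete implementation follows is not stated here. A literal string-prefix check would also accept "a.bb", whereas a check on whole dot-separated segments reproduces all four documented outcomes. Treating an event without a topic as not matching any filter is an assumption of this sketch.

```python
from typing import Optional


def matches_prefix(topic: Optional[str], topic_filter: Optional[str]) -> bool:
    """Literal reading of 'starts with': a plain string-prefix check."""
    if topic_filter is None:
        return True  # no filter: every event of the type matches
    return topic is not None and topic.startswith(topic_filter)


def matches_segments(
    topic: Optional[str], topic_filter: Optional[str], separator: str = "."
) -> bool:
    """Prefix check on whole segments, so 'a.b' does not match 'a.bb'."""
    if topic_filter is None:
        return True
    if topic is None:
        return False  # assumption: topic-less events never match a filter
    return topic == topic_filter or topic.startswith(topic_filter + separator)


topics = ["a.b", "a.b.c", "a.c", "a.bb"]
by_prefix = [matches_prefix(topic, "a.b") for topic in topics]
by_segments = [matches_segments(topic, "a.b") for topic in topics]

print(by_prefix)    # [True, True, False, True]
print(by_segments)  # [True, True, False, False]
```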

stop() None#

Stop listening for new events.

Stops the listen() coroutine.

Deprecated, use the get() method to retrieve messages instead.

async listen() None#

Coroutine to listen for new events and append them to the queue.

Runs until stop() is called. This coroutine must be kept running to receive new messages and append them to the queue.

Deprecated, use the get() method to retrieve messages instead.

async get(timeout: float | None = None) orangeqs.juice.messaging.protocol.EventType#

Get the next event from the subscriber.

Waits asynchronously until a new event is received.

Parameters#

  • timeout (float, optional): Optional timeout in seconds to wait for a new event. If not provided, will wait indefinitely.

Raises#

  • (TimeoutError): If a timeout is provided and no event is received within the timeout period.
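One way an implementation could provide the get() contract described above is to serve it from an asyncio.Queue and bound the wait with asyncio.wait_for. This is a minimal sketch under that assumption; `AsyncQueueSubscriber` and its `deliver` method are hypothetical names and do not describe the actual Juice implementation.

```python
import asyncio
from typing import Optional, Tuple


class AsyncQueueSubscriber:
    """Hypothetical subscriber that serves get() from an asyncio.Queue."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    def deliver(self, event: object) -> None:
        """Stand-in for the transport handing over a received event."""
        self._queue.put_nowait(event)

    async def get(self, timeout: Optional[float] = None) -> object:
        try:
            # wait_for waits indefinitely when timeout is None.
            return await asyncio.wait_for(self._queue.get(), timeout)
        except asyncio.TimeoutError:
            # Normalise to the built-in TimeoutError on every Python version.
            raise TimeoutError(f"No event received within {timeout} s") from None


async def main() -> Tuple[object, bool]:
    subscriber = AsyncQueueSubscriber()
    subscriber.deliver("first event")
    received = await subscriber.get(timeout=1.0)  # returns immediately
    try:
        await subscriber.get(timeout=0.01)  # queue is empty now
        timed_out = False
    except TimeoutError:
        timed_out = True
    return received, timed_out


received, timed_out = asyncio.run(main())
print(received, timed_out)
```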

class orangeqs.juice.messaging.protocol.SubscriberBlocking(uri: str, /)#

Bases: typing.Protocol[orangeqs.juice.messaging.protocol.EventType]

Synchronous blocking client for subscribing to events from other Juice services.

The subscriber does not receive any events until it has subscribed to at least one event type. See subscribe() for subscribing to events.

See PublisherBlocking or PublisherAsync for publishing events.

Parameters#

  • uri (str): URI to connect the subscriber to. Format is implementation dependent.

Examples#

Instantiate a subscriber and wait for new events.

subscriber = SubscriberBlocking("<uri>")
subscriber.subscribe(DummyEvent, topic="something")

subscriber.get()  # Blocking call to get the next event

subscribe(event_type: type[orangeqs.juice.messaging.protocol.EventType], /, *, topic: str | None = None) None#

Subscribe to an event.

It is possible to subscribe to multiple event types and/or topics. The subscriber allows filtering by event type and an optional topic. The topic filter checks if the topic of the event starts with the topic filter. This means that a topic filter with value "a.b" will match events with topic "a.b" and "a.b.c", but not "a.c" or "a.bb". Note that each topic filter always targets a specific event type, thus they are not shared between different event types.

Parameters#

  • event_type (type[EventType]): Type of event to subscribe to. Only this exact type is subscribed to; types that inherit from it are not included.

  • topic (str, optional): Topic used to filter events. See above for an explanation. If not provided, subscribes to all events of the specified type.
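The exact-type rule for event_type differs from the usual isinstance() behaviour, which the following sketch illustrates. The classes and the `is_subscribed` helper are hypothetical and only show the distinction; they are not taken from an actual implementation.

```python
class Event:
    """Stand-in base class, for illustration only."""


class StatusChanged(Event):
    """Hypothetical event type that is subscribed to."""


class KernelStatusChanged(StatusChanged):
    """Hypothetical subclass that is NOT covered by the subscription."""


def is_subscribed(event: Event, subscribed_types: set) -> bool:
    # Exact match on the concrete class; isinstance() would also accept subclasses.
    return type(event) in subscribed_types


subscribed = {StatusChanged}
exact_match = is_subscribed(StatusChanged(), subscribed)
subclass_match = is_subscribed(KernelStatusChanged(), subscribed)
isinstance_match = isinstance(KernelStatusChanged(), StatusChanged)

print(exact_match, subclass_match, isinstance_match)  # True False True
```

To receive both the base type and a subclass, a subscriber would call subscribe() once for each type.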

get(timeout: float | None = None) orangeqs.juice.messaging.protocol.EventType#

Get the next event from the subscriber.

This is a blocking call that waits until a new event is received.

Parameters#

  • timeout (float, optional): Optional timeout in seconds to wait for a new event. If not provided, will wait indefinitely.

Raises#

  • (TimeoutError): If a timeout is provided and no event is received within the timeout period.
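The blocking variant of this contract can be sketched with the standard library's thread-safe queue.Queue, translating queue.Empty into TimeoutError. `BlockingQueueSubscriber` and `deliver` are hypothetical names used for illustration; a real implementation would receive events from its transport instead of a timer thread.

```python
import queue
import threading
from typing import Optional


class BlockingQueueSubscriber:
    """Hypothetical subscriber that serves get() from a thread-safe queue."""

    def __init__(self) -> None:
        self._queue: queue.Queue = queue.Queue()

    def deliver(self, event: object) -> None:
        """Stand-in for the transport handing over a received event."""
        self._queue.put(event)

    def get(self, timeout: Optional[float] = None) -> object:
        try:
            # Blocks indefinitely when timeout is None.
            return self._queue.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError(f"No event received within {timeout} s") from None


subscriber = BlockingQueueSubscriber()

# Simulate an event arriving from another thread shortly after get() starts.
threading.Timer(0.05, subscriber.deliver, args=("late event",)).start()
received = subscriber.get(timeout=2.0)

try:
    subscriber.get(timeout=0.01)  # nothing else was delivered
    timed_out = False
except TimeoutError:
    timed_out = True

print(received, timed_out)
```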

class orangeqs.juice.messaging.protocol.PublisherAsync(uri: str)#

Bases: typing.Protocol

Asynchronous client for publishing events to other Juice services.

See SubscriberAsync or SubscriberBlocking for subscribing to events.

Parameters#

  • uri (str): URI to connect the publisher to. Format is implementation dependent.

Examples#

Instantiate a publisher and publish an event.

publisher = PublisherAsync("<uri>")
await publisher.publish(DummyEvent(data="data"))

async publish(event: orangeqs.juice.messaging.protocol.Event) None#

Publish an event to the system.

Parameters#

  • event (Event): Event to be published.

class orangeqs.juice.messaging.protocol.PublisherBlocking(uri: str)#

Bases: typing.Protocol

Synchronous blocking client for publishing events to other Juice services.

See SubscriberBlocking or SubscriberAsync for subscribing to events.

Parameters#

  • uri (str): URI to connect the publisher to. Format is implementation dependent.

Examples#

Instantiate a publisher and publish an event.

publisher = PublisherBlocking("<uri>")
publisher.publish(DummyEvent(data="data"))

publish(event: orangeqs.juice.messaging.protocol.Event) None#

Publish an event to the system.

Parameters#

  • event (Event): Event to be published.

class orangeqs.juice.messaging.protocol.Subscriber(uri: str, *, queue: asyncio.Queue[orangeqs.juice.messaging.protocol.EventType] | None = None)#

Bases: orangeqs.juice.messaging.protocol.SubscriberAsync[orangeqs.juice.messaging.protocol.EventType], typing.Protocol

Asynchronous client for subscribing to events from other Juice services.

The subscriber does not receive any events until it has subscribed to at least one event type. See SubscriberAsync.subscribe() for subscribing to events.

See PublisherAsync or PublisherBlocking for publishing events.

Parameters#

  • uri (str): URI to connect the subscriber to. Format is implementation dependent.

  • queue (asyncio.Queue, optional): Queue that new messages are put in. A new queue is created if none is provided.

Examples#

Instantiate a subscriber and print new events as they are received.

subscriber = SubscriberAsync("<uri>")
subscriber.subscribe(DummyEvent, topic="something")

while True:
    # Wait for the next event that matches the subscription.
    event = await subscriber.get()
    assert type(event) is DummyEvent
    assert event.topic().startswith("something")
    print(f"Received {type(event)} with data {event}")

class orangeqs.juice.messaging.protocol.Publisher(uri: str)#

Bases: orangeqs.juice.messaging.protocol.PublisherAsync, typing.Protocol

Asynchronous client for publishing events to other Juice services.

See SubscriberAsync or SubscriberBlocking for subscribing to events.

Parameters#

  • uri (str): URI to connect the publisher to. Format is implementation dependent.

Examples#

Instantiate a publisher and publish an event.

publisher = PublisherAsync("<uri>")
await publisher.publish(DummyEvent(data="data"))