orangeqs.juice.client.pubsub#

Helpers to instantiate publishers and subscribers from Juice configuration.

Module Contents#

Functions#

publish_event

Publish an event to pubsub and database.

publisher_async

Create an async publisher instance from Juice configuration.

publisher_blocking

Create a synchronous publisher instance from Juice configuration.

subscriber_async

Create an async subscriber instance from Juice configuration.

subscriber_blocking

Create a synchronous subscriber instance from Juice configuration.

API#

async orangeqs.juice.client.pubsub.publish_event(event: orangeqs.juice.messaging.protocol.Event, publisher: orangeqs.juice.messaging.protocol.PublisherAsync, write_api: influxdb_client.WriteApi | influxdb_client.client.write_api_async.WriteApiAsync, bucket: str) -> None#

Publish an event to pubsub and database.

This is a convenience function that publishes an event and also writes it to the database. The database functionality will be deprecated in the future and handled internally by the Juice Task Manager. See #60 for more information.

Parameters#

  • event (Event): The event to publish.

  • publisher (PublisherAsync): The publisher to use for publishing the event.

  • write_api (WriteApi | WriteApiAsync): The InfluxDB write API to use for writing the event.

  • bucket (str): The InfluxDB bucket to write the event to.
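
The signature accepts either a synchronous `WriteApi` or an asynchronous `WriteApiAsync`. The following is a minimal sketch of how a function with this shape could handle both, not the Juice implementation. `HelloEvent`, `FakePublisher`, `FakeWriteApi`, `FakeWriteApiAsync` and `publish_and_store` are hypothetical in-memory stand-ins, and the record format passed to `write` is an assumption.

```python
import asyncio
import inspect
from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class HelloEvent:
    """Hypothetical stand-in for an Event subclass (not the Juice class)."""

    message: str


class FakePublisher:
    """Stand-in for PublisherAsync: records events instead of sending them."""

    def __init__(self) -> None:
        self.published: list[Any] = []

    async def publish(self, event: Any) -> None:
        self.published.append(event)


class FakeWriteApi:
    """Stand-in for a synchronous write API: write() returns immediately."""

    def __init__(self) -> None:
        self.writes: list[tuple[str, dict]] = []

    def write(self, bucket: str, record: dict) -> None:
        self.writes.append((bucket, record))


class FakeWriteApiAsync:
    """Stand-in for an asynchronous write API: write() is a coroutine."""

    def __init__(self) -> None:
        self.writes: list[tuple[str, dict]] = []

    async def write(self, bucket: str, record: dict) -> None:
        self.writes.append((bucket, record))


async def publish_and_store(event: Any, publisher: Any, write_api: Any, bucket: str) -> None:
    """Publish the event, then write it to the given bucket (assumed order)."""
    await publisher.publish(event)
    result = write_api.write(bucket=bucket, record=asdict(event))
    # Only the async write API returns an awaitable that still has to be awaited.
    if inspect.isawaitable(result):
        await result


publisher = FakePublisher()
sync_api = FakeWriteApi()
async_api = FakeWriteApiAsync()


async def main() -> None:
    event = HelloEvent(message="Hello, World!")
    await publish_and_store(event, publisher, sync_api, "events")
    await publish_and_store(event, publisher, async_api, "events")


asyncio.run(main())
```

With the real library, the call would presumably take the same four arguments as documented above: the event, a `PublisherAsync`, an InfluxDB write API and a bucket name.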

orangeqs.juice.client.pubsub.publisher_async(connection_info: orangeqs.juice.schemas.task_manager.TaskManagerConnectionInfo | None = None) -> orangeqs.juice.messaging.protocol.PublisherAsync#

Create an async publisher instance from Juice configuration.

Parameters#

  • connection_info (TaskManagerConnectionInfo, optional): Connection info used to determine the publisher URI. If not provided, it is loaded from disk.

Returns#

  • (PublisherAsync): Publisher to publish events.

Examples#

Instantiate a publisher and publish an event:

from orangeqs.juice.messaging import Event
from orangeqs.juice.client.pubsub import publisher_async

class HelloEvent(Event):
    message: str

publisher = publisher_async()
await publisher.publish(HelloEvent(message="Hello, World!"))

orangeqs.juice.client.pubsub.publisher_blocking(connection_info: orangeqs.juice.schemas.task_manager.TaskManagerConnectionInfo | None = None) -> orangeqs.juice.messaging.protocol.PublisherBlocking#

Create a synchronous publisher instance from Juice configuration.

Parameters#

  • connection_info (TaskManagerConnectionInfo, optional): Connection info used to determine the publisher URI. If not provided, it is loaded from disk.

Returns#

  • (PublisherBlocking): Publisher to publish events.

Examples#

Instantiate a publisher and publish an event:

from orangeqs.juice.messaging import Event
from orangeqs.juice.client.pubsub import publisher_blocking

class HelloEvent(Event):
    message: str

publisher = publisher_blocking()
publisher.publish(HelloEvent(message="Hello, World!"))

orangeqs.juice.client.pubsub.subscriber_async(connection_info: orangeqs.juice.schemas.task_manager.TaskManagerConnectionInfo | None = None, *, queue: asyncio.Queue[orangeqs.juice.messaging.protocol.EventType] | None = None) -> orangeqs.juice.messaging.protocol.SubscriberAsync[orangeqs.juice.messaging.protocol.EventType]#

Create an async subscriber instance from Juice configuration.

Parameters#

  • connection_info (TaskManagerConnectionInfo, optional): Connection info used to determine the subscriber URI.

  • queue (asyncio.Queue, optional): Queue that new messages are put into. A new queue is instantiated if not provided.

Returns#

  • (SubscriberAsync): Subscriber to subscribe to events.

Examples#

Instantiate a subscriber and listen for events:

import asyncio
from orangeqs.juice.messaging import Event
from orangeqs.juice.client.pubsub import subscriber_async

class HelloEvent(Event):
    message: str

subscriber = subscriber_async()
subscriber.subscribe(HelloEvent)

while True:
    event = await subscriber.get()
    print(f"Received: {event.model_dump()}")
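
The `queue` parameter suggests that the caller can own the queue that incoming events land in. The sketch below shows only the consumer side of that pattern using the standard library: it drains an `asyncio.Queue` until no event arrives within a timeout. `HelloEvent`, `fake_subscriber` and `drain` are hypothetical stand-ins. With the real library the queue would presumably be passed as `subscriber_async(queue=queue)`, which is an assumption based on the parameter description.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class HelloEvent:
    """Hypothetical stand-in for an Event subclass (not the Juice class)."""

    message: str


async def fake_subscriber(queue: asyncio.Queue, messages: list[str]) -> None:
    """Stand-in for a subscriber that puts incoming events on the queue."""
    for text in messages:
        await queue.put(HelloEvent(message=text))


async def drain(queue: asyncio.Queue, idle_timeout: float) -> list[HelloEvent]:
    """Collect events until none arrives within idle_timeout seconds."""
    received: list[HelloEvent] = []
    while True:
        try:
            event = await asyncio.wait_for(queue.get(), timeout=idle_timeout)
        except asyncio.TimeoutError:
            return received
        received.append(event)


async def main() -> list[HelloEvent]:
    queue: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(fake_subscriber(queue, ["one", "two"]))
    received = await drain(queue, idle_timeout=0.1)
    await producer
    return received


received = asyncio.run(main())
```

Wrapping `queue.get()` in `asyncio.wait_for` gives the loop a way to stop, unlike the `while True` loop above, which runs until cancelled.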

orangeqs.juice.client.pubsub.subscriber_blocking(connection_info: orangeqs.juice.schemas.task_manager.TaskManagerConnectionInfo | None = None, /) -> orangeqs.juice.messaging.protocol.SubscriberBlocking[orangeqs.juice.messaging.protocol.Event]#

Create a synchronous subscriber instance from Juice configuration.

Parameters#

  • connection_info (TaskManagerConnectionInfo, optional): Connection info used to determine the subscriber URI.

Returns#

  • (SubscriberBlocking): Subscriber to subscribe to events.

Examples#

Instantiate a subscriber and get the next event:

from orangeqs.juice.messaging import Event
from orangeqs.juice.client.pubsub import subscriber_blocking

class HelloEvent(Event):
    message: str

subscriber = subscriber_blocking()
subscriber.subscribe(HelloEvent)

event = subscriber.get()