Database

OrangeQS Juice includes a built-in database system that allows services to store and query time-series data. The database is an InfluxDB2 instance that is pre-configured and runs as part of the OrangeQS Juice installation.

This guide walks you step by step through using the database to manage your own data.

Understanding InfluxDB concepts

Before using the database, you should understand some basic InfluxDB concepts. Start by reading the data organization section, which summarizes the key concepts of InfluxDB:

  • Buckets

  • Measurements

  • Tags

  • Fields

  • Series

Next, read the InfluxDB data elements section for more detailed explanations and examples of these concepts. In the rest of this guide, these terms always refer to the InfluxDB concepts explained in the linked section. Once you are familiar with them, you are ready to start using the database in OrangeQS Juice.
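To make the relationship between these concepts concrete, the sketch below renders a single data point in InfluxDB line protocol and lists the series it belongs to. It is a simplified illustration in plain Python: the helper names `to_line_protocol` and `series_keys` are hypothetical, escaping of special characters is omitted, and this is not how OrangeQS Juice writes data internally.

```python
def _tag_part(tags):
    # Tags are key=value pairs, conventionally sorted by key.
    return ",".join(f"{key}={value}" for key, value in sorted(tags.items()))


def _format_field(value):
    # Line protocol: booleans as true/false, integers get an "i" suffix,
    # floats are written as-is, strings are double-quoted.
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"
    if isinstance(value, float):
        return repr(value)
    return f'"{value}"'


def to_line_protocol(measurement, tags, fields, timestamp_ns):
    """Render one point as a line protocol string (no escaping, sketch only)."""
    head = measurement
    if tags:
        head += "," + _tag_part(tags)
    field_part = ",".join(f"{k}={_format_field(v)}" for k, v in fields.items())
    return f"{head} {field_part} {timestamp_ns}"


def series_keys(measurement, tags, fields):
    """A series is identified by measurement, tag set, and one field key."""
    head = measurement
    if tags:
        head += "," + _tag_part(tags)
    return [f"{head} {field_key}" for field_key in fields]


tags = {"device_id": "device_123"}
fields = {"temperature": 300.0, "unit": "K"}

line = to_line_protocol("temperature_reading", tags, fields, 1700000000000000000)
keys = series_keys("temperature_reading", tags, fields)
```

Here one point with two fields contributes to two series, both of which live in whichever bucket the point is written to.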

Using and creating buckets

A bucket is a named location where multiple measurements are stored. Within OrangeQS Juice, there are two conventions for using buckets:

  • Measurements with schemas that are unique to a service should be stored in a bucket named after the service. For example, a service named device should store its unique measurements in a bucket named service-device. This ensures that different services can operate independently without interfering with each other’s schemas and data.

  • Measurements with schemas that are shared between multiple services can either be stored in a shared bucket named after the purpose of the measurements or in a service-specific bucket. For example, measurements related to temperature readings could be stored in a shared bucket named temperature. Alternatively, they could be stored in that service’s bucket (e.g., service-device). The latter makes the system more modular, but note that data cannot be queried easily across different buckets.
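The naming convention for service-specific buckets can be captured in a small helper. This is only a sketch: `service_bucket` is a hypothetical function and not part of the OrangeQS Juice API.

```python
def service_bucket(service_name: str) -> str:
    """Return the conventional bucket name for a service (hypothetical helper)."""
    if not service_name:
        raise ValueError("service_name must not be empty")
    return f"service-{service_name}"


# A service named "device" stores its unique measurements in "service-device".
bucket = service_bucket("device")
```

Deriving the name in one place avoids typos when the same bucket is referenced in both write and query calls.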

To add a bucket, see Adding InfluxDB buckets and the configuration section orchestration.toml::influxdb2.buckets.

Defining a schema

To interact with the database, you need to define a schema that describes the structure of the data points you want to store. A schema is defined by creating a subclass of the Point class. The schema class is based on a pydantic.BaseModel and uses Python type annotations to define the measurement name, tags, and fields of the data points.

Point

Base class for time-series data points used to store and query data from InfluxDB.

As an example, we will create a schema for storing temperature readings from a device.

from typing import ClassVar, Annotated
from orangeqs.juice.database import Point

class TemperatureReading(Point):
    # The measurement name is defined as a class variable
    measurement: ClassVar[str] = "temperature_reading"

    # Tags are annotated with "tag"
    device_id: Annotated[str, "tag"]

    # By default, all other attributes are fields
    temperature: float
    unit: str = "K"

See the documentation of the Point class for best practices and more details on how to define your schema. Understanding the difference between tags and fields is especially important for efficient data storage and querying.
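To illustrate how annotations like the ones above can separate tags from fields, the following standard-library sketch inspects a class with the same attributes. It is an assumption about the general technique, not the actual implementation of Point: `TemperatureReadingSketch` and `split_tags_and_fields` are hypothetical names, and the class deliberately does not inherit from Point or pydantic.

```python
from typing import Annotated, ClassVar, get_args, get_origin, get_type_hints


class TemperatureReadingSketch:
    # Same annotations as the schema above, on a plain class.
    measurement: ClassVar[str] = "temperature_reading"
    device_id: Annotated[str, "tag"]
    temperature: float
    unit: str = "K"


def split_tags_and_fields(cls):
    """Return (tags, fields) attribute names based on type annotations."""
    hints = get_type_hints(cls, include_extras=True)
    tags, fields = [], []
    for name, hint in hints.items():
        if get_origin(hint) is ClassVar:
            # Class variables such as the measurement name are not stored.
            continue
        if get_origin(hint) is Annotated and "tag" in get_args(hint)[1:]:
            tags.append(name)
        else:
            fields.append(name)
    return tags, fields


tags, fields = split_tags_and_fields(TemperatureReadingSketch)
```

In this sketch `device_id` ends up as the only tag, while `temperature` and `unit` are treated as fields.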

Writing data

Writing data to the database is done using the following APIs:

Point.write

Write the Point instance to InfluxDB.

Point.write_async

Write the Point instance to InfluxDB asynchronously.

influxdb2_write_api

Get an InfluxDB2 write API client for the database.

influxdb2_write_api_async

Get an InfluxDB2 write API async client for the database.

For example, to write a temperature reading using the previously defined TemperatureReading schema, you can use the blocking API as follows:

from orangeqs.juice.client.influxdb2 import influxdb2_write_api

# Create a reusable instance of the write API
write_api = influxdb2_write_api()

# Create an instance of the schema and write it to the database
reading = TemperatureReading(device_id="device_123", temperature=300.0)
reading.write(write_api, bucket="service-device")

Alternatively, using the asynchronous API:

from orangeqs.juice.client.influxdb2 import influxdb2_write_api_async

write_api_async = influxdb2_write_api_async()

# `await` only works inside a coroutine (or an environment with top-level await)
reading = TemperatureReading(device_id="device_123", temperature=300.0)
await reading.write_async(write_api_async, bucket="service-device")

When to use the blocking or the asynchronous API

By default, the blocking API performs write operations in batching mode on a background thread. The main program therefore continues executing without waiting for each write to complete. As a consequence, you must keep a reference to the write API so that all queued data is written to the database in the background.

Conversely, the asynchronous API is fully asynchronous and does not use background threads. Awaiting the write method therefore waits for each write operation to complete before execution continues. Because of how asyncio works, other tasks are not blocked while the write operation is awaited.

If absolutely necessary, it is also possible to use the blocking API in a synchronous mode, meaning the function call will block until the write operation is complete:

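from influxdb_client.client.write_api import SYNCHRONOUS
from orangeqs.juice.client.influxdb2 import influxdb2_write_api
# Each write call now blocks until the write operation is complete
synchronous_write_api = influxdb2_write_api(write_options=SYNCHRONOUS)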

To conclude, if atomic writes are not required, it is usually more efficient to use the blocking API in batching mode. However, if atomic writes are required, or if the program is fully asynchronous, the asynchronous API is a good fit. As a last resort, the blocking API in synchronous mode can be used.
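The batching behaviour of the blocking API can be pictured with a toy model. The sketch below uses only the standard library: `ToyBatchingWriter` is a hypothetical class, a Python list stands in for the database, and none of this reflects the real client's internals. It only shows why `write` returns immediately and why the writer has to stay alive (and be closed) for queued records to arrive.

```python
import queue
import threading


class ToyBatchingWriter:
    """Toy model of a batching writer backed by a background thread."""

    def __init__(self, sink, batch_size=3):
        self._sink = sink  # list standing in for the database
        self._batch_size = batch_size
        self._queue = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def write(self, record):
        # Returns immediately: the record is only queued, not yet stored.
        self._queue.put(record)

    def _run(self):
        batch = []
        while True:
            record = self._queue.get()
            if record is None:  # sentinel: stop after flushing
                break
            batch.append(record)
            if len(batch) >= self._batch_size:
                self._sink.append(list(batch))
                batch.clear()
        if batch:
            # Flush the final, partially filled batch.
            self._sink.append(list(batch))

    def close(self):
        # Ask the worker to flush remaining records and wait for it.
        self._queue.put(None)
        self._worker.join()


database = []
writer = ToyBatchingWriter(database, batch_size=3)
for value in range(5):
    writer.write(value)
writer.close()
```

After `close()` returns, the five records have arrived in two batches. If the writer were discarded before closing, the last partial batch in this toy model would never reach the list.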

Querying data

Querying data from the database is done using the following APIs:

Point.query

Query the database for all points of this type based on filters.

Point.query_async

Query the database asynchronously for all points of this type based on filters.

influxdb2_query_api

Get an InfluxDB2 query API client for the database.

influxdb2_query_api_async

Get an InfluxDB2 query API async client for the database.

As an example, we will query temperature readings using the previously defined TemperatureReading schema. The example below queries the 5 most recent temperature readings from a specific device in the last hour using the blocking API:

from orangeqs.juice.client.influxdb2 import influxdb2_query_api

# Create a reusable instance of the query API
query_api = influxdb2_query_api()

# Query the database for temperature readings from a specific device
readings = TemperatureReading.query(
    query_api,
    bucket="service-device",
    start="-1h",
    filters={"device_id": "device_123"},
    limit=5,
    sort="desc",
)

The readings variable will contain a list of TemperatureReading instances matching the query criteria. The query API also supports outputting the data as a pandas.DataFrame by specifying output="dataframe" in the query method. See the docstring of Point.query() or Point.query_async() for more details on the available query parameters.
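For readers who know Flux, it can help to see roughly what kind of query such parameters correspond to. The sketch below builds a Flux query string from similar arguments. This guide does not show the query that Point.query actually generates, so treat this as an assumption: `build_flux_query` is a hypothetical helper, and it does no escaping, so it should not be used with untrusted input.

```python
def build_flux_query(bucket, measurement, start, filters, limit, descending=True):
    """Build a Flux query string for recent points of one measurement (sketch)."""
    lines = [
        f'from(bucket: "{bucket}")',
        f"  |> range(start: {start})",
        f'  |> filter(fn: (r) => r._measurement == "{measurement}")',
    ]
    # One filter stage per tag; tags are indexed, so these filters are cheap.
    for tag, value in filters.items():
        lines.append(f'  |> filter(fn: (r) => r.{tag} == "{value}")')
    lines += [
        # Turn one row per field into one row per timestamp.
        '  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")',
        f'  |> sort(columns: ["_time"], desc: {"true" if descending else "false"})',
        f"  |> limit(n: {limit})",
    ]
    return "\n".join(lines)


query = build_flux_query(
    bucket="service-device",
    measurement="temperature_reading",
    start="-1h",
    filters={"device_id": "device_123"},
    limit=5,
)
```

The resulting string could be pasted into the InfluxDB data explorer to inspect the same data by hand.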

Filtering by tags vs. fields

When filtering data during queries, it is important to understand the difference between tags and fields. In InfluxDB, tags are indexed but fields are not. This means that filtering by tags is much more performant than filtering by fields. Therefore, it is recommended to use tags for attributes that are frequently used as filters in queries.
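The performance difference can be pictured with a toy model in plain Python. This is only an analogy, not a description of InfluxDB's storage engine: a dictionary plays the role of the tag index, while field values can only be found by scanning every point.

```python
from collections import defaultdict

points = [
    {"device_id": "device_123", "temperature": 300.0},
    {"device_id": "device_456", "temperature": 4.2},
    {"device_id": "device_123", "temperature": 301.5},
]

# Tag values are indexed: map each tag value to the positions of its points.
tag_index = defaultdict(list)
for position, point in enumerate(points):
    tag_index[point["device_id"]].append(position)

# Filtering by tag: one lookup, only the matching points are touched.
by_tag = [points[i] for i in tag_index["device_123"]]

# Filtering by field: no index exists, so every point must be inspected.
by_field = [p for p in points if p["temperature"] > 300.0]
```

With three points the difference is negligible, but the scan grows with the total number of points while the lookup only touches the matches.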

See InfluxDB schema design best practices for more information.