orangeqs.juice.database._point#

Base class for time-series data points.

Module Contents#

Classes#

Point

Base class for time-series data points used to store and query data from InfluxDB.

API#

class orangeqs.juice.database._point.Point(/, **data: Any)#

Bases: pydantic.BaseModel

Base class for time-series data points used to store and query data from InfluxDB.

Subclassing this class defines a schema for a specific measurement. This schema class can be used to store and query data from InfluxDB. The schema is based on a pydantic.BaseModel, so all pydantic features are available.

This base class follows two additional conventions:

  • By default, all fields are stored as InfluxDB fields. To mark a field as a tag, annotate the type using the string "tag", e.g. field: typing.Annotated[str, "tag"].

  • Every subclass must define a measurement class variable, which specifies the InfluxDB measurement name, e.g. measurement: typing.ClassVar[str] = "some_name".

Also see schema design best practices in the InfluxDB documentation, especially regarding the use of tags vs. fields.
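The tag convention above can be illustrated with the standard library alone. The sketch below is not the library's implementation: `SomeEvent` is a plain stand-in class (no pydantic), and `split_tags_and_fields` is a hypothetical helper that shows how a "tag" marker inside `typing.Annotated` can be told apart from ordinary fields and from the `measurement` class variable.

```python
from typing import Annotated, ClassVar, get_args, get_origin, get_type_hints


class SomeEvent:
    """Plain stand-in for a Point subclass (hypothetical, no pydantic needed)."""

    measurement: ClassVar[str] = "some_event"

    event_id: Annotated[str, "tag"]  # marked as an InfluxDB tag
    value: float  # no marker, so treated as an InfluxDB field


def split_tags_and_fields(cls: type) -> tuple[list[str], list[str]]:
    """Return (tag names, field names) by inspecting the type annotations."""
    tags: list[str] = []
    fields: list[str] = []
    for name, hint in get_type_hints(cls, include_extras=True).items():
        if get_origin(hint) is ClassVar:
            continue  # class variables such as `measurement` are not stored
        # Annotated[str, "tag"] has args (str, "tag"); the metadata follows the type
        is_tag = get_origin(hint) is Annotated and "tag" in get_args(hint)[1:]
        (tags if is_tag else fields).append(name)
    return tags, fields


tags, fields = split_tags_and_fields(SomeEvent)
print("tags:", tags)
print("fields:", fields)
```

Here `event_id` ends up in the tag list and `value` in the field list, mirroring the default described above.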

Examples#

This example shows:

  • How to define a schema for a specific measurement.

  • How to store a record.

  • How to query it from the database.

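from typing import Annotated, ClassVar

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
from orangeqs.juice.database import Point

# Define a schema for the "some_event" measurement
class SomeEvent(Point):
    measurement: ClassVar[str] = "some_event"

    event_id: Annotated[str, "tag"]  # stored as an InfluxDB tag
    value: float  # stored as an InfluxDB field (the default)

# Placeholder connection settings (assumptions): replace with your own
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

# Write the event to InfluxDB; `time` defaults to the current time in UTC
event = SomeEvent(event_id="event123", value=42.0)
event.write(write_api, bucket="my_bucket")

# Query matching events from the last hour (the default time range)
events = SomeEvent.query(
    query_api,
    bucket="my_bucket",
    filters={"event_id": "event123"},
    limit=10,
)
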
measurement: ClassVar[str]#

None

Name of the InfluxDB measurement. Must be unique and defined in every subclass.

time: datetime.datetime#

‘Field(…)’

The timestamp of the data point. Defaults to the current time in UTC.

to_influxdb2() influxdb_client.Point#

Convert the Point instance to an InfluxDB Point.

Returns#

  • (influxdb_client.Point): The corresponding InfluxDB Point object.

write(write_api: influxdb_client.WriteApi, bucket: str, **kwargs: Any) None#

Write the Point instance to InfluxDB.

Parameters#

  • write_api (influxdb_client.WriteApi): The InfluxDB WriteApi instance.

  • bucket (str): The target bucket in InfluxDB.

  • **kwargs (Any): Additional keyword arguments to pass to the WriteApi.write() method.

async write_async(write_api: influxdb_client.client.write_api_async.WriteApiAsync, bucket: str, **kwargs: Any) None#

Write the Point instance to InfluxDB asynchronously.

Parameters#

  • write_api (influxdb_client.client.write_api_async.WriteApiAsync): The InfluxDB WriteApiAsync instance.

  • bucket (str): The target bucket in InfluxDB.

  • **kwargs (Any): Additional keyword arguments to pass to the WriteApiAsync.write() method.

classmethod query(query_api: influxdb_client.QueryApi, bucket: str, output: Literal["list", "dataframe"] = 'list', filters: orangeqs.juice.database._point._FilterType | None = None, limit: int = 100, start: str = '-1h', stop: str = 'now()', sort: Literal["asc", "desc", None] = 'desc') list[typing_extensions.Self] | pandas.DataFrame#

Query the database for points of this type, optionally restricted by filters.

Parameters#

  • query_api (influxdb_client.QueryApi): The InfluxDB QueryApi instance.

  • bucket (str): The target bucket in InfluxDB.

  • output (“list” or “dataframe”, optional): The output format: “list” returns a list of instances of Self, “dataframe” returns a pandas DataFrame (default is “list”).

  • filters (dict[str, str | int | float | bool | list[str | int | float | bool]], optional): A dictionary of filters to apply. Keys are field or tag names; each value is either a single value or a list of values, where a list matches any of its values (“OR”). Filters on different keys are combined with “AND”. Defaults to None, which means no filtering. For example, the filter {"name": ["a", "b"], "color": "blue"} corresponds to (name == "a" or name == "b") and (color == "blue").

  • limit (int, optional): The maximum number of records per series key to return. Defaults to 100. A series key is a unique combination of tag values. See series in the InfluxDB documentation for more information.

  • start (str, optional): The start time for the query in InfluxDB time format. Defaults to “-1h” (last hour).

  • stop (str, optional): The stop time for the query in InfluxDB time format. Defaults to “now()”.

  • sort (“asc”, “desc” or None, optional): The sort order for the results based on time. If None, no sorting is applied. Defaults to “desc”.

Returns#

  • (list[Self] | pd.DataFrame): A list of instances of this class, or a pandas DataFrame if output="dataframe".
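The filter semantics described for `filters` can be sketched in plain Python. This is only an in-memory illustration of the AND/OR rule, not how the query is built or executed against InfluxDB; `matches` and the sample `records` are hypothetical names introduced for the example.

```python
from typing import Optional, Union

Scalar = Union[str, int, float, bool]
Filters = dict[str, Union[Scalar, list[Scalar]]]


def matches(record: dict[str, Scalar], filters: Optional[Filters]) -> bool:
    """Return True if the record satisfies every filter (AND across keys)."""
    if not filters:
        return True  # None or an empty dict means no filtering
    for key, wanted in filters.items():
        # A list of values means "any of these" (OR within one key)
        allowed = wanted if isinstance(wanted, list) else [wanted]
        if record.get(key) not in allowed:
            return False
    return True


records = [
    {"name": "a", "color": "blue"},
    {"name": "b", "color": "red"},
    {"name": "c", "color": "blue"},
]
filters = {"name": ["a", "b"], "color": "blue"}

# Equivalent to: (name == "a" or name == "b") and (color == "blue")
selected = [r for r in records if matches(r, filters)]
print(selected)
```

Only the first record passes: the second has the wrong color and the third has a name outside the allowed list.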

async classmethod query_async(query_api: influxdb_client.client.query_api_async.QueryApiAsync, bucket: str, output: Literal["list", "dataframe"] = 'list', filters: orangeqs.juice.database._point._FilterType | None = None, limit: int = 100, start: str = '-1h', stop: str = 'now()', sort: Literal["asc", "desc", None] = 'desc') list[typing_extensions.Self] | pandas.DataFrame#

Asynchronously query the database for points of this type, optionally restricted by filters.

Parameters#

  • query_api (influxdb_client.client.query_api_async.QueryApiAsync): The InfluxDB QueryApiAsync instance.

  • bucket (str): The target bucket in InfluxDB.

  • output (“list” or “dataframe”, optional): The output format: “list” returns a list of instances of Self, “dataframe” returns a pandas DataFrame (default is “list”).

  • filters (dict[str, str | int | float | bool | list[str | int | float | bool]], optional): A dictionary of filters to apply. Keys are field or tag names; each value is either a single value or a list of values, where a list matches any of its values (“OR”). Filters on different keys are combined with “AND”. Defaults to None, which means no filtering. For example, the filter {"name": ["a", "b"], "color": "blue"} corresponds to (name == "a" or name == "b") and (color == "blue").

  • limit (int, optional): The maximum number of records per series key to return. Defaults to 100. A series key is a unique combination of tag values. See series in the InfluxDB documentation for more information.

  • start (str, optional): The start time for the query in InfluxDB time format. Defaults to “-1h” (last hour).

  • stop (str, optional): The stop time for the query in InfluxDB time format. Defaults to “now()”.

  • sort (“asc”, “desc” or None, optional): The sort order for the results based on time. If None, no sorting is applied. Defaults to “desc”.

Returns#

  • (list[Self] | pd.DataFrame): A list of instances of this class, or a pandas DataFrame if output="dataframe".
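Because `limit` applies per series key rather than to the result as a whole, a query can return more than `limit` records when several tag combinations match. The sketch below imitates that behaviour on an in-memory list; it is an illustration under that reading of the parameter, not the library's query logic, and `limit_per_series` is a hypothetical helper.

```python
from collections import defaultdict


def limit_per_series(records: list[dict], tag_names: list[str], limit: int) -> list[dict]:
    """Keep at most `limit` records for each unique combination of tag values."""
    counts: dict[tuple, int] = defaultdict(int)
    kept: list[dict] = []
    for record in records:
        # The series key is the tuple of tag values for this record
        series_key = tuple(record[tag] for tag in tag_names)
        if counts[series_key] < limit:
            counts[series_key] += 1
            kept.append(record)
    return kept


records = [
    {"event_id": "e1", "value": 1.0},
    {"event_id": "e1", "value": 2.0},
    {"event_id": "e1", "value": 3.0},
    {"event_id": "e2", "value": 4.0},
]

kept = limit_per_series(records, tag_names=["event_id"], limit=2)
print(len(kept), "records kept across", len({r["event_id"] for r in kept}), "series")
```

With `limit=2`, two records survive for `e1` and one for `e2`, so three records are returned in total.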