orangeqs.juice.database._point#
Base class for time-series data points.
Module Contents#
Classes#
Base class for time-series data points used to store and query data from InfluxDB. |
API#
- class orangeqs.juice.database._point.Point(/, **data: Any)#
Bases:
pydantic.BaseModelBase class for time-series data points used to store and query data from InfluxDB.
Subclassing this class defines a schema for a specific measurement. This schema class can be used to store and query data from InfluxDB. The schema is based on a
pydantic.BaseModel, so all pydantic features are available.This base class follows two additional conventions:
By default, all fields are stored as InfluxDB fields. To mark a field as a tag, annotate the type using the string
"tag", e.g.field: typing.Annotated[str, "tag"].Every subclass must define a
measurementclass variable, which specifies the InfluxDB measurement name, e.g.measurement: typing.ClassVar[str] = "some_name".
Also see schema design best practices in the InfluxDB documentation, especially regarding the use of tags vs. fields.
Examples#
This example shows:
How to define a schema for a specific measurement.
How to store a record.
How to query it from the database.
from typing import Annotated, ClassVar from datetime import datetime from orangeqs.juice.database import Point # Define a schema class SomeEvent(Point): measurement: ClassVar[str] = "some_event" event_id: Annotated[str, "tag"] value: float # Writing the event to InfluxDB event = SomeEvent(event_id="event123", value=42.0) event.write(write_api, bucket="my_bucket") # Querying events from InfluxDB events = SomeEvent.query( query_api, bucket="my_bucket", filters={"event_id": "event123"}, limit=10 )
- measurement: ClassVar[str]#
None
Name of the InfluxDB measurement. Must be unique and defined in every subclass.
- time: datetime.datetime#
‘Field(…)’
The timestamp of the data point. Defaults to the current time in UTC.
- to_influxdb2() influxdb_client.Point#
Convert the Point instance to an InfluxDB Point.
Returns#
(influxdb_client.Point): The corresponding InfluxDB Point object.
- write(write_api: influxdb_client.WriteApi, bucket: str, **kwargs: Any) None#
Write the Point instance to InfluxDB.
Parameters#
write_api (influxdb_client.WriteApi): The InfluxDB WriteApi instance.
bucket (str): The target bucket in InfluxDB.
**kwargs (Any): Additional keyword arguments to pass to the
WriteApi.write()method.
- async write_async(write_api: influxdb_client.client.write_api_async.WriteApiAsync, bucket: str, **kwargs: Any) None#
Write the Point instance to InfluxDB.
Parameters#
write_api (influxdb_client.WriteApiAsync): The InfluxDB WriteApiAsync instance.
bucket (str): The target bucket in InfluxDB.
**kwargs (Any): Additional keyword arguments to pass to the
WriteApiAsync.write()method.
- classmethod query(query_api: influxdb_client.QueryApi, bucket: str, output: Literal[list, dataframe] = 'list', filters: orangeqs.juice.database._point._FilterType | None = None, limit: int = 100, start: str = '-1h', stop: str = 'now()', sort: Literal[asc, desc, None] = 'desc') list[typing_extensions.Self] | pandas.DataFrame#
Query the database for all points of this type based on filters.
Parameters#
query_api (influxdb_client.QueryApi): The InfluxDB QueryApi instance.
bucket (str): The target bucket in InfluxDB.
output (“list” or “dataframe”, optional): The output format: “list” returns a list of instances of
Self, “dataframe” returns a pandas DataFrame (default is “list”).filters (dict[str, str | int | float | bool | list[str | int | float | bool]]): A dictionary of filters to apply. The keys correspond to field or tag names, and the values can be a single value or a list of values for “OR” filtering. All filters are combined with “AND”. Defaults to None, which means no filtering. For example, the filter
{"name": ["a", "b"], "color": "blue"}corresponds to(name == "a" or name == "b") and (color == "blue").limit (int, optional): The maximum number of records per series key to return. Defaults to 100. A series key is a unique combination of tag values. See series in the InfluxDB documentation for more information.
start (str, optional): The start time for the query in InfluxDB time format. Defaults to “-1h” (last hour).
stop (str, optional): The stop time for the query in InfluxDB time format. Defaults to “now()”.
sort (“asc”, “desc” or None, optional): The sort order for the results based on time. If None, no sorting is applied. Defaults to “desc”.
Returns#
(list[Self] | pd.DataFrame): A collection of
Selfinstances or a DataFrame ifoutput="dataframe".
- async classmethod query_async(query_api: influxdb_client.client.query_api_async.QueryApiAsync, bucket: str, output: Literal[list, dataframe] = 'list', filters: orangeqs.juice.database._point._FilterType | None = None, limit: int = 100, start: str = '-1h', stop: str = 'now()', sort: Literal[asc, desc, None] = 'desc') list[typing_extensions.Self] | pandas.DataFrame#
Query the database for all points of this type based on filters.
Parameters#
query_api (influxdb_client.QueryApiAsync): The InfluxDB QueryApiAsync instance.
bucket (str): The target bucket in InfluxDB.
output (“list” or “dataframe”, optional): The output format: “list” returns a list of instances of
Self, “dataframe” returns a pandas DataFrame (default is “list”).filters (dict[str, str | int | float | bool | list[str | int | float | bool]]): A dictionary of filters to apply. The keys correspond to field or tag names, and the values can be a single value or a list of values for “OR” filtering. All filters are combined with “AND”. Defaults to None, which means no filtering. For example, the filter
{"name": ["a", "b"], "color": "blue"}corresponds to(name == "a" or name == "b") and (color == "blue").limit (int, optional): The maximum number of records per series key to return. Defaults to 100. A series key is a unique combination of tag values. See series in the InfluxDB documentation for more information.
start (str, optional): The start time for the query in InfluxDB time format. Defaults to “-1h” (last hour).
stop (str, optional): The stop time for the query in InfluxDB time format. Defaults to “now()”.
sort (“asc”, “desc” or None, optional): The sort order for the results based on time. If None, no sorting is applied. Defaults to “desc”.
Returns#
(list[Self] | pd.DataFrame): A collection of
Selfinstances or a DataFrame ifoutput="dataframe".