Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ commands:
curl -Os https://uploader.codecov.io/latest/linux/codecov
curl -Os https://uploader.codecov.io/latest/linux/codecov.SHA256SUM
curl -Os https://uploader.codecov.io/latest/linux/codecov.SHA256SUM.sig
curl -s https://keybase.io/codecovsecurity/pgp_keys.asc | gpg --no-default-keyring --keyring trustedkeys.gpg --import
curl -fsSL https://uploader.codecov.io/verification.gpg | gpg --no-default-keyring --keyring trustedkeys.gpg --import
gpgv codecov.SHA256SUM.sig codecov.SHA256SUM
shasum -a 256 -c codecov.SHA256SUM
chmod +x ./codecov
Expand All @@ -81,10 +81,10 @@ commands:
- run:
name: Collecting coverage reports
command: |
curl -k https://keybase.io/codecovsecurity/pgp_keys.asc | gpg --no-default-keyring --keyring trustedkeys.gpg --import
curl -Os https://uploader.codecov.io/v0.8.0/aarch64/codecov
curl -Os https://uploader.codecov.io/v0.8.0/aarch64/codecov.SHA256SUM
curl -Os https://uploader.codecov.io/v0.8.0/aarch64/codecov.SHA256SUM.sig
curl -k https://uploader.codecov.io/verification.gpg | gpg --no-default-keyring --keyring trustedkeys.gpg --import
gpgv codecov.SHA256SUM.sig codecov.SHA256SUM
shasum -a 256 -c codecov.SHA256SUM
sudo chmod +x codecov
Expand Down
73 changes: 48 additions & 25 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import importlib.util
import json
import os
import urllib.parse
from typing import Any, List, Literal, Optional, TYPE_CHECKING

import pyarrow as pa

from influxdb_client_3.version import USER_AGENT
from influxdb_client_3.write_client._sync import rest_client as rest

if TYPE_CHECKING:
import pandas as pd
import polars as pl
Expand All @@ -14,7 +18,7 @@
from influxdb_client_3.exceptions import InfluxDBError
from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder
from influxdb_client_3.read_file import UploadFile
from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
from influxdb_client_3.write_client import WriteOptions, Point
from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \
PointSettings, DefaultWriteOptions, WriteType
from influxdb_client_3.write_client.domain.write_precision import WritePrecision
Expand Down Expand Up @@ -189,6 +193,9 @@ def __init__(
org=None,
database=None,
token=None,
auth_scheme=None,
enable_gzip=False,
gzip_threshold=None,
write_client_options=None,
flight_client_options=None,
write_port_overwrite=None,
Expand Down Expand Up @@ -229,6 +236,8 @@ def __init__(
:key urllib3.util.retry.Retry retries: Set the default retry strategy that is used for all HTTP requests
except batching writes. As a default there is no one retry strategy.
:key str query_timeout: int value used to set the client query API timeout in milliseconds.
:key str enable_gzip: TODO ???
:key str gzip_threshold: TODO ???
Comment thread
NguyenHoangSon96 marked this conversation as resolved.
:key str write_timeout: int value used to set the client write API timeout in milliseconds.
:key bool write_accept_partial: allow partial writes when some lines fail.
:key bool write_use_v2_api: route writes through /api/v2/write compatibility endpoint.
Expand Down Expand Up @@ -293,14 +302,36 @@ def __init__(
if write_port_overwrite is not None:
port = write_port_overwrite

self._client = _InfluxDBClient(
url=f"{scheme}://{hostname}:{port}",
# TODO fix retries
retries = None

auth_schema = 'Token' if auth_scheme is None else auth_scheme
default_header = {
'Authorization': f'{auth_schema} {self._token}',
'User-Agent': USER_AGENT
}
self.base_url = f"{scheme}://{hostname}:{port}"
self.default_header = default_header
self.rest_client = rest.RestClient(
base_url=self.base_url,
default_header=default_header,
retries=retries)

# TODO point_settings??
# TODO enable_gzip and gzip_threshold be in WriteOptions

self._write_api = _WriteApi(
token=self._token,
bucket=self._database,
org=self._org,
gzip_threshold=gzip_threshold,
enable_gzip=enable_gzip,
auth_scheme=auth_scheme,
timeout=write_timeout,
**kwargs)

self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options)
default_header=default_header,
rest_client=self.rest_client,
**self._write_client_options
)

if query_port_overwrite is not None:
port = query_port_overwrite
Expand Down Expand Up @@ -658,32 +689,25 @@ async def query_async(self, query: str, language: str = "sql", mode: str = "all"
except ArrowException as e:
raise InfluxDB3ClientQueryError(f"Error while executing query: {e}")

def get_server_version(self) -> str:
def get_server_version(self) -> Any | None:
"""
Get the version of the connected InfluxDB server.
Get the influxdb_version of the connected InfluxDB server.

This method makes a ping request to the server and extracts the version information
This method makes a ping request to the server and extracts the influxdb_version information
from either the response headers or response body.

:return: The version string of the InfluxDB server.
:return: The influxdb_version string of the InfluxDB server.
:rtype: str
"""
version = None
(resp_body, _, header) = self._client.api_client.call_api(
resource_path="/ping",
method="GET",
response_type=object
)

for key, value in header.items():
resp = self.rest_client.request(url='/ping', method="GET", headers=self.default_header)
for key, value in resp.getheaders().items():
if key.lower() == "x-influxdb-version":
version = value
break

if version is None and isinstance(resp_body, dict):
version = resp_body['version']
return value

return version
string_body = resp.get_string_body()
if string_body is not None:
return json.loads(string_body)['version']
return None

def flush(self):
"""
Expand All @@ -702,7 +726,6 @@ def close(self):
"""Close the client and clean up resources."""
self._write_api.close()
self._query_api.close()
self._client.close()

def __enter__(self):
return self
Expand Down
10 changes: 2 additions & 8 deletions influxdb_client_3/write_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,9 @@

from __future__ import absolute_import

from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions
from influxdb_client_3.write_client.client.influxdb_client import InfluxDBClient
from influxdb_client_3.write_client.client.logging_handler import InfluxLoggingHandler
from influxdb_client_3.version import VERSION
from influxdb_client_3.write_client.client.write.point import Point

from influxdb_client_3.write_client.service.write_service import WriteService

from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions
from influxdb_client_3.write_client.domain.write_precision import WritePrecision

from influxdb_client_3.write_client.configuration import Configuration
from influxdb_client_3.version import VERSION
__version__ = VERSION
Loading
Loading