Skip to content

refactor: rest client#217

Open
NguyenHoangSon96 wants to merge 2 commits into
mainfrom
refactor/rest-client
Open

refactor: rest client#217
NguyenHoangSon96 wants to merge 2 commits into
mainfrom
refactor/rest-client

Conversation

@NguyenHoangSon96

Copy link
Copy Markdown
Contributor

Closes #

Proposed Changes

Briefly describe your proposed changes:

Checklist

  • CHANGELOG.md updated
  • Rebased/mergeable
  • A test has been added if appropriate
  • Tests pass
  • Commit messages are conventional
  • Sign CLA (if not already signed)

@NguyenHoangSon96 NguyenHoangSon96 self-assigned this Jun 12, 2026
Copilot AI review requested due to automatic review settings June 12, 2026 15:16

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors the write path to use a new synchronous RestClient (urllib3-based) instead of the previous generated client/write service wiring, and updates InfluxDBClient3 construction accordingly.

Changes:

  • Refactors WriteApi to issue HTTP requests via a new RestClient, adds gzip decision/compression utilities, and adds endpoint/exception translation logic.
  • Introduces influxdb_client_3/write_client/_sync/rest_client.py to encapsulate urllib3 request/response handling.
  • Updates InfluxDBClient3 initialization to pass write connection details (base_url/auth/gzip) directly into WriteApi.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 15 comments.

File Description
influxdb_client_3/write_client/client/write_api.py Major refactor of write implementation to use a new REST client and custom request building/serialization paths.
influxdb_client_3/write_client/_sync/rest_client.py Adds a new urllib3-based synchronous REST client abstraction used by WriteApi.
influxdb_client_3/__init__.py Wires new write API constructor parameters (auth/gzip/base_url) and updates public kwargs documentation.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +337 to +347
# TODO should allow passing auth schema???

self.rest_client = rest_client

self.token = token
self.bucket = bucket
self.org = org
self.enable_gzip = enable_gzip
self.gzip_threshold = gzip_threshold
self.auth_scheme = auth_scheme
self.timeout = timeout
Comment on lines 693 to +709
def _post_write(self, _async_req, bucket, org, body, precision, no_sync, accept_partial, use_v2_api, **kwargs):
# Filter out serializer-specific kwargs before passing to _post_write
http_kwargs = {k: v for k, v in kwargs.items() if k not in SERIALIZER_KWARGS}
return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision,
no_sync=no_sync,
accept_partial=accept_partial,
use_v2_api=use_v2_api,
async_req=_async_req,
content_type="text/plain; charset=utf-8",
**http_kwargs)

def _to_response(self, data: _BatchItem, delay: timedelta):
# TODO refactor this
http_kwargs['precision']=precision
http_kwargs['no_sync']=no_sync
http_kwargs['accept_partial']=accept_partial
http_kwargs['use_v2_api']=use_v2_api

return rx.of(data).pipe(
ops.subscribe_on(self._write_options.write_scheduler),
# use delay if its specified
ops.delay(duetime=delay, scheduler=self._write_options.write_scheduler),
# invoke http call
ops.map(lambda x: self._http(x, **x.key.kwargs)),
# catch exception to fail batch response
ops.catch(handler=lambda exception, source: rx.just(_BatchResponse(exception=exception, data=data))),
)
kwargs['_return_http_data_only'] = True
if kwargs.get('async_req'):
thread = self.post_write_with_http_info(org, bucket, body, **http_kwargs) # noqa: E501
return thread
else:
(data) = self.post_write_with_http_info(org, bucket, body, **http_kwargs) # noqa: E501
return data
Comment thread influxdb_client_3/write_client/client/write_api.py Outdated
Comment on lines +834 to +837
if enable_gzip is not False:
if gzip_threshold is not None:
payload_size = len(payload.encode('utf-8'))
return payload_size >= gzip_threshold
Comment on lines 782 to 800
def __getstate__(self):
"""Return a dict of attributes that you want to pickle."""
state = self.__dict__.copy()
# Remove rx
del state['_subject']
del state['_disposable']
del state['_write_service']
return state

def __setstate__(self, state):
"""Set your object with the provided dict."""
self.__dict__.update(state)
# Init Rx
self.__init__(self._influxdb_client,
self._write_options,
self._point_settings,
success_callback=self._success_callback,
error_callback=self._error_callback,
retry_callback=self._retry_callback)
Comment on lines +193 to +197
def __setstate__(self, state):
"""Set your object with the provided dict."""
self.__dict__.update(state)
# Init Pool manager
self.__init__(self.pools_size, self.maxsize, self.retries)
Comment on lines +235 to +236
:key str enable_gzip: TODO ???
:key str gzip_threshold: TODO ???
Comment on lines +1167 to +1176
try:
return await self.call_api(
resource_path=path,
method='POST',
query_params=query_params,
header_params=header_params,
body=body,
async_req=local_var_params.get('async_req'),
_request_timeout=local_var_params.get('_request_timeout'),
urlopen_kw=kwargs.get('urlopen_kw', None))
Comment on lines +802 to +807
@property
def pool(self):
"""Create thread pool on first request avoids instantiating unused threadpool for blocking clients."""
if self._pool is None:
self._pool = ThreadPool(self.pool_threads)
return self._pool
Comment on lines +744 to +748
elif _HAS_DATACLASS and dataclasses.is_dataclass(record):
self._serialize(dataclasses.asdict(record), write_precision, payload, **kwargs)
elif isinstance(record, Iterable):
for item in record:
self._serialize(item, write_precision, payload, **kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants