refactor: rest client#217
Open
NguyenHoangSon96 wants to merge 2 commits into
Open
Conversation
There was a problem hiding this comment.
Pull request overview
This PR refactors the write path to use a new synchronous RestClient (urllib3-based) instead of the previous generated client/write service wiring, and updates InfluxDBClient3 construction accordingly.
Changes:
- Refactors
WriteApito issue HTTP requests via a newRestClient, adds gzip decision/compression utilities, and adds endpoint/exception translation logic. - Introduces
influxdb_client_3/write_client/_sync/rest_client.pyto encapsulate urllib3 request/response handling. - Updates
InfluxDBClient3initialization to pass write connection details (base_url/auth/gzip) directly intoWriteApi.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 15 comments.
| File | Description |
|---|---|
influxdb_client_3/write_client/client/write_api.py |
Major refactor of write implementation to use a new REST client and custom request building/serialization paths. |
influxdb_client_3/write_client/_sync/rest_client.py |
Adds a new urllib3-based synchronous REST client abstraction used by WriteApi. |
influxdb_client_3/__init__.py |
Wires new write API constructor parameters (auth/gzip/base_url) and updates public kwargs documentation. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+337
to
+347
| # TODO should allow passing auth schema??? | ||
|
|
||
| self.rest_client = rest_client | ||
|
|
||
| self.token = token | ||
| self.bucket = bucket | ||
| self.org = org | ||
| self.enable_gzip = enable_gzip | ||
| self.gzip_threshold = gzip_threshold | ||
| self.auth_scheme = auth_scheme | ||
| self.timeout = timeout |
Comment on lines
693
to
+709
| def _post_write(self, _async_req, bucket, org, body, precision, no_sync, accept_partial, use_v2_api, **kwargs): | ||
| # Filter out serializer-specific kwargs before passing to _post_write | ||
| http_kwargs = {k: v for k, v in kwargs.items() if k not in SERIALIZER_KWARGS} | ||
| return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision, | ||
| no_sync=no_sync, | ||
| accept_partial=accept_partial, | ||
| use_v2_api=use_v2_api, | ||
| async_req=_async_req, | ||
| content_type="text/plain; charset=utf-8", | ||
| **http_kwargs) | ||
|
|
||
| def _to_response(self, data: _BatchItem, delay: timedelta): | ||
| # TODO refactor this | ||
| http_kwargs['precision']=precision | ||
| http_kwargs['no_sync']=no_sync | ||
| http_kwargs['accept_partial']=accept_partial | ||
| http_kwargs['use_v2_api']=use_v2_api | ||
|
|
||
| return rx.of(data).pipe( | ||
| ops.subscribe_on(self._write_options.write_scheduler), | ||
| # use delay if its specified | ||
| ops.delay(duetime=delay, scheduler=self._write_options.write_scheduler), | ||
| # invoke http call | ||
| ops.map(lambda x: self._http(x, **x.key.kwargs)), | ||
| # catch exception to fail batch response | ||
| ops.catch(handler=lambda exception, source: rx.just(_BatchResponse(exception=exception, data=data))), | ||
| ) | ||
| kwargs['_return_http_data_only'] = True | ||
| if kwargs.get('async_req'): | ||
| thread = self.post_write_with_http_info(org, bucket, body, **http_kwargs) # noqa: E501 | ||
| return thread | ||
| else: | ||
| (data) = self.post_write_with_http_info(org, bucket, body, **http_kwargs) # noqa: E501 | ||
| return data |
Comment on lines
+834
to
+837
| if enable_gzip is not False: | ||
| if gzip_threshold is not None: | ||
| payload_size = len(payload.encode('utf-8')) | ||
| return payload_size >= gzip_threshold |
Comment on lines
782
to
800
| def __getstate__(self): | ||
| """Return a dict of attributes that you want to pickle.""" | ||
| state = self.__dict__.copy() | ||
| # Remove rx | ||
| del state['_subject'] | ||
| del state['_disposable'] | ||
| del state['_write_service'] | ||
| return state | ||
|
|
||
| def __setstate__(self, state): | ||
| """Set your object with the provided dict.""" | ||
| self.__dict__.update(state) | ||
| # Init Rx | ||
| self.__init__(self._influxdb_client, | ||
| self._write_options, | ||
| self._point_settings, | ||
| success_callback=self._success_callback, | ||
| error_callback=self._error_callback, | ||
| retry_callback=self._retry_callback) |
Comment on lines
+193
to
+197
| def __setstate__(self, state): | ||
| """Set your object with the provided dict.""" | ||
| self.__dict__.update(state) | ||
| # Init Pool manager | ||
| self.__init__(self.pools_size, self.maxsize, self.retries) |
Comment on lines
+235
to
+236
| :key str enable_gzip: TODO ??? | ||
| :key str gzip_threshold: TODO ??? |
Comment on lines
+1167
to
+1176
| try: | ||
| return await self.call_api( | ||
| resource_path=path, | ||
| method='POST', | ||
| query_params=query_params, | ||
| header_params=header_params, | ||
| body=body, | ||
| async_req=local_var_params.get('async_req'), | ||
| _request_timeout=local_var_params.get('_request_timeout'), | ||
| urlopen_kw=kwargs.get('urlopen_kw', None)) |
Comment on lines
+802
to
+807
| @property | ||
| def pool(self): | ||
| """Create thread pool on first request avoids instantiating unused threadpool for blocking clients.""" | ||
| if self._pool is None: | ||
| self._pool = ThreadPool(self.pool_threads) | ||
| return self._pool |
Comment on lines
+744
to
+748
| elif _HAS_DATACLASS and dataclasses.is_dataclass(record): | ||
| self._serialize(dataclasses.asdict(record), write_precision, payload, **kwargs) | ||
| elif isinstance(record, Iterable): | ||
| for item in record: | ||
| self._serialize(item, write_precision, payload, **kwargs) |
2b26e5c to
7d74438
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #
Proposed Changes
Briefly describe your proposed changes:
Checklist