diff --git a/README.md b/README.md index de53ff0..e42f34b 100644 --- a/README.md +++ b/README.md @@ -516,7 +516,13 @@ Use a local Tinybird container for development without affecting cloud workspace ### Connections ```python -from tinybird_sdk import define_gcs_connection, define_kafka_connection, define_s3_connection, secret +from tinybird_sdk import ( + define_dynamodb_connection, + define_gcs_connection, + define_kafka_connection, + define_s3_connection, + secret, +) events_kafka = define_kafka_connection( "events_kafka", @@ -543,6 +549,42 @@ landing_gcs = define_gcs_connection( "service_account_credentials_json": secret("GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON"), }, ) + +events_dynamodb = define_dynamodb_connection( + "events_dynamodb", + { + "region": "us-east-1", + "arn": secret("DYNAMODB_ROLE_ARN"), + }, +) +``` + +The DynamoDB connector uses an IAM role to authenticate. Reference the connection from +a datasource to import a DynamoDB table: + +```python +from tinybird_sdk import column, define_datasource, engine, t + +orders = define_datasource( + "orders", + { + "schema": { + "id": column(t.string(), {"json_path": "$.Item.id"}), + "_record": column(t.string(), {"json_path": "$.NewImage"}), + "_timestamp": column(t.date_time64(3), {"json_path": "$.ApproximateCreationDateTime"}), + "_event_name": column(t.string().low_cardinality(), {"json_path": "$.eventName"}), + "_is_deleted": column(t.uint8(), {"json_path": "$._is_deleted"}), + }, + "engine": engine.replacing_merge_tree( + {"sorting_key": ["id"], "ver": "_timestamp", "is_deleted": "_is_deleted"} + ), + "dynamodb": { + "connection": events_dynamodb, + "table_arn": "arn:aws:dynamodb:us-east-1:123456789012:table/orders", + "export_bucket": "s3://my-tinybird-dynamodb-exports", + }, + }, +) ``` ### Datasources diff --git a/src/tinybird_sdk/__init__.py b/src/tinybird_sdk/__init__.py index c4ac2a2..11a83c6 100644 --- a/src/tinybird_sdk/__init__.py +++ b/src/tinybird_sdk/__init__.py @@ -26,11 +26,16 @@ 
def _generate_dynamodb_connection(connection: DynamoDBConnectionDefinition) -> str:
    """Render the datafile body for a DynamoDB connection.

    The result is three newline-separated directives, in this order: the
    connection type, the ARN and the region read from ``connection.options``.
    No trailing newline is appended.
    """
    settings = connection.options
    directives = (
        "TYPE dynamodb",
        f"DYNAMODB_ARN {settings.arn}",
        f"DYNAMODB_REGION {settings.region}",
    )
    return "\n".join(directives)
def _generate_dynamodb_config(dynamodb: Any) -> str:
    """Render the import directives for a datasource's DynamoDB ingestion.

    Reads ``connection._name``, ``table_arn`` and ``export_bucket`` from
    *dynamodb* and returns them as three newline-joined ``IMPORT_*`` lines
    (connection name, table ARN, export bucket) with no trailing newline.
    """
    connection_line = f"IMPORT_CONNECTION_NAME {dynamodb.connection._name}"
    table_line = f"IMPORT_TABLE_ARN {dynamodb.table_arn}"
    bucket_line = f"IMPORT_EXPORT_BUCKET {dynamodb.export_bucket}"
    return "\n".join((connection_line, table_line, bucket_line))
_emit_datasource(ds: DatasourceModel) -> str: lines.append(f" 'from_timestamp': {_escape_string(ds.gcs.from_timestamp)},") lines.append(" },") + if ds.dynamodb: + connection_var = to_snake_case(ds.dynamodb.connection_name) + lines.append(" 'dynamodb': {") + lines.append(f" 'connection': {connection_var},") + lines.append(f" 'table_arn': {_escape_string(ds.dynamodb.table_arn)},") + lines.append(f" 'export_bucket': {_escape_string(ds.dynamodb.export_bucket)},") + lines.append(" },") + if ds.forward_query: lines.append(" 'forward_query': '''") lines.append(ds.forward_query) @@ -336,13 +345,31 @@ def _emit_gcs_connection(connection: GCSConnectionModel) -> str: return "\n".join(lines) +def _emit_dynamodb_connection(connection: DynamoDBConnectionModel) -> str: + variable_name = to_snake_case(connection.name) + lines: list[str] = [] + lines.append( + f"{variable_name} = define_dynamodb_connection({_escape_string(connection.name)}, {{" + ) + lines.append(f" 'region': {_escape_string(connection.region)},") + lines.append(f" 'arn': {_escape_string(connection.arn)},") + lines.append("})") + lines.append("") + return "\n".join(lines) + + def _emit_connection( - connection: KafkaConnectionModel | S3ConnectionModel | GCSConnectionModel, + connection: KafkaConnectionModel + | S3ConnectionModel + | GCSConnectionModel + | DynamoDBConnectionModel, ) -> str: if isinstance(connection, S3ConnectionModel): return _emit_s3_connection(connection) if isinstance(connection, GCSConnectionModel): return _emit_gcs_connection(connection) + if isinstance(connection, DynamoDBConnectionModel): + return _emit_dynamodb_connection(connection) return _emit_kafka_connection(connection) @@ -474,6 +501,8 @@ def emit_migration_file_content(resources: list[ParsedResource]) -> str: imports.add("define_s3_connection") elif isinstance(conn, GCSConnectionModel): imports.add("define_gcs_connection") + elif isinstance(conn, DynamoDBConnectionModel): + imports.add("define_dynamodb_connection") if needs_column: 
imports.add("column") if needs_params: diff --git a/src/tinybird_sdk/migrate/parse_connection.py b/src/tinybird_sdk/migrate/parse_connection.py index 0a932b1..d66396b 100644 --- a/src/tinybird_sdk/migrate/parse_connection.py +++ b/src/tinybird_sdk/migrate/parse_connection.py @@ -10,6 +10,7 @@ ) from .types import ( ConnectionModel, + DynamoDBConnectionModel, GCSConnectionModel, KafkaConnectionModel, ResourceFile, @@ -30,6 +31,8 @@ "S3_ACCESS_KEY", "S3_SECRET", "GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON", + "DYNAMODB_ARN", + "DYNAMODB_REGION", } @@ -58,6 +61,9 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel: access_secret: str | None = None service_account_credentials_json: str | None = None + dynamodb_arn: str | None = None + dynamodb_region: str | None = None + i = 0 while i < len(lines): raw_line = lines[i] @@ -119,6 +125,10 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel: access_secret = parse_quoted_value(value) elif name == "GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON": service_account_credentials_json = parse_quoted_value(value) + elif name == "DYNAMODB_ARN": + dynamodb_arn = parse_quoted_value(value) + elif name == "DYNAMODB_REGION": + dynamodb_region = parse_quoted_value(value) else: raise MigrationParseError( resource.file_path, @@ -135,12 +145,20 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel: ) if connection_type == "kafka": - if region or arn or access_key or access_secret or service_account_credentials_json: + if ( + region + or arn + or access_key + or access_secret + or service_account_credentials_json + or dynamodb_arn + or dynamodb_region + ): raise MigrationParseError( resource.file_path, "connection", resource.name, - "S3/GCS directives are not valid for kafka connections.", + "S3/GCS/DynamoDB directives are not valid for kafka connections.", ) if not bootstrap_servers: @@ -175,12 +193,14 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel: or schema_registry_url or 
ssl_ca_pem or service_account_credentials_json + or dynamodb_arn + or dynamodb_region ): raise MigrationParseError( resource.file_path, "connection", resource.name, - "Kafka/GCS directives are not valid for s3 connections.", + "Kafka/GCS/DynamoDB directives are not valid for s3 connections.", ) if not region: @@ -231,12 +251,14 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel: or arn or access_key or access_secret + or dynamodb_arn + or dynamodb_region ): raise MigrationParseError( resource.file_path, "connection", resource.name, - "Kafka/S3 directives are not valid for gcs connections.", + "Kafka/S3/DynamoDB directives are not valid for gcs connections.", ) if not service_account_credentials_json: @@ -255,6 +277,53 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel: service_account_credentials_json=service_account_credentials_json, ) + if connection_type == "dynamodb": + if ( + bootstrap_servers + or security_protocol + or sasl_mechanism + or key + or secret + or schema_registry_url + or ssl_ca_pem + or region + or arn + or access_key + or access_secret + or service_account_credentials_json + ): + raise MigrationParseError( + resource.file_path, + "connection", + resource.name, + "Kafka/S3/GCS directives are not valid for dynamodb connections.", + ) + + if not dynamodb_arn: + raise MigrationParseError( + resource.file_path, + "connection", + resource.name, + "DYNAMODB_ARN is required for dynamodb connections.", + ) + + if not dynamodb_region: + raise MigrationParseError( + resource.file_path, + "connection", + resource.name, + "DYNAMODB_REGION is required for dynamodb connections.", + ) + + return DynamoDBConnectionModel( + kind="connection", + name=resource.name, + file_path=resource.file_path, + connection_type="dynamodb", + region=dynamodb_region, + arn=dynamodb_arn, + ) + raise MigrationParseError( resource.file_path, "connection", diff --git a/src/tinybird_sdk/migrate/parse_datasource.py 
b/src/tinybird_sdk/migrate/parse_datasource.py index 41ee768..53164e3 100644 --- a/src/tinybird_sdk/migrate/parse_datasource.py +++ b/src/tinybird_sdk/migrate/parse_datasource.py @@ -14,6 +14,7 @@ ) from .types import ( DatasourceColumnModel, + DatasourceDynamoDBModel, DatasourceEngineModel, DatasourceGCSModel, DatasourceIndexModel, @@ -281,6 +282,8 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: import_bucket_uri: str | None = None import_schedule: str | None = None import_from_timestamp: str | None = None + import_table_arn: str | None = None + import_export_bucket: str | None = None i = 0 while i < len(lines): @@ -413,6 +416,10 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: import_schedule = parse_quoted_value(value) elif key == "IMPORT_FROM_TIMESTAMP": import_from_timestamp = parse_quoted_value(value) + elif key == "IMPORT_TABLE_ARN": + import_table_arn = parse_quoted_value(value) + elif key == "IMPORT_EXPORT_BUCKET": + import_export_bucket = parse_quoted_value(value) elif key == "BACKFILL": normalized = value.strip().lower() if normalized != "skip": @@ -486,8 +493,32 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: store_raw_value=kafka_store_raw_value, ) + dynamodb: DatasourceDynamoDBModel | None = None + if import_table_arn or import_export_bucket: + if not import_connection_name or not import_table_arn or not import_export_bucket: + raise MigrationParseError( + resource.file_path, + "datasource", + resource.name, + "IMPORT_CONNECTION_NAME, IMPORT_TABLE_ARN and IMPORT_EXPORT_BUCKET are required for DynamoDB imports.", + ) + if import_bucket_uri or import_schedule or import_from_timestamp: + raise MigrationParseError( + resource.file_path, + "datasource", + resource.name, + "DynamoDB imports cannot be combined with S3/GCS import directives.", + ) + dynamodb = DatasourceDynamoDBModel( + connection_name=import_connection_name, + table_arn=import_table_arn, + 
export_bucket=import_export_bucket, + ) + imported: DatasourceS3Model | None = None - if import_connection_name or import_bucket_uri or import_schedule or import_from_timestamp: + if not dynamodb and ( + import_connection_name or import_bucket_uri or import_schedule or import_from_timestamp + ): if not import_connection_name or not import_bucket_uri: raise MigrationParseError( resource.file_path, @@ -502,7 +533,7 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: from_timestamp=import_from_timestamp, ) - if kafka and imported: + if kafka and (imported or dynamodb): raise MigrationParseError( resource.file_path, "datasource", @@ -536,6 +567,7 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: kafka=kafka, s3=imported, gcs=None, + dynamodb=dynamodb, forward_query=forward_query, tokens=tokens, shared_with=shared_with, diff --git a/src/tinybird_sdk/migrate/runner.py b/src/tinybird_sdk/migrate/runner.py index 5783997..c24c4bc 100644 --- a/src/tinybird_sdk/migrate/runner.py +++ b/src/tinybird_sdk/migrate/runner.py @@ -120,6 +120,8 @@ def run_migrate(options: MigrateOptions | dict[str, Any]) -> MigrationResult: if datasource.s3 else datasource.gcs.connection_name if datasource.gcs + else datasource.dynamodb.connection_name + if datasource.dynamodb else None ) @@ -191,6 +193,24 @@ def run_migrate(options: MigrateOptions | dict[str, Any]) -> MigrationResult: gcs=None, ) + if datasource.dynamodb: + dynamodb_connection_type = parsed_connection_type_by_name.get( + datasource.dynamodb.connection_name + ) + if dynamodb_connection_type != "dynamodb": + errors.append( + MigrationError( + file_path=datasource.file_path, + resource_name=datasource.name, + resource_kind=datasource.kind, + message=( + "Datasource dynamodb ingestion requires a dynamodb connection, found " + f'"{dynamodb_connection_type or "(none)"}".' 
+ ), + ) + ) + continue + try: validate_resource_for_emission(normalized_datasource) migrated.append(normalized_datasource) diff --git a/src/tinybird_sdk/migrate/types.py b/src/tinybird_sdk/migrate/types.py index 3ebbfd1..7363bdb 100644 --- a/src/tinybird_sdk/migrate/types.py +++ b/src/tinybird_sdk/migrate/types.py @@ -73,6 +73,13 @@ class DatasourceGCSModel: from_timestamp: str | None = None +@dataclass(frozen=True, slots=True) +class DatasourceDynamoDBModel: + connection_name: str + table_arn: str + export_bucket: str + + @dataclass(frozen=True, slots=True) class DatasourceTokenModel: name: str @@ -100,6 +107,7 @@ class DatasourceModel: kafka: DatasourceKafkaModel | None = None s3: DatasourceS3Model | None = None gcs: DatasourceGCSModel | None = None + dynamodb: DatasourceDynamoDBModel | None = None forward_query: str | None = None tokens: list[DatasourceTokenModel] = field(default_factory=list) shared_with: list[str] = field(default_factory=list) @@ -209,10 +217,27 @@ class GCSConnectionModel: service_account_credentials_json: str -ConnectionModel = KafkaConnectionModel | S3ConnectionModel | GCSConnectionModel +@dataclass(frozen=True, slots=True) +class DynamoDBConnectionModel: + kind: Literal["connection"] + name: str + file_path: str + connection_type: Literal["dynamodb"] + region: str + arn: str + + +ConnectionModel = ( + KafkaConnectionModel | S3ConnectionModel | GCSConnectionModel | DynamoDBConnectionModel +) ParsedResource = ( - DatasourceModel | PipeModel | KafkaConnectionModel | S3ConnectionModel | GCSConnectionModel + DatasourceModel + | PipeModel + | KafkaConnectionModel + | S3ConnectionModel + | GCSConnectionModel + | DynamoDBConnectionModel ) diff --git a/src/tinybird_sdk/schema/__init__.py b/src/tinybird_sdk/schema/__init__.py index c3bf614..fa2ad9f 100644 --- a/src/tinybird_sdk/schema/__init__.py +++ b/src/tinybird_sdk/schema/__init__.py @@ -36,10 +36,12 @@ define_kafka_connection, define_s3_connection, define_gcs_connection, + 
def define_dynamodb_connection(
    name: str, options: dict[str, Any] | DynamoDBConnectionOptions
) -> DynamoDBConnectionDefinition:
    """Create a validated DynamoDB connection definition.

    Args:
        name: Connection name; checked by ``_validate_connection_name``
            before anything else happens.
        options: Either a ready ``DynamoDBConnectionOptions`` instance or a
            mapping whose keys are passed as keyword arguments to
            ``DynamoDBConnectionOptions`` (``region`` and ``arn``).

    Returns:
        A ``DynamoDBConnectionDefinition`` wrapping the normalized options.

    Raises:
        ValueError: If ``region`` or ``arn`` is not a string or is blank
            (``region`` is checked first).
        TypeError: If a mapping lacks a required key or carries an unknown
            one; this comes from the ``DynamoDBConnectionOptions``
            constructor.
    """
    _validate_connection_name(name)
    normalized = (
        options
        if isinstance(options, DynamoDBConnectionOptions)
        else DynamoDBConnectionOptions(**options)
    )

    # A non-string value such as ``None`` used to crash on ``.strip()`` with an
    # AttributeError; report it through the same ValueError as a blank value.
    for field_name in ("region", "arn"):
        value = getattr(normalized, field_name)
        if not isinstance(value, str) or not value.strip():
            raise ValueError(f"DynamoDB connection `{field_name}` is required.")

    return DynamoDBConnectionDefinition(_name=name, options=normalized)
define_datasource( s3 = S3Config(**s3_cfg) if isinstance(s3_cfg, dict) else s3_cfg gcs_cfg = options.get("gcs") gcs = GCSConfig(**gcs_cfg) if isinstance(gcs_cfg, dict) else gcs_cfg + dynamodb_cfg = options.get("dynamodb") + dynamodb = ( + DynamoDBConfig(**dynamodb_cfg) if isinstance(dynamodb_cfg, dict) else dynamodb_cfg + ) normalized = DatasourceOptions( description=options.get("description"), schema=options["schema"], @@ -133,14 +150,17 @@ def define_datasource( kafka=kafka, s3=s3, gcs=gcs, + dynamodb=dynamodb, ) ingestion_count = sum( - 1 for x in [normalized.kafka, normalized.s3, normalized.gcs] if x is not None + 1 + for x in [normalized.kafka, normalized.s3, normalized.gcs, normalized.dynamodb] + if x is not None ) if ingestion_count > 1: raise ValueError( - "Datasource can only define one ingestion option: `kafka`, `s3`, or `gcs`." + "Datasource can only define one ingestion option: `kafka`, `s3`, `gcs`, or `dynamodb`." ) if normalized.backfill not in {None, "skip"}: diff --git a/src/tinybird_sdk/schema/engines.py b/src/tinybird_sdk/schema/engines.py index 2fe0d35..ef8e3b6 100644 --- a/src/tinybird_sdk/schema/engines.py +++ b/src/tinybird_sdk/schema/engines.py @@ -79,6 +79,9 @@ def get_engine_clause(config: EngineConfig) -> str: if config.type == "ReplacingMergeTree" and config.ver: parts.append(f'ENGINE_VER "{config.ver}"') + if config.type == "ReplacingMergeTree" and config.is_deleted: + parts.append(f'ENGINE_IS_DELETED "{config.is_deleted}"') + if config.type in {"CollapsingMergeTree", "VersionedCollapsingMergeTree"} and config.sign: parts.append(f'ENGINE_SIGN "{config.sign}"') diff --git a/tests/fixtures/parity_contract/root_exports.json b/tests/fixtures/parity_contract/root_exports.json index 2781d89..a6f9903 100644 --- a/tests/fixtures/parity_contract/root_exports.json +++ b/tests/fixtures/parity_contract/root_exports.json @@ -13,6 +13,7 @@ "create_tinybird_api_wrapper", "define_copy_pipe", "define_datasource", + "define_dynamodb_connection", 
"define_endpoint", "define_gcs_connection", "define_kafka_connection", @@ -52,6 +53,7 @@ "is_connection_definition", "is_copy_pipe", "is_datasource_definition", + "is_dynamodb_connection_definition", "is_gcs_connection_definition", "is_kafka_connection_definition", "is_materialized_view", diff --git a/tests/test_dynamodb_connector.py b/tests/test_dynamodb_connector.py new file mode 100644 index 0000000..aeec154 --- /dev/null +++ b/tests/test_dynamodb_connector.py @@ -0,0 +1,211 @@ +from __future__ import annotations + +import pytest + +import tinybird_sdk as sdk +from tinybird_sdk import ( + define_datasource, + define_dynamodb_connection, + define_s3_connection, + is_connection_definition, + is_dynamodb_connection_definition, + t, +) +from tinybird_sdk.generator.connection import generate_connection +from tinybird_sdk.generator.datasource import generate_datasource +from tinybird_sdk.migrate.emit_ts import emit_migration_file_content +from tinybird_sdk.migrate.parse_connection import parse_connection_file +from tinybird_sdk.migrate.parse_datasource import parse_datasource_file +from tinybird_sdk.migrate.types import ResourceFile + + +def _connection() -> object: + return define_dynamodb_connection( + "events_dynamodb", + { + "region": "us-east-1", + "arn": "arn:aws:iam::123456789012:role/tinybird-dynamodb-access", + }, + ) + + +def test_root_exports_include_dynamodb_symbols() -> None: + assert hasattr(sdk, "define_dynamodb_connection") + assert hasattr(sdk, "is_dynamodb_connection_definition") + + +def test_define_dynamodb_connection_metadata() -> None: + connection = _connection() + assert connection._connectionType == "dynamodb" + assert connection._type == "connection" + assert is_connection_definition(connection) + assert is_dynamodb_connection_definition(connection) + assert sdk.get_connection_type(connection) == "dynamodb" + + +def test_define_dynamodb_connection_requires_region_and_arn() -> None: + with pytest.raises(ValueError, match="`region` is 
def test_generate_datasource_with_dynamodb_import() -> None:
    """A datasource with a ``dynamodb`` option renders engine and import directives."""
    datasource = define_datasource(
        "orders",
        {
            "schema": {
                "id": t.string(),
                "_record": t.string(),
                # The engine options below name these two columns, so the
                # schema has to declare them for the fixture to be coherent.
                "_timestamp": t.date_time64(3),
                "_is_deleted": t.uint8(),
            },
            "engine": sdk.engine.replacing_merge_tree(
                {"sorting_key": ["id"], "ver": "_timestamp", "is_deleted": "_is_deleted"}
            ),
            "dynamodb": {
                "connection": _connection(),
                "table_arn": "arn:aws:dynamodb:us-east-1:123456789012:table/orders",
                "export_bucket": "s3://my-export-bucket",
            },
        },
    )

    content = generate_datasource(datasource).content
    assert 'ENGINE "ReplacingMergeTree"' in content
    assert 'ENGINE_VER "_timestamp"' in content
    assert 'ENGINE_IS_DELETED "_is_deleted"' in content
    assert "IMPORT_CONNECTION_NAME events_dynamodb" in content
    assert "IMPORT_TABLE_ARN arn:aws:dynamodb:us-east-1:123456789012:table/orders" in content
    assert "IMPORT_EXPORT_BUCKET s3://my-export-bucket" in content
def test_parse_dynamodb_connection_requires_arn_and_region() -> None:
    """A dynamodb connection file must carry both DYNAMODB_ARN and DYNAMODB_REGION."""

    def _resource(content: str) -> ResourceFile:
        # Only the datafile content varies between the two cases.
        return ResourceFile(
            kind="connection",
            file_path="c.connection",
            absolute_path="/x/c.connection",
            name="c",
            content=content,
        )

    # Region present, ARN missing.
    with pytest.raises(Exception, match="DYNAMODB_ARN is required"):
        parse_connection_file(_resource("TYPE dynamodb\nDYNAMODB_REGION us-east-1\n"))

    # ARN present, region missing. The original test never exercised this
    # half of what its name promises.
    with pytest.raises(Exception, match="DYNAMODB_REGION is required"):
        parse_connection_file(
            _resource('TYPE dynamodb\nDYNAMODB_ARN "arn:aws:iam::123456789012:role/r"\n')
        )
"s3://my-export-bucket" + + +def test_emit_ts_round_trip_for_dynamodb() -> None: + connection_resource = ResourceFile( + kind="connection", + file_path="connections/events_dynamodb.connection", + absolute_path="/x/connections/events_dynamodb.connection", + name="events_dynamodb", + content=( + "TYPE dynamodb\n" + 'DYNAMODB_ARN "arn:aws:iam::123456789012:role/r"\n' + "DYNAMODB_REGION us-east-1\n" + ), + ) + datasource_resource = ResourceFile( + kind="datasource", + file_path="datasources/orders.datasource", + absolute_path="/x/datasources/orders.datasource", + name="orders", + content=( + "SCHEMA >\n" + " `id` String `json:$.Item.id`\n" + "\n" + 'ENGINE "ReplacingMergeTree"\n' + "ENGINE_SORTING_KEY id\n" + "\n" + "IMPORT_CONNECTION_NAME 'events_dynamodb'\n" + "IMPORT_TABLE_ARN 'arn:aws:dynamodb:us-east-1:1:table/orders'\n" + "IMPORT_EXPORT_BUCKET 's3://export'\n" + ), + ) + + connection = parse_connection_file(connection_resource) + datasource = parse_datasource_file(datasource_resource) + + output = emit_migration_file_content([connection, datasource]) + assert "define_dynamodb_connection" in output + assert 'events_dynamodb = define_dynamodb_connection("events_dynamodb"' in output + assert "'dynamodb': {" in output + assert "'table_arn': \"arn:aws:dynamodb:us-east-1:1:table/orders\"" in output + assert "'export_bucket': \"s3://export\"" in output