Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,13 @@ Use a local Tinybird container for development without affecting cloud workspace
### Connections

```python
from tinybird_sdk import define_gcs_connection, define_kafka_connection, define_s3_connection, secret
from tinybird_sdk import (
define_dynamodb_connection,
define_gcs_connection,
define_kafka_connection,
define_s3_connection,
secret,
)

events_kafka = define_kafka_connection(
"events_kafka",
Expand All @@ -543,6 +549,42 @@ landing_gcs = define_gcs_connection(
"service_account_credentials_json": secret("GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON"),
},
)

events_dynamodb = define_dynamodb_connection(
"events_dynamodb",
{
"region": "us-east-1",
"arn": secret("DYNAMODB_ROLE_ARN"),
},
)
```

The DynamoDB connector uses an IAM role to authenticate. Reference the connection from
a datasource to import a DynamoDB table:

```python
from tinybird_sdk import column, define_datasource, engine, t

orders = define_datasource(
"orders",
{
"schema": {
"id": column(t.string(), {"json_path": "$.Item.id"}),
"_record": column(t.string(), {"json_path": "$.NewImage"}),
"_timestamp": column(t.date_time64(3), {"json_path": "$.ApproximateCreationDateTime"}),
"_event_name": column(t.string().low_cardinality(), {"json_path": "$.eventName"}),
"_is_deleted": column(t.uint8(), {"json_path": "$._is_deleted"}),
},
"engine": engine.replacing_merge_tree(
{"sorting_key": ["id"], "ver": "_timestamp", "is_deleted": "_is_deleted"}
),
"dynamodb": {
"connection": events_dynamodb,
"table_arn": "arn:aws:dynamodb:us-east-1:123456789012:table/orders",
"export_bucket": "s3://my-tinybird-dynamodb-exports",
},
},
)
```

### Datasources
Expand Down
5 changes: 5 additions & 0 deletions src/tinybird_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@
"define_kafka_connection": ("tinybird_sdk.schema", "define_kafka_connection"),
"define_s3_connection": ("tinybird_sdk.schema", "define_s3_connection"),
"define_gcs_connection": ("tinybird_sdk.schema", "define_gcs_connection"),
"define_dynamodb_connection": ("tinybird_sdk.schema", "define_dynamodb_connection"),
"get_connection_type": ("tinybird_sdk.schema", "get_connection_type"),
"is_connection_definition": ("tinybird_sdk.schema", "is_connection_definition"),
"is_kafka_connection_definition": ("tinybird_sdk.schema", "is_kafka_connection_definition"),
"is_s3_connection_definition": ("tinybird_sdk.schema", "is_s3_connection_definition"),
"is_gcs_connection_definition": ("tinybird_sdk.schema", "is_gcs_connection_definition"),
"is_dynamodb_connection_definition": (
"tinybird_sdk.schema",
"is_dynamodb_connection_definition",
),
"secret": ("tinybird_sdk.schema", "secret"),
"define_token": ("tinybird_sdk.schema", "define_token"),
"is_token_definition": ("tinybird_sdk.schema", "is_token_definition"),
Expand Down
16 changes: 16 additions & 0 deletions src/tinybird_sdk/generator/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from ..schema.connection import (
ConnectionDefinition,
DynamoDBConnectionDefinition,
GCSConnectionDefinition,
KafkaConnectionDefinition,
S3ConnectionDefinition,
Expand Down Expand Up @@ -70,6 +71,17 @@ def _generate_gcs_connection(connection: GCSConnectionDefinition) -> str:
return "\n".join(parts)


def _generate_dynamodb_connection(connection: DynamoDBConnectionDefinition) -> str:
    """Render the datafile text for a DynamoDB connection.

    The output is three newline-separated directives: the connection type,
    then the ARN and region read from ``connection.options``.
    """
    opts = connection.options
    directives = (
        ("TYPE", "dynamodb"),
        ("DYNAMODB_ARN", opts.arn),
        ("DYNAMODB_REGION", opts.region),
    )
    # Each directive is written as "<KEYWORD> <value>" on its own line.
    return "\n".join(f"{keyword} {value}" for keyword, value in directives)


def generate_connection(connection: ConnectionDefinition) -> GeneratedConnection:
if isinstance(connection, KafkaConnectionDefinition):
return GeneratedConnection(
Expand All @@ -83,6 +95,10 @@ def generate_connection(connection: ConnectionDefinition) -> GeneratedConnection
return GeneratedConnection(
name=connection._name, content=_generate_gcs_connection(connection)
)
if isinstance(connection, DynamoDBConnectionDefinition):
return GeneratedConnection(
name=connection._name, content=_generate_dynamodb_connection(connection)
)
raise ValueError(f"Unsupported connection type: {connection._connectionType}")


Expand Down
12 changes: 12 additions & 0 deletions src/tinybird_sdk/generator/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ def _generate_import_config(import_config: Any) -> str:
return "\n".join(lines)


def _generate_dynamodb_config(dynamodb: Any) -> str:
    """Render the import directives for a DynamoDB-backed datasource.

    Reads the referenced connection's ``_name`` plus the ``table_arn`` and
    ``export_bucket`` attributes of ``dynamodb`` and returns them as three
    newline-separated ``IMPORT_*`` lines.
    """
    connection_line = f"IMPORT_CONNECTION_NAME {dynamodb.connection._name}"
    table_line = f"IMPORT_TABLE_ARN {dynamodb.table_arn}"
    bucket_line = f"IMPORT_EXPORT_BUCKET {dynamodb.export_bucket}"
    return "\n".join((connection_line, table_line, bucket_line))


def _generate_forward_query(forward_query: str | None) -> str | None:
if not forward_query or not forward_query.strip():
return None
Expand Down Expand Up @@ -193,6 +202,9 @@ def generate_datasource(datasource: DatasourceDefinition) -> GeneratedDatasource
if datasource.options.gcs:
parts.extend(["", _generate_import_config(datasource.options.gcs)])

if datasource.options.dynamodb:
parts.extend(["", _generate_dynamodb_config(datasource.options.dynamodb)])

forward_query = _generate_forward_query(datasource.options.forward_query)
if forward_query:
parts.extend(["", forward_query])
Expand Down
31 changes: 30 additions & 1 deletion src/tinybird_sdk/migrate/emit_ts.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .parser_utils import parse_literal_from_datafile
from .types import (
DatasourceModel,
DynamoDBConnectionModel,
GCSConnectionModel,
KafkaConnectionModel,
ParsedResource,
Expand Down Expand Up @@ -261,6 +262,14 @@ def _emit_datasource(ds: DatasourceModel) -> str:
lines.append(f" 'from_timestamp': {_escape_string(ds.gcs.from_timestamp)},")
lines.append(" },")

if ds.dynamodb:
connection_var = to_snake_case(ds.dynamodb.connection_name)
lines.append(" 'dynamodb': {")
lines.append(f" 'connection': {connection_var},")
lines.append(f" 'table_arn': {_escape_string(ds.dynamodb.table_arn)},")
lines.append(f" 'export_bucket': {_escape_string(ds.dynamodb.export_bucket)},")
lines.append(" },")

if ds.forward_query:
lines.append(" 'forward_query': '''")
lines.append(ds.forward_query)
Expand Down Expand Up @@ -336,13 +345,31 @@ def _emit_gcs_connection(connection: GCSConnectionModel) -> str:
return "\n".join(lines)


def _emit_dynamodb_connection(connection: DynamoDBConnectionModel) -> str:
    """Emit the ``define_dynamodb_connection(...)`` source for a parsed connection.

    The assigned variable name is the snake-cased connection name; the
    connection name, region, and ARN are passed through ``_escape_string``.
    """
    target = to_snake_case(connection.name)
    opening = f"{target} = define_dynamodb_connection({_escape_string(connection.name)}, {{"
    region_entry = f" 'region': {_escape_string(connection.region)},"
    arn_entry = f" 'arn': {_escape_string(connection.arn)},"
    # The empty final element makes the joined text end with a newline.
    return "\n".join([opening, region_entry, arn_entry, "})", ""])


def _emit_connection(
    connection: KafkaConnectionModel
    | S3ConnectionModel
    | GCSConnectionModel
    | DynamoDBConnectionModel,
) -> str:
    """Dispatch a parsed connection model to its type-specific emitter.

    The parameter list declares ``connection`` exactly once, with the union
    that includes ``DynamoDBConnectionModel``; the narrower pre-DynamoDB
    annotation line is dropped because a repeated parameter name is a
    ``SyntaxError``.

    Args:
        connection: A parsed Kafka, S3, GCS, or DynamoDB connection model.

    Returns:
        The text produced by the matching ``_emit_*_connection`` helper.
    """
    if isinstance(connection, S3ConnectionModel):
        return _emit_s3_connection(connection)
    if isinstance(connection, GCSConnectionModel):
        return _emit_gcs_connection(connection)
    if isinstance(connection, DynamoDBConnectionModel):
        return _emit_dynamodb_connection(connection)
    # Anything that is not S3, GCS, or DynamoDB falls through to the Kafka emitter.
    return _emit_kafka_connection(connection)


Expand Down Expand Up @@ -474,6 +501,8 @@ def emit_migration_file_content(resources: list[ParsedResource]) -> str:
imports.add("define_s3_connection")
elif isinstance(conn, GCSConnectionModel):
imports.add("define_gcs_connection")
elif isinstance(conn, DynamoDBConnectionModel):
imports.add("define_dynamodb_connection")
if needs_column:
imports.add("column")
if needs_params:
Expand Down
77 changes: 73 additions & 4 deletions src/tinybird_sdk/migrate/parse_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
)
from .types import (
ConnectionModel,
DynamoDBConnectionModel,
GCSConnectionModel,
KafkaConnectionModel,
ResourceFile,
Expand All @@ -30,6 +31,8 @@
"S3_ACCESS_KEY",
"S3_SECRET",
"GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON",
"DYNAMODB_ARN",
"DYNAMODB_REGION",
}


Expand Down Expand Up @@ -58,6 +61,9 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel:
access_secret: str | None = None
service_account_credentials_json: str | None = None

dynamodb_arn: str | None = None
dynamodb_region: str | None = None

i = 0
while i < len(lines):
raw_line = lines[i]
Expand Down Expand Up @@ -119,6 +125,10 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel:
access_secret = parse_quoted_value(value)
elif name == "GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON":
service_account_credentials_json = parse_quoted_value(value)
elif name == "DYNAMODB_ARN":
dynamodb_arn = parse_quoted_value(value)
elif name == "DYNAMODB_REGION":
dynamodb_region = parse_quoted_value(value)
else:
raise MigrationParseError(
resource.file_path,
Expand All @@ -135,12 +145,20 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel:
)

if connection_type == "kafka":
if region or arn or access_key or access_secret or service_account_credentials_json:
if (
region
or arn
or access_key
or access_secret
or service_account_credentials_json
or dynamodb_arn
or dynamodb_region
):
raise MigrationParseError(
resource.file_path,
"connection",
resource.name,
"S3/GCS directives are not valid for kafka connections.",
"S3/GCS/DynamoDB directives are not valid for kafka connections.",
)

if not bootstrap_servers:
Expand Down Expand Up @@ -175,12 +193,14 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel:
or schema_registry_url
or ssl_ca_pem
or service_account_credentials_json
or dynamodb_arn
or dynamodb_region
):
raise MigrationParseError(
resource.file_path,
"connection",
resource.name,
"Kafka/GCS directives are not valid for s3 connections.",
"Kafka/GCS/DynamoDB directives are not valid for s3 connections.",
)

if not region:
Expand Down Expand Up @@ -231,12 +251,14 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel:
or arn
or access_key
or access_secret
or dynamodb_arn
or dynamodb_region
):
raise MigrationParseError(
resource.file_path,
"connection",
resource.name,
"Kafka/S3 directives are not valid for gcs connections.",
"Kafka/S3/DynamoDB directives are not valid for gcs connections.",
)

if not service_account_credentials_json:
Expand All @@ -255,6 +277,53 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel:
service_account_credentials_json=service_account_credentials_json,
)

if connection_type == "dynamodb":
if (
bootstrap_servers
or security_protocol
or sasl_mechanism
or key
or secret
or schema_registry_url
or ssl_ca_pem
or region
or arn
or access_key
or access_secret
or service_account_credentials_json
):
raise MigrationParseError(
resource.file_path,
"connection",
resource.name,
"Kafka/S3/GCS directives are not valid for dynamodb connections.",
)

if not dynamodb_arn:
raise MigrationParseError(
resource.file_path,
"connection",
resource.name,
"DYNAMODB_ARN is required for dynamodb connections.",
)

if not dynamodb_region:
raise MigrationParseError(
resource.file_path,
"connection",
resource.name,
"DYNAMODB_REGION is required for dynamodb connections.",
)

return DynamoDBConnectionModel(
kind="connection",
name=resource.name,
file_path=resource.file_path,
connection_type="dynamodb",
region=dynamodb_region,
arn=dynamodb_arn,
)

raise MigrationParseError(
resource.file_path,
"connection",
Expand Down
Loading
Loading