Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 63 additions & 6 deletions docs/pulsar_topic.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@

The `PulsarTopic` resource defines a topic in a Pulsar cluster. It allows you to configure various topic-level settings such as persistence, partitions, retention policies, and schema information. This resource is part of the Pulsar Resources Operator, which enables declarative management of Pulsar resources using Kubernetes custom resources.

## Sepcifications

## Specifications

| Field | Description | Required |
|-------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|
| `name` | The fully qualified topic name in the format "persistent://tenant/namespace/topic" or "non-persistent://tenant/namespace/topic". | Yes |
| `connectionRef` | Reference to the PulsarConnection resource used to connect to the Pulsar cluster for this topic. | Yes |
| `persistent` | Whether the topic is persistent or non-persistent. Default is false. Can also be set by topic name prefix. | No |
| `persistent` | Whether the topic is persistent or non-persistent. Default is true. The topic domain in `name` (`persistent://` or `non-persistent://`) takes precedence over this field. | No |
| `partitions` | Number of partitions for the topic. Default is 0. | No |
| `maxProducers` | Maximum number of producers allowed on the topic. | No |
| `maxConsumers` | Maximum number of consumers allowed on the topic. | No |
Expand All @@ -22,7 +20,7 @@ The `PulsarTopic` resource defines a topic in a Pulsar cluster. It allows you to
| `retentionTime` | Minimum time to retain messages in the topic. Should be set in conjunction with retentionSize for effective retention policy. Use "-1" for infinite retention time. | No |
| `retentionSize` | Maximum size of backlog retained in the topic. Should be set in conjunction with retentionTime for effective retention policy. Use "-1" for infinite retention size. | No |
| `backlogQuotaLimitTime` | Time limit for message backlog. Messages older than this limit will be removed or handled according to the retention policy. | No |
| `backlogQuotaLimitSize` | Size limit for message backlog. When the limit is reached, older messages will be removed or handled according to the retention policy. | No |
| `backlogQuotaLimitSize` | Size limit for message backlog. When the limit is reached, older messages will be removed or handled according to the retention policy. Use `"-1"` to allow an unlimited backlog (no producer throttling). | No |
| `backlogQuotaRetentionPolicy` | Retention policy for messages when backlog quota is exceeded. Options: "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". **Required whenever backlogQuotaLimitTime or backlogQuotaLimitSize is set.** | Conditional |
| `lifecyclePolicy` | Determines whether to keep or delete the Pulsar topic when the Kubernetes resource is deleted. Options: `CleanUpAfterDeletion`, `KeepAfterDeletion`. Default is `CleanUpAfterDeletion`. | No |
| `schemaInfo` | Schema information for the topic. See [schemaInfo](#schemainfo) for more details. | No |
Expand All @@ -43,7 +41,7 @@ The `PulsarTopic` resource defines a topic in a Pulsar cluster. It allows you to
| `subscriptionDispatchRate` | Message dispatch rate limiting policy for subscriptions, controlling the rate at which messages are delivered to consumers per subscription. Uses same format as [dispatchRate](#dispatchRate). | No |
| `replicatorDispatchRate` | Message dispatch rate limiting policy for replicators, controlling the rate at which messages are replicated to other clusters. Uses same format as [dispatchRate](#dispatchRate). | No |
| `deduplicationSnapshotInterval` | Interval for taking deduplication snapshots. This affects the deduplication performance and storage overhead. | No |
| `offloadPolicies` | Offload policies for the topic, controlling how data is offloaded to external storage systems. | No |
| `offloadPolicies` | Offload policies for the topic, controlling how data is offloaded to external (tiered) storage. See [offloadPolicies](#offloadpolicies) for the full field list and examples. | No |
| `autoSubscriptionCreation` | Auto subscription creation override for the topic, controlling whether subscriptions can be created automatically. | No |
| `schemaCompatibilityStrategy` | Schema compatibility strategy for the topic, controlling how schema evolution is handled. Options: UNDEFINED, ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE. | No |
| `properties` | Map of user-defined properties associated with the topic. These can be used to store additional metadata about the topic. | No |
Expand Down Expand Up @@ -181,7 +179,7 @@ Important notes when updating a Pulsar topic:

1. The fields `name` and `persistent` are immutable and cannot be updated after the topic is created.

2. Other fields such as `partitions`, `maxProducers`, `maxConsumers`, `messageTTL`, `retentionTime`, `retentionSize`, `backlogQuotaLimitTime`, `backlogQuotaLimitSize`, `backlogQuotaRetentionPolicy`, `compactionThreshold`, `persistencePolicies`, `delayedDelivery`, `dispatchRate`, `publishRate`, and `inactiveTopicPolicies` can be modified.
2. Other fields such as `partitions`, `maxProducers`, `maxConsumers`, `messageTTL`, `retentionTime`, `retentionSize`, `backlogQuotaLimitTime`, `backlogQuotaLimitSize`, `backlogQuotaRetentionPolicy`, `backlogQuotaType`, `compactionThreshold`, `persistencePolicies`, `delayedDelivery`, `dispatchRate`, `publishRate`, `inactiveTopicPolicies`, `subscribeRate`, `subscriptionDispatchRate`, `replicatorDispatchRate`, `maxMessageSize`, `maxConsumersPerSubscription`, `maxSubscriptionsPerTopic`, `maxUnAckedMessagesPerConsumer`, `maxUnAckedMessagesPerSubscription`, `deduplication`, `deduplicationSnapshotInterval`, `offloadPolicies`, `autoSubscriptionCreation`, `schemaValidationEnforced`, `schemaCompatibilityStrategy`, and `properties` can be modified. Clearing an optional field (removing it from the spec or setting it to null) removes the corresponding topic-level policy and lets the namespace-level default take effect again.

3. If you want to change the `connectionRef`, ensure that the new PulsarConnection resource exists and is properly configured. Changing the `connectionRef` can have significant implications:

Expand Down Expand Up @@ -368,6 +366,59 @@ spec:
allowAutoSubscriptionCreation: true
```

### offloadPolicies

The `offloadPolicies` field configures topic-level tiered storage offload, overriding any namespace- or broker-level defaults for this topic. The operator applies the policy via the topic admin API; clearing the field removes the topic-level override.

| Field | Description | Type |
|-------|-------------|------|
| `managedLedgerOffloadDriver` | Offload driver name. Supported values include `aws-s3`, `gcs`, `azureblob`, `filesystem`. | string |
| `managedLedgerOffloadMaxThreads` | Maximum number of threads used by the offloader for this topic. | int |
| `managedLedgerOffloadThresholdInBytes` | Size threshold in bytes after which closed ledgers are offloaded to tiered storage. Use `0` to trigger offload as soon as a ledger is closed (i.e. immediately). Use `-1` to disable size-based auto-offload. | int64 |
| `managedLedgerOffloadDeletionLagInMillis` | Time in milliseconds to wait after a ledger is successfully offloaded before deleting it from BookKeeper. For example, `60000` keeps the BookKeeper copy for 1 minute after offload completes. | int64 |
| `managedLedgerOffloadAutoTriggerSizeThresholdBytes` | Legacy alias for the auto-trigger size threshold. Prefer `managedLedgerOffloadThresholdInBytes`. | int64 |
| `s3ManagedLedgerOffloadBucket` | S3 bucket name for the `aws-s3` driver. | string |
| `s3ManagedLedgerOffloadRegion` | S3 region for the `aws-s3` driver. | string |
| `s3ManagedLedgerOffloadServiceEndpoint` | S3 service endpoint URL (optional, e.g. for S3-compatible storage). | string |
| `s3ManagedLedgerOffloadCredentialId` | Access key ID for the `aws-s3` driver. Prefer IAM roles in production. | string |
| `s3ManagedLedgerOffloadCredentialSecret` | Secret access key for the `aws-s3` driver. Prefer IAM roles in production. | string |
| `s3ManagedLedgerOffloadRole` | IAM role ARN to assume when offloading. | string |
| `s3ManagedLedgerOffloadRoleSessionName` | Session name used when assuming the IAM role. | string |
| `offloadersDirectory` | Directory inside the broker container where offloader jars are loaded from. | string |
| `managedLedgerOffloadDriverMetadata` | Free-form driver-specific metadata as key/value pairs. | map[string]string |

> **Note:** Bucket, region, credentials and driver configuration are normally provided at the broker / cluster level (for example via the `sn-platform` chart's `broker.offload` settings). At the topic level you usually only need the policy fields that change behavior — `managedLedgerOffloadThresholdInBytes` and `managedLedgerOffloadDeletionLagInMillis` — and can leave the driver/bucket fields empty so the broker defaults are inherited.

**Example — aggressive immediate offload for a single topic:**

The configuration below triggers offload as soon as each ledger is closed, deletes the BookKeeper copy 60 seconds after offload completes, and removes the producer-side backlog quota so producers are never throttled:

```yaml
apiVersion: resource.streamnative.io/v1alpha1
kind: PulsarTopic
metadata:
name: fast-offload-topic
namespace: test
spec:
name: persistent://test-tenant/testns/fast-offload
connectionRef:
name: test-pulsar-connection

# Offload every closed ledger immediately, and delete from BookKeeper 60s after offload completes.
offloadPolicies:
managedLedgerOffloadThresholdInBytes: 0 # equivalent to offloadThresholdBytes=0
managedLedgerOffloadDeletionLagInMillis: 60000 # equivalent to offloadDeletionLagSeconds=60

# Allow unlimited backlog without throttling producers.
backlogQuotaLimitSize: "-1" # equivalent to backlogQuotaLimitBytes=-1
backlogQuotaRetentionPolicy: producer_request_hold
```

> **Mapping from `pulsar-admin` / broker config to PulsarTopic fields:**
> - `offloadThresholdBytes` (admin: `pulsar-admin topics set-offload-policies --offloadThresholdInBytes`) → `offloadPolicies.managedLedgerOffloadThresholdInBytes`
> - `offloadDeletionLagSeconds` / `offloadDeletionLagInMillis` → `offloadPolicies.managedLedgerOffloadDeletionLagInMillis` (value in **milliseconds**, so `60s` becomes `60000`)
> - `backlogQuotaLimitBytes` → `backlogQuotaLimitSize` (use `"-1"` for unlimited)

## SchemaInfo

The `schemaInfo` field in the PulsarTopic specification allows you to define the schema for the topic. For more details about Pulsar schemas, refer to the [official documentation](https://pulsar.apache.org/docs/schema-understand/).
Expand Down Expand Up @@ -518,6 +569,11 @@ spec:
# Auto subscription creation
autoSubscriptionCreation:
allowAutoSubscriptionCreation: false

# Tiered storage offload (driver / bucket / credentials inherited from broker defaults)
offloadPolicies:
managedLedgerOffloadThresholdInBytes: 1073741824 # 1 GiB
managedLedgerOffloadDeletionLagInMillis: 300000 # 5 minutes

# Custom properties
properties:
Expand Down Expand Up @@ -549,3 +605,4 @@ This example demonstrates a production-ready topic configuration with:
- Consumer and subscription limits per topic
- Custom properties for metadata tracking
- Controlled auto-subscription creation (disabled for production)
- Per-topic tiered storage offload (1 GiB threshold, 5-minute BookKeeper deletion lag)
Loading