This page shows complete, annotated provider examples. Each example explains the design decisions inline so you can understand not just what the code does, but why.

Cloud Resource Provider Pattern

This example shows the pattern used by real cloud providers: user-provided credentials, idempotent operations, and proper error handling. It is based on the GCP Secret Manager provider.

The Complete Resource

"""GCP Secret Manager resource - complete annotated example."""

from __future__ import annotations

import json
from typing import Any, ClassVar

from google.api_core.exceptions import AlreadyExists, NotFound
from google.cloud import secretmanager
from google.oauth2 import service_account
from pragma_sdk import Config, Outputs, Resource


# -----------------------------------------------------------------------------
# Config: What users provide
# -----------------------------------------------------------------------------
class SecretConfig(Config):
    """Configuration for a GCP Secret Manager secret.

    Design decisions:
    - project_id and secret_id are required because they define the secret's
      identity in GCP. These cannot change after creation.
    - data is the secret payload. Simple string type handles most use cases.
    - credentials is required (no ADC fallback) because this is a multi-tenant
      SaaS platform. Each user operates in their own GCP project with their
      own service account.
    """

    # Identity fields (immutable after creation)
    project_id: str
    secret_id: str

    # Payload
    data: str

    # Authentication - accepts dict or JSON string for flexibility
    # Users typically pass this via a FieldReference to a pragma/secret
    credentials: dict[str, Any] | str


# -----------------------------------------------------------------------------
# Outputs: What dependents can reference
# -----------------------------------------------------------------------------
class SecretOutputs(Outputs):
    """Outputs from GCP Secret Manager secret creation.

    Design decisions:
    - resource_name is the full GCP path. Dependents need this to access
      the secret via GCP APIs.
    - version_name includes the version so dependents can pin to specific
      versions if needed.
    - version_id is just the number for display/logging purposes.
    """

    resource_name: str   # projects/{project}/secrets/{id}
    version_name: str    # projects/{project}/secrets/{id}/versions/{version}
    version_id: str      # Just the version number ("1", "2", etc.)


# -----------------------------------------------------------------------------
# Resource: The lifecycle implementation
# -----------------------------------------------------------------------------
class Secret(Resource[SecretConfig, SecretOutputs]):
    """GCP Secret Manager secret resource.

    Pattern: Cloud resource with user-provided credentials

    This pattern is common when:
    - Users manage resources in their own cloud accounts
    - The platform doesn't have ambient credentials (no Workload Identity)
    - Multi-tenant isolation requires per-user authentication
    """

    # Class-level metadata for provider registration
    provider: ClassVar[str] = "gcp"
    resource: ClassVar[str] = "secret"

    # -------------------------------------------------------------------------
    # Helper Methods
    # -------------------------------------------------------------------------
    def _get_client(self) -> secretmanager.SecretManagerServiceClient:
        """Get authenticated client using user-provided credentials.

        Why a method instead of an instance variable?
        - Clients are created fresh for each lifecycle call
        - Avoids serialization issues with stored credentials
        - Handles both dict and JSON string credentials formats
        """
        creds_data = self.config.credentials

        # Support JSON-encoded credentials (common when passed via env vars)
        if isinstance(creds_data, str):
            creds_data = json.loads(creds_data)

        credentials = service_account.Credentials.from_service_account_info(
            creds_data
        )
        return secretmanager.SecretManagerServiceClient(credentials=credentials)

    def _secret_path(self) -> str:
        """Build the full GCP resource path.

        Extracted to a method because it's used in multiple lifecycle handlers.
        """
        return f"projects/{self.config.project_id}/secrets/{self.config.secret_id}"

    # -------------------------------------------------------------------------
    # Lifecycle: on_create
    # -------------------------------------------------------------------------
    async def on_create(self) -> SecretOutputs:
        """Create GCP secret with initial version.

        Idempotency strategy: "Create-or-get"
        - Try to create the secret
        - If it already exists (AlreadyExists exception), get the existing one
        - Always add a new version with the payload

        This handles retry scenarios where:
        1. First call: Creates secret + version 1
        2. Retry (if the first call crashed before it was acknowledged):
           Gets the existing secret and adds another version (version 2 if
           the first attempt had already added version 1)

        The extra version on retry is acceptable because secret versions
        are append-only in GCP anyway.
        """
        client = self._get_client()
        parent = f"projects/{self.config.project_id}"

        # Step 1: Create or get the secret
        try:
            secret = client.create_secret(
                request={
                    "parent": parent,
                    "secret_id": self.config.secret_id,
                    "secret": {"replication": {"automatic": {}}},
                }
            )
        except AlreadyExists:
            # Secret exists from a previous attempt - this is fine
            secret = client.get_secret(name=self._secret_path())

        # Step 2: Add version with payload (always runs)
        version = client.add_secret_version(
            request={
                "parent": secret.name,
                "payload": {"data": self.config.data.encode("utf-8")},
            }
        )

        return SecretOutputs(
            resource_name=secret.name,
            version_name=version.name,
            version_id=version.name.split("/")[-1],
        )

    # -------------------------------------------------------------------------
    # Lifecycle: on_update
    # -------------------------------------------------------------------------
    async def on_update(self, previous_config: SecretConfig) -> SecretOutputs:
        """Update secret by creating new version if data changed.

        Update strategies used here:

        1. Immutable field check: project_id and secret_id cannot change
           because they define the resource's identity in GCP. Changing them
           would require delete + recreate.

        2. No-op optimization: If data hasn't changed, return existing outputs
           without making any API calls. This is important because on_update
           may be called even when only unrelated fields change.

        3. Append-only update: Secrets use versioning, so "update" means
           adding a new version rather than modifying in place.
        """
        # Validate immutable fields
        if previous_config.project_id != self.config.project_id:
            raise ValueError(
                "Cannot change project_id; delete and recreate resource"
            )

        if previous_config.secret_id != self.config.secret_id:
            raise ValueError(
                "Cannot change secret_id; delete and recreate resource"
            )

        # No-op if data unchanged (important for efficiency)
        if previous_config.data == self.config.data and self.outputs is not None:
            return self.outputs

        # Add new version
        client = self._get_client()
        version = client.add_secret_version(
            request={
                "parent": self._secret_path(),
                "payload": {"data": self.config.data.encode("utf-8")},
            }
        )

        return SecretOutputs(
            resource_name=self._secret_path(),
            version_name=version.name,
            version_id=version.name.split("/")[-1],
        )

    # -------------------------------------------------------------------------
    # Lifecycle: on_delete
    # -------------------------------------------------------------------------
    async def on_delete(self) -> None:
        """Delete secret and all versions.

        Idempotency strategy: "Delete-if-exists"
        - Try to delete
        - If not found (NotFound exception), succeed anyway

        This handles:
        - Normal deletion: Deletes the secret
        - Retry after successful delete: NotFound, returns success
        - Resource never existed: NotFound, returns success
        """
        client = self._get_client()

        try:
            client.delete_secret(name=self._secret_path())
        except NotFound:
            # Already deleted - idempotent success
            pass

Key Patterns to Note

This provider requires explicit credentials rather than using Application Default Credentials (ADC) or Workload Identity. This is the right choice when:
  • Your platform is multi-tenant
  • Users manage resources in their own cloud accounts
  • You cannot assume ambient credentials exist
For single-tenant deployments where the provider runs with cluster credentials, you could simplify to:
def _get_client(self) -> secretmanager.SecretManagerServiceClient:
    # Uses ADC/Workload Identity - no credentials in config
    return secretmanager.SecretManagerServiceClient()
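When credentials do stay in the config, the dict-or-JSON-string handling from `_get_client` can be pulled into a small helper. The following is a minimal stdlib-only sketch: the name `normalize_credentials` and the required-key check are assumptions made for this illustration, not part of pragma_sdk or the Google client libraries.

```python
from __future__ import annotations

import json
from typing import Any

# A few keys found in a service-account key file. Treating exactly these
# as required is an assumption of this sketch.
REQUIRED_KEYS = ("type", "project_id", "private_key", "client_email")


def normalize_credentials(creds: dict[str, Any] | str) -> dict[str, Any]:
    """Return credentials as a dict, decoding a JSON string when needed."""
    if isinstance(creds, str):
        try:
            creds = json.loads(creds)
        except json.JSONDecodeError as exc:
            raise ValueError("credentials is not valid JSON") from exc

    if not isinstance(creds, dict):
        raise ValueError("credentials must decode to a JSON object")

    missing = [key for key in REQUIRED_KEYS if key not in creds]
    if missing:
        raise ValueError(f"credentials missing keys: {', '.join(missing)}")

    return creds
```

Failing early with a `ValueError` that names the missing keys gives users a clearer message than an error raised deep inside the client library.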
The config distinguishes between:
  • Immutable: project_id, secret_id - Changing these would create a different resource in GCP
  • Mutable: data - Can be updated in place (via new version)
The on_update method validates immutable fields and raises clear errors explaining the workaround (delete and recreate).
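If several resources share the same immutable-field rule, the repeated `if` blocks in `on_update` could be folded into one helper. This is a sketch under stated assumptions: `check_immutable` and `ExampleConfig` are hypothetical names, and `ExampleConfig` is a plain dataclass standing in for `SecretConfig` so the snippet runs without pragma_sdk.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ExampleConfig:
    """Stand-in for SecretConfig (hypothetical, for illustration only)."""

    project_id: str
    secret_id: str
    data: str


def check_immutable(previous: object, current: object, fields: tuple[str, ...]) -> None:
    """Raise ValueError naming the first immutable field that changed."""
    for name in fields:
        if getattr(previous, name) != getattr(current, name):
            raise ValueError(
                f"Cannot change {name}; delete and recreate resource"
            )


IMMUTABLE_FIELDS = ("project_id", "secret_id")

old = ExampleConfig("proj", "db-password", "v1")
new = ExampleConfig("proj", "db-password", "v2")
check_immutable(old, new, IMMUTABLE_FIELDS)  # only data changed, so no error
```

Keeping the error text identical to the one in `on_update` means users see the same workaround regardless of which field they tried to change.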
Each lifecycle method uses a different idempotency pattern:
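| Method    | Strategy         | Handles                            |
|-----------|------------------|------------------------------------|
| on_create | Create-or-get    | AlreadyExists from retry           |
| on_update | No-op + append   | Unchanged configs, version history |
| on_delete | Delete-if-exists | NotFound from retry                |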
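The create-or-get and delete-if-exists strategies can be exercised without GCP by running them against an in-memory backend. In this sketch `FakeSecretStore`, `create_or_get`, and `delete_if_exists` are hypothetical names, and the two exception classes are local stand-ins for `google.api_core.exceptions.AlreadyExists` and `NotFound`.

```python
from __future__ import annotations


class AlreadyExists(Exception):
    """Stand-in for google.api_core.exceptions.AlreadyExists."""


class NotFound(Exception):
    """Stand-in for google.api_core.exceptions.NotFound."""


class FakeSecretStore:
    """Hypothetical in-memory backend that raises GCP-like errors."""

    def __init__(self) -> None:
        self.secrets: dict[str, list[bytes]] = {}

    def create(self, name: str) -> str:
        if name in self.secrets:
            raise AlreadyExists(name)
        self.secrets[name] = []
        return name

    def get(self, name: str) -> str:
        if name not in self.secrets:
            raise NotFound(name)
        return name

    def add_version(self, name: str, payload: bytes) -> int:
        self.secrets[name].append(payload)
        return len(self.secrets[name])  # version number, starting at 1

    def delete(self, name: str) -> None:
        if name not in self.secrets:
            raise NotFound(name)
        del self.secrets[name]


def create_or_get(store: FakeSecretStore, name: str, payload: bytes) -> int:
    """Create the secret, fall back to get on AlreadyExists, then add a version."""
    try:
        secret = store.create(name)
    except AlreadyExists:
        secret = store.get(name)
    return store.add_version(secret, payload)


def delete_if_exists(store: FakeSecretStore, name: str) -> None:
    """Delete the secret; treat NotFound as success."""
    try:
        store.delete(name)
    except NotFound:
        pass


store = FakeSecretStore()
first = create_or_get(store, "db-password", b"hunter2")   # creates + version 1
retry = create_or_get(store, "db-password", b"hunter2")   # gets + version 2
delete_if_exists(store, "db-password")
delete_if_exists(store, "db-password")  # second call is a silent no-op
```

A retried create succeeds and appends a second version, and a repeated delete succeeds silently, which matches the behaviour described for `on_create` and `on_delete`.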

Webhook Integration Pattern

This example shows a different pattern: integrating with an external webhook-based API. Common for SaaS integrations, CI/CD systems, and notification services.
"""Slack webhook resource - external API integration pattern."""

from __future__ import annotations

from typing import ClassVar

import httpx
from pragma_sdk import Config, Field, Outputs, Resource


class SlackChannelConfig(Config):
    """Configuration for a Slack notification channel.

    Design decisions:
    - webhook_url uses Field[str] because users often store webhook URLs
      in pragma/secret resources and reference them.
    - channel and username have defaults because Slack webhooks already
      carry their own defaults configured in Slack; channel=None keeps
      the webhook's default channel.
    - icon_emoji is optional for customization.
    """

    webhook_url: Field[str]              # Often a FieldReference to a secret
    channel: str | None = None           # Override webhook default
    username: str = "Pragma"             # Bot display name
    icon_emoji: str | None = ":robot_face:"   # Bot icon (Slack emoji shortcode)


class SlackChannelOutputs(Outputs):
    """Outputs from Slack channel setup.

    Design decisions:
    - channel_id is a computed identifier for this channel config
    - verified indicates whether a test message was sent successfully
    - No sensitive data in outputs (webhook URL stays in config)
    """

    channel_id: str
    verified: bool


class SlackChannel(Resource[SlackChannelConfig, SlackChannelOutputs]):
    """Slack notification channel resource.

    Pattern: External API without server-side state

    This pattern applies when:
    - The external service doesn't create persistent resources
    - "Creation" means validating the configuration works
    - "Deletion" is a no-op (nothing to clean up)

    The resource tracks configuration and verification status rather than
    managing remote state.
    """

    provider: ClassVar[str] = "notifications"
    resource: ClassVar[str] = "slack_channel"

    def _build_channel_id(self) -> str:
        """Generate deterministic channel ID from config.

        Using name + channel ensures uniqueness within a tenant's resources.
        """
        channel = self.config.channel or "default"
        return f"slack-{self.name}-{channel}"

    async def _send_test_message(self) -> bool:
        """Send test message to verify webhook works.

        Returns True if successful, False otherwise.
        Doesn't raise on failure - verification is informational.
        """
        payload = {
            "text": f"Channel configured: {self.name}",
            "username": self.config.username,
        }

        if self.config.channel:
            payload["channel"] = self.config.channel
        if self.config.icon_emoji:
            payload["icon_emoji"] = self.config.icon_emoji

        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    self.config.webhook_url,  # Resolved from FieldReference
                    json=payload,
                    timeout=10.0,
                )
                return response.status_code == 200
            except httpx.HTTPError:
                return False

    async def on_create(self) -> SlackChannelOutputs:
        """Validate webhook by sending a test message.

        Idempotency: Multiple creates send multiple test messages.
        This is acceptable because test messages are informational
        and don't create server-side state.
        """
        verified = await self._send_test_message()

        return SlackChannelOutputs(
            channel_id=self._build_channel_id(),
            verified=verified,
        )

    async def on_update(self, previous_config: SlackChannelConfig) -> SlackChannelOutputs:
        """Re-verify if the webhook URL or channel changed.

        If only display settings changed (username, emoji), skip verification
        to avoid unnecessary API calls.
        """
        # Re-verify if the webhook URL or target channel changed
        url_changed = previous_config.webhook_url != self.config.webhook_url
        channel_changed = previous_config.channel != self.config.channel

        if url_changed or channel_changed:
            verified = await self._send_test_message()
        else:
            # Preserve previous verification status
            verified = self.outputs.verified if self.outputs else False

        return SlackChannelOutputs(
            channel_id=self._build_channel_id(),
            verified=verified,
        )

    async def on_delete(self) -> None:
        """No-op - webhooks don't have server-side resources to clean up.

        Could optionally send a "channel removed" notification here.
        """
        pass

When to Use This Pattern

The webhook pattern works well for:
  • Notification channels: Slack, Discord, PagerDuty webhooks
  • CI/CD triggers: Jenkins, GitHub Actions, GitLab pipelines
  • External SaaS: Services configured via API but not “created”
Key characteristics:
  • External service doesn’t persist your configuration
  • “Create” validates the integration works
  • “Delete” has nothing to clean up
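Because the external service keeps no state, most of the logic in this pattern is pure and can be checked without any HTTP calls. The sketch below isolates two such pieces from the Slack example: building the message payload and deciding whether an update needs re-verification. `WebhookSettings`, `build_payload`, and `needs_reverification` are hypothetical names; the dataclass stands in for `SlackChannelConfig` so the snippet runs without pragma_sdk.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class WebhookSettings:
    """Stand-in for SlackChannelConfig (hypothetical, for illustration only)."""

    webhook_url: str
    channel: str | None = None
    username: str = "Pragma"
    icon_emoji: str | None = None


def build_payload(settings: WebhookSettings, text: str) -> dict[str, str]:
    """Build the JSON body, omitting optional keys that are unset."""
    payload = {"text": text, "username": settings.username}
    if settings.channel:
        payload["channel"] = settings.channel
    if settings.icon_emoji:
        payload["icon_emoji"] = settings.icon_emoji
    return payload


def needs_reverification(previous: WebhookSettings, current: WebhookSettings) -> bool:
    """Re-verify only when the delivery target (URL or channel) changed."""
    return (
        previous.webhook_url != current.webhook_url
        or previous.channel != current.channel
    )


before = WebhookSettings(webhook_url="https://example.invalid/hook")
renamed = WebhookSettings(webhook_url="https://example.invalid/hook", username="Alerts")
moved = WebhookSettings(webhook_url="https://example.invalid/hook", channel="#ops")
```

With these values, `needs_reverification(before, renamed)` is false because only a display setting changed, while `needs_reverification(before, moved)` is true because the target channel changed.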

Comparison of Patterns

| Aspect             | Cloud Resource      | Webhook Integration      |
|--------------------|---------------------|--------------------------|
| Server-side state  | Yes (secret in GCP) | No (config only)         |
| Create idempotency | Create-or-get       | Always succeeds          |
| Delete idempotency | Delete-if-exists    | No-op                    |
| Credentials        | User-provided SA    | Webhook URL (can be ref) |
| Outputs            | Resource identifiers | Verification status     |

Testing Both Patterns

Both patterns test the same way using ProviderHarness:
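from google.api_core.exceptions import AlreadyExists
from pragma_sdk.provider import ProviderHarness

# Secret, SecretConfig, SlackChannel, and SlackChannelConfig are the
# classes defined in the examples above.


async def test_cloud_resource_idempotent_create(
    harness: ProviderHarness,
    mock_client,  # Mock the cloud client
) -> None:
    """Verify create handles AlreadyExists."""
    mock_client.create_secret.side_effect = AlreadyExists("exists")

    # Placeholder values. This assumes the mock_client fixture replaces the
    # client returned by Secret._get_client, so no real GCP call is made.
    config = SecretConfig(
        project_id="test-project",
        secret_id="test-secret",
        data="placeholder",
        credentials={"type": "service_account"},
    )

    result = await harness.invoke_create(Secret, name="test", config=config)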

    assert result.success
    mock_client.get_secret.assert_called_once()  # Fell back to get


async def test_webhook_verification(
    harness: ProviderHarness,
    httpx_mock,  # Mock HTTP client
) -> None:
    """Verify webhook sends test message."""
    httpx_mock.add_response(status_code=200)

    result = await harness.invoke_create(
        SlackChannel,
        name="alerts",
        config=SlackChannelConfig(webhook_url="https://hooks.slack.com/...")
    )

    assert result.success
    assert result.outputs.verified is True
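The `mock_client` fixture above is not shown on this page. As a rough illustration of the mocking technique it relies on, the following stdlib-only sketch uses `unittest.mock.MagicMock` with `side_effect` to force the AlreadyExists path and then checks the fallback call. The local `AlreadyExists` class and the `create_or_get` function are stand-ins written for this example, not the real Google exception or the provider's `on_create`.

```python
from unittest.mock import MagicMock


class AlreadyExists(Exception):
    """Stand-in for google.api_core.exceptions.AlreadyExists."""


def create_or_get(client, parent: str, secret_id: str):
    """Simplified version of the create-or-get step from on_create."""
    try:
        return client.create_secret(
            request={"parent": parent, "secret_id": secret_id}
        )
    except AlreadyExists:
        return client.get_secret(name=f"{parent}/secrets/{secret_id}")


mock_client = MagicMock()
# side_effect set to an exception instance makes the mock raise when called
mock_client.create_secret.side_effect = AlreadyExists("exists")
mock_client.get_secret.return_value = "projects/p/secrets/s"

result = create_or_get(mock_client, "projects/p", "s")

# The fallback was used exactly once, with the full resource path
mock_client.get_secret.assert_called_once_with(name="projects/p/secrets/s")
```

In a real test suite the mock would typically be injected by patching the method that builds the client, so that the resource under test never constructs real credentials.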

What’s Next