Skip to main content
This guide covers best practices for building production-quality providers. Follow these guidelines to create providers that are reliable, secure, and easy to maintain.

Idempotency

Idempotency is the most critical requirement for providers. Lifecycle methods may be called multiple times with the same input due to retries, restarts, or redelivery.
See Lifecycle Methods - Idempotency Requirements for patterns on implementing idempotent create, update, and delete operations. Key principles:
  • Create: Handle “already exists” errors by retrieving the existing resource
  • Update: Return existing outputs when nothing changed
  • Delete: Treat “not found” as success, not failure

Error Handling

Use Specific Exception Types

Choose exception types that communicate the nature of the failure:
async def on_create(self) -> DatabaseOutputs:
    try:
        db = await self.client.create_database(self.config.name)
    except QuotaExceededError as e:
        # RuntimeError: infrastructure/API failures (retryable)
        raise RuntimeError(f"Quota exceeded: {e}")
    except InvalidNameError as e:
        # ValueError: user configuration errors (not retryable)
        raise ValueError(f"Invalid database name: {e}")

    return DatabaseOutputs(connection_url=db.url)
Exception TypeWhen to UseRetryable
ValueErrorInvalid configuration (user error)No
RuntimeErrorInfrastructure or API failuresDepends
Unhandled exceptionsUnexpected errors (preserve traceback)Yes

Let Unexpected Errors Propagate

Do not catch-all exceptions. Let unexpected errors propagate with full tracebacks for debugging:
# Good: specific handling, unexpected errors propagate
async def on_create(self) -> SecretOutputs:
    try:
        secret = self.client.create_secret(self.config.secret_id)
    except AlreadyExists:
        secret = self.client.get_secret(self.config.secret_id)
    # Other exceptions propagate with full traceback

    return SecretOutputs(resource_name=secret.name)

# Bad: swallowing all exceptions
async def on_create(self) -> SecretOutputs:
    try:
        secret = self.client.create_secret(self.config.secret_id)
    except Exception as e:  # Don't do this
        raise RuntimeError(f"Failed: {e}")

Provide Actionable Error Messages

Error messages should help users understand what went wrong and how to fix it:
# Good: tells user what to do
raise ValueError("Cannot change project_id; delete and recreate resource")

# Bad: generic message
raise ValueError("Invalid configuration")

Credentials Management

Accept Credentials as Configuration

Providers should accept credentials as configuration, not rely on ambient credentials:
class DatabaseConfig(Config):
    name: str
    # Accept credentials as config, not from environment
    credentials: dict[str, Any] | str

class Database(Resource[DatabaseConfig, DatabaseOutputs]):
    def _get_client(self) -> CloudClient:
        creds = self.config.credentials

        # Handle both dict and JSON string formats
        if isinstance(creds, str):
            creds = json.loads(creds)

        return CloudClient(credentials=creds)
This approach:
  • Works in multi-tenant environments where each user has different credentials
  • Makes credential requirements explicit in the schema
  • Enables users to manage credentials via Pragma secrets

Use FieldReference for Secrets

Document that users should use FieldReference to pass credentials from a secret resource:
# User's resource configuration
provider: mycloud
resource: database
name: analytics
config:
  name: analytics
  credentials:
    $ref:
      provider: pragma
      resource: secret
      name: mycloud-creds
      field: data

Never Log Credentials

Ensure credentials never appear in logs or error messages:
def _get_client(self) -> CloudClient:
    try:
        return CloudClient(credentials=self.config.credentials)
    except AuthenticationError as e:
        # Good: don't include credentials in error
        raise RuntimeError("Authentication failed - check credentials")

        # Bad: might expose credentials
        # raise RuntimeError(f"Auth failed: {e}")

Performance

Cache Clients Within Lifecycle Methods

Create API clients once per lifecycle invocation, not per operation:
class Database(Resource[DatabaseConfig, DatabaseOutputs]):
    def _get_client(self) -> CloudClient:
        """Create client once per lifecycle call."""
        return CloudClient(credentials=self.config.credentials)

    async def on_create(self) -> DatabaseOutputs:
        client = self._get_client()  # Create once

        db = client.create_database(self.config.name)
        client.configure_database(db.id, self.config.settings)
        client.enable_monitoring(db.id)

        return DatabaseOutputs(database_id=db.id)

Minimize API Calls in Updates

Only call external APIs when configuration actually changed:
async def on_update(self, previous_config: DatabaseConfig) -> DatabaseOutputs:
    # Check if anything relevant changed
    if previous_config == self.config:
        return self.outputs  # No API calls needed

    client = self._get_client()

    # Only update what changed
    if previous_config.size_gb != self.config.size_gb:
        client.resize(self.outputs.database_id, self.config.size_gb)

    if previous_config.settings != self.config.settings:
        client.configure(self.outputs.database_id, self.config.settings)

    return self.outputs

Use Async Where Possible

Lifecycle methods are async. Use async API clients when available:
async def on_create(self) -> DatabaseOutputs:
    async with AsyncCloudClient(credentials=self.config.credentials) as client:
        # Async operations can be awaited
        db = await client.create_database(self.config.name)

        # Parallel operations when independent
        await asyncio.gather(
            client.configure_settings(db.id, self.config.settings),
            client.setup_monitoring(db.id),
        )

    return DatabaseOutputs(database_id=db.id)

Common Mistakes

Forgetting Idempotency

The most common mistake is assuming lifecycle methods only run once:
# Wrong: fails on retry if secret exists
async def on_create(self) -> SecretOutputs:
    secret = self.client.create_secret(self.config.secret_id)  # Throws on retry!
    return SecretOutputs(resource_name=secret.name)

# Right: handle already exists
async def on_create(self) -> SecretOutputs:
    try:
        secret = self.client.create_secret(self.config.secret_id)
    except AlreadyExists:
        secret = self.client.get_secret(self.config.secret_id)
    return SecretOutputs(resource_name=secret.name)

Mutable State Between Calls

Do not store state on the resource instance between lifecycle calls:
# Wrong: state from previous calls won't exist
class Database(Resource[DatabaseConfig, DatabaseOutputs]):
    def __init__(self, **data):
        super().__init__(**data)
        self._client = None  # Won't persist between calls!

    async def on_create(self) -> DatabaseOutputs:
        self._client = CloudClient()  # Stored here...
        ...

    async def on_update(self, prev: DatabaseConfig) -> DatabaseOutputs:
        self._client.update(...)  # ...but won't exist here!

# Right: create fresh each time
class Database(Resource[DatabaseConfig, DatabaseOutputs]):
    def _get_client(self) -> CloudClient:
        return CloudClient(credentials=self.config.credentials)

    async def on_update(self, prev: DatabaseConfig) -> DatabaseOutputs:
        client = self._get_client()  # Fresh client
        ...

Missing Immutable Field Validation

Certain fields cannot be changed in-place. Validate these in on_update:
async def on_update(self, previous_config: DatabaseConfig) -> DatabaseOutputs:
    # Validate immutable fields first
    if previous_config.region != self.config.region:
        raise ValueError("Cannot change region; delete and recreate resource")

    if previous_config.engine != self.config.engine:
        raise ValueError("Cannot change database engine; delete and recreate resource")

    # Then handle mutable fields...

Not Using self.outputs

In on_update and on_delete, the previous outputs are available via self.outputs. Use them:
async def on_update(self, previous_config: DatabaseConfig) -> DatabaseOutputs:
    # Use self.outputs to get the resource ID
    client = self._get_client()
    client.resize(self.outputs.database_id, self.config.size_gb)

    # Return existing outputs if they haven't changed
    return self.outputs

async def on_delete(self) -> None:
    client = self._get_client()
    # Use self.outputs to know what to delete
    client.delete_database(self.outputs.database_id)

Incomplete Cleanup in on_delete

Delete all resources created by on_create, not just the primary resource:
async def on_delete(self) -> None:
    client = self._get_client()

    try:
        # Delete associated resources too
        client.delete_monitoring(self.outputs.database_id)
        client.delete_backups(self.outputs.database_id)
        client.delete_database(self.outputs.database_id)
    except NotFound:
        # Already deleted - idempotent success
        pass

Testing

Use ProviderHarness

The SDK provides ProviderHarness for testing lifecycle methods without real infrastructure:
from pragma_sdk.provider import ProviderHarness

async def test_create_success(harness: ProviderHarness):
    result = await harness.invoke_create(
        Database,
        name="test-db",
        config=DatabaseConfig(name="analytics"),
    )

    assert result.success
    assert result.outputs.database_id is not None

Test Idempotency Explicitly

Write tests that verify idempotent behavior:
async def test_create_idempotent(harness, mock_client):
    """on_create handles already exists gracefully."""
    mock_client.create_database.side_effect = AlreadyExists("exists")

    config = DatabaseConfig(name="existing-db")
    result = await harness.invoke_create(Database, name="db", config=config)

    assert result.success  # Should succeed, not fail
    mock_client.get_database.assert_called_once()  # Should retrieve existing


async def test_delete_idempotent(harness, mock_client):
    """on_delete succeeds when resource doesn't exist."""
    mock_client.delete_database.side_effect = NotFound("gone")

    result = await harness.invoke_delete(Database, name="db", config=config)

    assert result.success  # Not found is success for delete

Test Immutable Field Validation

Verify that changing immutable fields fails appropriately:
async def test_update_rejects_region_change(harness):
    """on_update rejects region changes."""
    previous = DatabaseConfig(name="db", region="us-east-1")
    current = DatabaseConfig(name="db", region="eu-west-1")

    result = await harness.invoke_update(
        Database,
        name="db",
        config=current,
        previous_config=previous,
    )

    assert result.failed
    assert "region" in str(result.error)

Mock External Services

Use fixtures to mock external API clients:
@pytest.fixture
def mock_client(monkeypatch):
    """Mock cloud client for testing."""
    mock = MagicMock()

    # Configure default responses
    mock.create_database.return_value = MagicMock(id="db-123", url="postgres://...")
    mock.get_database.return_value = MagicMock(id="db-123", url="postgres://...")

    # Patch the client constructor
    monkeypatch.setattr(
        "mycloud_provider.resources.database.CloudClient",
        lambda credentials=None: mock,
    )

    return mock

What’s Next