This guide covers best practices for building production-quality providers. Follow these guidelines to create providers that are reliable, secure, and easy to maintain.
Idempotency
Idempotency is the most critical requirement for providers. Lifecycle methods may be called multiple times with the same input due to retries, restarts, or redelivery.
See Lifecycle Methods - Idempotency Requirements for patterns on implementing idempotent create, update, and delete operations.
Key principles:
- Create: Handle “already exists” errors by retrieving the existing resource
- Update: Return existing outputs when nothing changed
- Delete: Treat “not found” as success, not failure
Error Handling
Use Specific Exception Types
Choose exception types that communicate the nature of the failure:
async def on_create(self) -> DatabaseOutputs:
try:
db = await self.client.create_database(self.config.name)
except QuotaExceededError as e:
# RuntimeError: infrastructure/API failures (retryable)
raise RuntimeError(f"Quota exceeded: {e}")
except InvalidNameError as e:
# ValueError: user configuration errors (not retryable)
raise ValueError(f"Invalid database name: {e}")
return DatabaseOutputs(connection_url=db.url)
| Exception Type | When to Use | Retryable |
|---|
ValueError | Invalid configuration (user error) | No |
RuntimeError | Infrastructure or API failures | Depends |
| Unhandled exceptions | Unexpected errors (preserve traceback) | Yes |
Let Unexpected Errors Propagate
Do not catch-all exceptions. Let unexpected errors propagate with full tracebacks for debugging:
# Good: specific handling, unexpected errors propagate
async def on_create(self) -> SecretOutputs:
try:
secret = self.client.create_secret(self.config.secret_id)
except AlreadyExists:
secret = self.client.get_secret(self.config.secret_id)
# Other exceptions propagate with full traceback
return SecretOutputs(resource_name=secret.name)
# Bad: swallowing all exceptions
async def on_create(self) -> SecretOutputs:
try:
secret = self.client.create_secret(self.config.secret_id)
except Exception as e: # Don't do this
raise RuntimeError(f"Failed: {e}")
Provide Actionable Error Messages
Error messages should help users understand what went wrong and how to fix it:
# Good: tells user what to do
raise ValueError("Cannot change project_id; delete and recreate resource")
# Bad: generic message
raise ValueError("Invalid configuration")
Credentials Management
Accept Credentials as Configuration
Providers should accept credentials as configuration, not rely on ambient credentials:
class DatabaseConfig(Config):
name: str
# Accept credentials as config, not from environment
credentials: dict[str, Any] | str
class Database(Resource[DatabaseConfig, DatabaseOutputs]):
def _get_client(self) -> CloudClient:
creds = self.config.credentials
# Handle both dict and JSON string formats
if isinstance(creds, str):
creds = json.loads(creds)
return CloudClient(credentials=creds)
This approach:
- Works in multi-tenant environments where each user has different credentials
- Makes credential requirements explicit in the schema
- Enables users to manage credentials via Pragma secrets
Use FieldReference for Secrets
Document that users should use FieldReference to pass credentials from a secret resource:
# User's resource configuration
provider: mycloud
resource: database
name: analytics
config:
name: analytics
credentials:
$ref:
provider: pragma
resource: secret
name: mycloud-creds
field: data
Never Log Credentials
Ensure credentials never appear in logs or error messages:
def _get_client(self) -> CloudClient:
try:
return CloudClient(credentials=self.config.credentials)
except AuthenticationError as e:
# Good: don't include credentials in error
raise RuntimeError("Authentication failed - check credentials")
# Bad: might expose credentials
# raise RuntimeError(f"Auth failed: {e}")
Cache Clients Within Lifecycle Methods
Create API clients once per lifecycle invocation, not per operation:
class Database(Resource[DatabaseConfig, DatabaseOutputs]):
def _get_client(self) -> CloudClient:
"""Create client once per lifecycle call."""
return CloudClient(credentials=self.config.credentials)
async def on_create(self) -> DatabaseOutputs:
client = self._get_client() # Create once
db = client.create_database(self.config.name)
client.configure_database(db.id, self.config.settings)
client.enable_monitoring(db.id)
return DatabaseOutputs(database_id=db.id)
Minimize API Calls in Updates
Only call external APIs when configuration actually changed:
async def on_update(self, previous_config: DatabaseConfig) -> DatabaseOutputs:
# Check if anything relevant changed
if previous_config == self.config:
return self.outputs # No API calls needed
client = self._get_client()
# Only update what changed
if previous_config.size_gb != self.config.size_gb:
client.resize(self.outputs.database_id, self.config.size_gb)
if previous_config.settings != self.config.settings:
client.configure(self.outputs.database_id, self.config.settings)
return self.outputs
Use Async Where Possible
Lifecycle methods are async. Use async API clients when available:
async def on_create(self) -> DatabaseOutputs:
async with AsyncCloudClient(credentials=self.config.credentials) as client:
# Async operations can be awaited
db = await client.create_database(self.config.name)
# Parallel operations when independent
await asyncio.gather(
client.configure_settings(db.id, self.config.settings),
client.setup_monitoring(db.id),
)
return DatabaseOutputs(database_id=db.id)
Common Mistakes
Forgetting Idempotency
The most common mistake is assuming lifecycle methods only run once:
# Wrong: fails on retry if secret exists
async def on_create(self) -> SecretOutputs:
secret = self.client.create_secret(self.config.secret_id) # Throws on retry!
return SecretOutputs(resource_name=secret.name)
# Right: handle already exists
async def on_create(self) -> SecretOutputs:
try:
secret = self.client.create_secret(self.config.secret_id)
except AlreadyExists:
secret = self.client.get_secret(self.config.secret_id)
return SecretOutputs(resource_name=secret.name)
Mutable State Between Calls
Do not store state on the resource instance between lifecycle calls:
# Wrong: state from previous calls won't exist
class Database(Resource[DatabaseConfig, DatabaseOutputs]):
def __init__(self, **data):
super().__init__(**data)
self._client = None # Won't persist between calls!
async def on_create(self) -> DatabaseOutputs:
self._client = CloudClient() # Stored here...
...
async def on_update(self, prev: DatabaseConfig) -> DatabaseOutputs:
self._client.update(...) # ...but won't exist here!
# Right: create fresh each time
class Database(Resource[DatabaseConfig, DatabaseOutputs]):
def _get_client(self) -> CloudClient:
return CloudClient(credentials=self.config.credentials)
async def on_update(self, prev: DatabaseConfig) -> DatabaseOutputs:
client = self._get_client() # Fresh client
...
Missing Immutable Field Validation
Certain fields cannot be changed in-place. Validate these in on_update:
async def on_update(self, previous_config: DatabaseConfig) -> DatabaseOutputs:
# Validate immutable fields first
if previous_config.region != self.config.region:
raise ValueError("Cannot change region; delete and recreate resource")
if previous_config.engine != self.config.engine:
raise ValueError("Cannot change database engine; delete and recreate resource")
# Then handle mutable fields...
Not Using self.outputs
In on_update and on_delete, the previous outputs are available via self.outputs. Use them:
async def on_update(self, previous_config: DatabaseConfig) -> DatabaseOutputs:
# Use self.outputs to get the resource ID
client = self._get_client()
client.resize(self.outputs.database_id, self.config.size_gb)
# Return existing outputs if they haven't changed
return self.outputs
async def on_delete(self) -> None:
client = self._get_client()
# Use self.outputs to know what to delete
client.delete_database(self.outputs.database_id)
Incomplete Cleanup in on_delete
Delete all resources created by on_create, not just the primary resource:
async def on_delete(self) -> None:
client = self._get_client()
try:
# Delete associated resources too
client.delete_monitoring(self.outputs.database_id)
client.delete_backups(self.outputs.database_id)
client.delete_database(self.outputs.database_id)
except NotFound:
# Already deleted - idempotent success
pass
Testing
Use ProviderHarness
The SDK provides ProviderHarness for testing lifecycle methods without real infrastructure:
from pragma_sdk.provider import ProviderHarness
async def test_create_success(harness: ProviderHarness):
result = await harness.invoke_create(
Database,
name="test-db",
config=DatabaseConfig(name="analytics"),
)
assert result.success
assert result.outputs.database_id is not None
Test Idempotency Explicitly
Write tests that verify idempotent behavior:
async def test_create_idempotent(harness, mock_client):
"""on_create handles already exists gracefully."""
mock_client.create_database.side_effect = AlreadyExists("exists")
config = DatabaseConfig(name="existing-db")
result = await harness.invoke_create(Database, name="db", config=config)
assert result.success # Should succeed, not fail
mock_client.get_database.assert_called_once() # Should retrieve existing
async def test_delete_idempotent(harness, mock_client):
"""on_delete succeeds when resource doesn't exist."""
mock_client.delete_database.side_effect = NotFound("gone")
result = await harness.invoke_delete(Database, name="db", config=config)
assert result.success # Not found is success for delete
Test Immutable Field Validation
Verify that changing immutable fields fails appropriately:
async def test_update_rejects_region_change(harness):
"""on_update rejects region changes."""
previous = DatabaseConfig(name="db", region="us-east-1")
current = DatabaseConfig(name="db", region="eu-west-1")
result = await harness.invoke_update(
Database,
name="db",
config=current,
previous_config=previous,
)
assert result.failed
assert "region" in str(result.error)
Mock External Services
Use fixtures to mock external API clients:
@pytest.fixture
def mock_client(monkeypatch):
"""Mock cloud client for testing."""
mock = MagicMock()
# Configure default responses
mock.create_database.return_value = MagicMock(id="db-123", url="postgres://...")
mock.get_database.return_value = MagicMock(id="db-123", url="postgres://...")
# Patch the client constructor
monkeypatch.setattr(
"mycloud_provider.resources.database.CloudClient",
lambda credentials=None: mock,
)
return mock
What’s Next