Every resource must implement three lifecycle methods that Pragmatiks calls when managing your resources. These methods define how your resource is created, updated, and deleted.
Method Signatures
from pragma_sdk import Resource, Config, Outputs
class MyResource(Resource[MyConfig, MyOutputs]):
async def on_create(self) -> MyOutputs:
"""Called when creating a new resource."""
...
async def on_update(self, previous_config: MyConfig) -> MyOutputs:
"""Called when updating an existing resource."""
...
async def on_delete(self) -> None:
"""Called when deleting a resource."""
...
All lifecycle methods are async and run within the provider runtime.
Instance Attributes
Inside lifecycle methods, you have access to several instance attributes:
| Attribute | Type | Description |
|---|
self.name | str | Resource instance name |
self.config | ConfigT | Current configuration |
self.outputs | OutputsT | None | Existing outputs (available in on_update and on_delete) |
self.tags | list[str] | None | Tags attached to the resource |
on_create
Called when a resource is first created. Your implementation should provision the underlying infrastructure and return the outputs.
async def on_create(self) -> DatabaseOutputs:
# Provision the actual resource
db = await self.client.create_database(
name=self.config.name,
size=self.config.size_gb,
)
# Return outputs for dependent resources
return DatabaseOutputs(
connection_url=db.connection_string,
database_id=db.id,
)
When called: The first time a resource is applied.
Return value: An instance of your Outputs class containing values that other resources can reference.
on_update
Called when a resource configuration changes. Receives the previous configuration so you can determine what changed and respond appropriately.
async def on_update(self, previous_config: DatabaseConfig) -> DatabaseOutputs:
# Detect immutable field changes
if previous_config.name != self.config.name:
raise ValueError("Cannot rename database; delete and recreate")
# Handle mutable field changes
if previous_config.size_gb != self.config.size_gb:
await self.client.resize_database(
database_id=self.outputs.database_id,
new_size=self.config.size_gb,
)
# Return updated outputs (or existing if unchanged)
return self.outputs
When called: Each time the resource configuration changes after initial creation.
Parameters:
previous_config: The configuration from the previous successful apply. Use this to detect what changed.
Return value: An instance of your Outputs class. Return self.outputs if nothing relevant changed.
Common Update Patterns
Immutable fields - Some fields cannot be changed in-place. Raise an error to force delete and recreate:
async def on_update(self, previous_config: SecretConfig) -> SecretOutputs:
if previous_config.project_id != self.config.project_id:
raise ValueError("Cannot change project_id; delete and recreate resource")
No-op when unchanged - Skip work if the relevant fields haven’t changed:
async def on_update(self, previous_config: SecretConfig) -> SecretOutputs:
# If data unchanged, return existing outputs
if previous_config.data == self.config.data:
return self.outputs
# Otherwise, perform the update
version = await self.client.add_version(data=self.config.data)
return SecretOutputs(version_id=version.id)
on_delete
Called when a resource is being removed. Clean up the underlying infrastructure.
async def on_delete(self) -> None:
try:
await self.client.delete_database(self.outputs.database_id)
except NotFoundError:
# Already deleted - this is fine (idempotent)
pass
When called: When a resource is removed from a graph or explicitly deleted.
Return value: None. Any return value is ignored.
Idempotency Requirements
All lifecycle methods must be idempotent. The same method may be called multiple times with the same input, and must produce the same result.
This happens when:
- The runtime restarts after executing a method but before confirming completion
- Network issues cause retries
- The user re-applies the same configuration
Idempotent Create
Handle the case where the resource already exists:
async def on_create(self) -> SecretOutputs:
try:
secret = await self.client.create_secret(self.config.secret_id)
except AlreadyExistsError:
# Resource exists from a previous attempt - get it instead
secret = await self.client.get_secret(self.config.secret_id)
return SecretOutputs(resource_name=secret.name)
Idempotent Delete
Handle the case where the resource is already gone:
async def on_delete(self) -> None:
try:
await self.client.delete_secret(self.config.secret_id)
except NotFoundError:
# Already deleted - success
pass
Idempotent Update
Return existing outputs when nothing needs to change:
async def on_update(self, previous_config: MyConfig) -> MyOutputs:
if previous_config == self.config:
return self.outputs # No changes needed
# Perform update...
Error Handling
Raise exceptions to signal failures. The runtime will:
- Mark the resource as
failed
- Record the error message
- Stop processing dependent resources
async def on_create(self) -> DatabaseOutputs:
try:
db = await self.client.create_database(self.config.name)
except QuotaExceededError as e:
raise RuntimeError(f"Quota exceeded: {e}")
except InvalidConfigError as e:
raise ValueError(f"Invalid configuration: {e}")
return DatabaseOutputs(connection_url=db.url)
Use specific exception types:
ValueError - Invalid configuration (user error)
RuntimeError - Infrastructure or API failures
- Let unexpected exceptions propagate with full tracebacks
Complete Example
from pragma_sdk import Resource, Config, Outputs, Provider
from mycloud import CloudClient
from mycloud.exceptions import AlreadyExists, NotFound
class DatabaseConfig(Config):
name: str
size_gb: int = 10
region: str = "us-east-1"
class DatabaseOutputs(Outputs):
database_id: str
connection_url: str
host: str
mycloud = Provider(name="mycloud")
@mycloud.resource("database")
class Database(Resource[DatabaseConfig, DatabaseOutputs]):
def _get_client(self) -> CloudClient:
return CloudClient()
async def on_create(self) -> DatabaseOutputs:
client = self._get_client()
try:
db = client.create_database(
name=self.config.name,
size_gb=self.config.size_gb,
region=self.config.region,
)
except AlreadyExists:
# Idempotent: get existing if already created
db = client.get_database(self.config.name)
return DatabaseOutputs(
database_id=db.id,
connection_url=db.connection_string,
host=db.host,
)
async def on_update(self, previous_config: DatabaseConfig) -> DatabaseOutputs:
# Immutable fields
if previous_config.name != self.config.name:
raise ValueError("Cannot rename database; delete and recreate")
if previous_config.region != self.config.region:
raise ValueError("Cannot change region; delete and recreate")
# Mutable field: size
if previous_config.size_gb != self.config.size_gb:
client = self._get_client()
client.resize_database(
database_id=self.outputs.database_id,
new_size_gb=self.config.size_gb,
)
return self.outputs
async def on_delete(self) -> None:
client = self._get_client()
try:
client.delete_database(self.outputs.database_id)
except NotFound:
# Idempotent: already deleted
pass
Creating Subresources
Providers can create and manage subresources from within lifecycle handlers. This enables composition patterns where a high-level resource provisions multiple lower-level resources.
The Pattern
async def on_create(self) -> AppOutputs:
# 1. Create the subresource instance
db = DatabaseResource(
name=f"{self.name}-db",
config=DatabaseConfig(size_gb=10),
)
# 2. Set ownership for cascading deletes
db.set_owner(self)
# 3. Apply (create) the subresource
await db.apply()
# 4. Wait for it to be ready
await db.wait_ready(timeout=120.0)
# 5. Use its outputs
return AppOutputs(db_url=db.outputs.connection_url)
Methods
set_owner(parent) - Establishes an ownership relationship. When the parent resource is deleted, owned subresources are automatically cleaned up via cascading deletes. Returns self for method chaining.
apply() - Sends the resource to the API for creation. The resource enters PENDING state and a lifecycle event is emitted for the appropriate provider to process. Returns self for method chaining.
wait_ready(timeout) - Waits for the resource to reach READY state. Updates the resource instance with outputs from the completed operation. Raises TimeoutError if the timeout is exceeded or ResourceFailedError if the resource fails.
Complete Example
Here’s a complete example of an application resource that provisions its own database:
from pragma_sdk import Resource, Config, Outputs, Provider
class AppConfig(Config):
environment: str = "production"
db_size_gb: int = 10
class AppOutputs(Outputs):
app_url: str
db_connection_url: str
class DatabaseConfig(Config):
size_gb: int
class DatabaseOutputs(Outputs):
connection_url: str
mycloud = Provider(name="mycloud")
@mycloud.resource("database")
class Database(Resource[DatabaseConfig, DatabaseOutputs]):
async def on_create(self) -> DatabaseOutputs:
# Provision actual database...
return DatabaseOutputs(connection_url="postgres://...")
async def on_update(self, previous_config: DatabaseConfig) -> DatabaseOutputs:
return self.outputs
async def on_delete(self) -> None:
pass
@mycloud.resource("app")
class App(Resource[AppConfig, AppOutputs]):
async def on_create(self) -> AppOutputs:
# Create a database subresource
db = Database(
name=f"{self.name}-db",
config=DatabaseConfig(size_gb=self.config.db_size_gb),
)
db.set_owner(self)
await db.apply()
await db.wait_ready(timeout=120.0)
# Deploy the application using the database
app_url = await self._deploy_app(db.outputs.connection_url)
return AppOutputs(
app_url=app_url,
db_connection_url=db.outputs.connection_url,
)
async def on_update(self, previous_config: AppConfig) -> AppOutputs:
# Subresource updates are handled automatically via dependencies
return self.outputs
async def on_delete(self) -> None:
# Subresources are deleted automatically via cascading deletes
await self._undeploy_app()
async def _deploy_app(self, db_url: str) -> str:
# Deploy logic here...
return f"https://{self.name}.example.com"
async def _undeploy_app(self) -> None:
# Cleanup logic here...
pass
When to Use Subresources
| Use Case | Approach |
|---|
| Encapsulated infrastructure (DB, cache, etc.) | Subresources with ownership |
| Shared resources across multiple parents | Dependencies without ownership |
| User-managed separate resources | Separate top-level resources |
Use subresources when the child resource’s lifecycle is tightly coupled to the parent. If users might want to manage the child independently, expose it as a separate resource type instead.
Testing Lifecycle Methods
Use the ProviderHarness to test your lifecycle methods without infrastructure:
from pragma_sdk.provider import ProviderHarness
async def test_create_database():
harness = ProviderHarness()
result = await harness.invoke_create(
Database,
name="test-db",
config=DatabaseConfig(name="test-db", size_gb=20),
)
assert result.success
assert result.outputs.database_id is not None
async def test_update_database():
harness = ProviderHarness()
# First create
create_result = await harness.invoke_create(
Database,
name="test-db",
config=DatabaseConfig(name="test-db", size_gb=10),
)
# Then update
update_result = await harness.invoke_update(
Database,
name="test-db",
config=DatabaseConfig(name="test-db", size_gb=20),
previous_config=DatabaseConfig(name="test-db", size_gb=10),
current_outputs=create_result.outputs,
)
assert update_result.success
async def test_delete_idempotent():
harness = ProviderHarness()
result = await harness.invoke_delete(
Database,
name="test-db",
config=DatabaseConfig(name="test-db"),
)
assert result.success
See the SDK documentation for more testing patterns.