Skip to main content
Every resource must implement three lifecycle methods that Pragmatiks calls when managing your resources. These methods define how your resource is created, updated, and deleted.

Method Signatures

from pragma_sdk import Resource, Config, Outputs

class MyResource(Resource[MyConfig, MyOutputs]):
    async def on_create(self) -> MyOutputs:
        """Called when creating a new resource."""
        ...

    async def on_update(self, previous_config: MyConfig) -> MyOutputs:
        """Called when updating an existing resource."""
        ...

    async def on_delete(self) -> None:
        """Called when deleting a resource."""
        ...
All lifecycle methods are async and run within the provider runtime.

Instance Attributes

Inside lifecycle methods, you have access to several instance attributes:
AttributeTypeDescription
self.namestrResource instance name
self.configConfigTCurrent configuration
self.outputsOutputsT | NoneExisting outputs (available in on_update and on_delete)
self.tagslist[str] | NoneTags attached to the resource

on_create

Called when a resource is first created. Your implementation should provision the underlying infrastructure and return the outputs.
async def on_create(self) -> DatabaseOutputs:
    # Provision the actual resource
    db = await self.client.create_database(
        name=self.config.name,
        size=self.config.size_gb,
    )

    # Return outputs for dependent resources
    return DatabaseOutputs(
        connection_url=db.connection_string,
        database_id=db.id,
    )
When called: The first time a resource is applied. Return value: An instance of your Outputs class containing values that other resources can reference.

on_update

Called when a resource configuration changes. Receives the previous configuration so you can determine what changed and respond appropriately.
async def on_update(self, previous_config: DatabaseConfig) -> DatabaseOutputs:
    # Detect immutable field changes
    if previous_config.name != self.config.name:
        raise ValueError("Cannot rename database; delete and recreate")

    # Handle mutable field changes
    if previous_config.size_gb != self.config.size_gb:
        await self.client.resize_database(
            database_id=self.outputs.database_id,
            new_size=self.config.size_gb,
        )

    # Return updated outputs (or existing if unchanged)
    return self.outputs
When called: Each time the resource configuration changes after initial creation. Parameters:
  • previous_config: The configuration from the previous successful apply. Use this to detect what changed.
Return value: An instance of your Outputs class. Return self.outputs if nothing relevant changed.

Common Update Patterns

Immutable fields - Some fields cannot be changed in-place. Raise an error to force delete and recreate:
async def on_update(self, previous_config: SecretConfig) -> SecretOutputs:
    if previous_config.project_id != self.config.project_id:
        raise ValueError("Cannot change project_id; delete and recreate resource")
No-op when unchanged - Skip work if the relevant fields haven’t changed:
async def on_update(self, previous_config: SecretConfig) -> SecretOutputs:
    # If data unchanged, return existing outputs
    if previous_config.data == self.config.data:
        return self.outputs

    # Otherwise, perform the update
    version = await self.client.add_version(data=self.config.data)
    return SecretOutputs(version_id=version.id)

on_delete

Called when a resource is being removed. Clean up the underlying infrastructure.
async def on_delete(self) -> None:
    try:
        await self.client.delete_database(self.outputs.database_id)
    except NotFoundError:
        # Already deleted - this is fine (idempotent)
        pass
When called: When a resource is removed from a graph or explicitly deleted. Return value: None. Any return value is ignored.

Idempotency Requirements

All lifecycle methods must be idempotent. The same method may be called multiple times with the same input, and must produce the same result.
This happens when:
  • The runtime restarts after executing a method but before confirming completion
  • Network issues cause retries
  • The user re-applies the same configuration

Idempotent Create

Handle the case where the resource already exists:
async def on_create(self) -> SecretOutputs:
    try:
        secret = await self.client.create_secret(self.config.secret_id)
    except AlreadyExistsError:
        # Resource exists from a previous attempt - get it instead
        secret = await self.client.get_secret(self.config.secret_id)

    return SecretOutputs(resource_name=secret.name)

Idempotent Delete

Handle the case where the resource is already gone:
async def on_delete(self) -> None:
    try:
        await self.client.delete_secret(self.config.secret_id)
    except NotFoundError:
        # Already deleted - success
        pass

Idempotent Update

Return existing outputs when nothing needs to change:
async def on_update(self, previous_config: MyConfig) -> MyOutputs:
    if previous_config == self.config:
        return self.outputs  # No changes needed

    # Perform update...

Error Handling

Raise exceptions to signal failures. The runtime will:
  1. Mark the resource as failed
  2. Record the error message
  3. Stop processing dependent resources
async def on_create(self) -> DatabaseOutputs:
    try:
        db = await self.client.create_database(self.config.name)
    except QuotaExceededError as e:
        raise RuntimeError(f"Quota exceeded: {e}")
    except InvalidConfigError as e:
        raise ValueError(f"Invalid configuration: {e}")

    return DatabaseOutputs(connection_url=db.url)
Use specific exception types:
  • ValueError - Invalid configuration (user error)
  • RuntimeError - Infrastructure or API failures
  • Let unexpected exceptions propagate with full tracebacks

Complete Example

from pragma_sdk import Resource, Config, Outputs, Provider
from mycloud import CloudClient
from mycloud.exceptions import AlreadyExists, NotFound


class DatabaseConfig(Config):
    name: str
    size_gb: int = 10
    region: str = "us-east-1"


class DatabaseOutputs(Outputs):
    database_id: str
    connection_url: str
    host: str


mycloud = Provider(name="mycloud")


@mycloud.resource("database")
class Database(Resource[DatabaseConfig, DatabaseOutputs]):
    def _get_client(self) -> CloudClient:
        return CloudClient()

    async def on_create(self) -> DatabaseOutputs:
        client = self._get_client()

        try:
            db = client.create_database(
                name=self.config.name,
                size_gb=self.config.size_gb,
                region=self.config.region,
            )
        except AlreadyExists:
            # Idempotent: get existing if already created
            db = client.get_database(self.config.name)

        return DatabaseOutputs(
            database_id=db.id,
            connection_url=db.connection_string,
            host=db.host,
        )

    async def on_update(self, previous_config: DatabaseConfig) -> DatabaseOutputs:
        # Immutable fields
        if previous_config.name != self.config.name:
            raise ValueError("Cannot rename database; delete and recreate")
        if previous_config.region != self.config.region:
            raise ValueError("Cannot change region; delete and recreate")

        # Mutable field: size
        if previous_config.size_gb != self.config.size_gb:
            client = self._get_client()
            client.resize_database(
                database_id=self.outputs.database_id,
                new_size_gb=self.config.size_gb,
            )

        return self.outputs

    async def on_delete(self) -> None:
        client = self._get_client()

        try:
            client.delete_database(self.outputs.database_id)
        except NotFound:
            # Idempotent: already deleted
            pass

Creating Subresources

Providers can create and manage subresources from within lifecycle handlers. This enables composition patterns where a high-level resource provisions multiple lower-level resources.

The Pattern

async def on_create(self) -> AppOutputs:
    # 1. Create the subresource instance
    db = DatabaseResource(
        name=f"{self.name}-db",
        config=DatabaseConfig(size_gb=10),
    )

    # 2. Set ownership for cascading deletes
    db.set_owner(self)

    # 3. Apply (create) the subresource
    await db.apply()

    # 4. Wait for it to be ready
    await db.wait_ready(timeout=120.0)

    # 5. Use its outputs
    return AppOutputs(db_url=db.outputs.connection_url)

Methods

set_owner(parent) - Establishes an ownership relationship. When the parent resource is deleted, owned subresources are automatically cleaned up via cascading deletes. Returns self for method chaining. apply() - Sends the resource to the API for creation. The resource enters PENDING state and a lifecycle event is emitted for the appropriate provider to process. Returns self for method chaining. wait_ready(timeout) - Waits for the resource to reach READY state. Updates the resource instance with outputs from the completed operation. Raises TimeoutError if the timeout is exceeded or ResourceFailedError if the resource fails.

Complete Example

Here’s a complete example of an application resource that provisions its own database:
from pragma_sdk import Resource, Config, Outputs, Provider


class AppConfig(Config):
    environment: str = "production"
    db_size_gb: int = 10


class AppOutputs(Outputs):
    app_url: str
    db_connection_url: str


class DatabaseConfig(Config):
    size_gb: int


class DatabaseOutputs(Outputs):
    connection_url: str


mycloud = Provider(name="mycloud")


@mycloud.resource("database")
class Database(Resource[DatabaseConfig, DatabaseOutputs]):
    async def on_create(self) -> DatabaseOutputs:
        # Provision actual database...
        return DatabaseOutputs(connection_url="postgres://...")

    async def on_update(self, previous_config: DatabaseConfig) -> DatabaseOutputs:
        return self.outputs

    async def on_delete(self) -> None:
        pass


@mycloud.resource("app")
class App(Resource[AppConfig, AppOutputs]):
    async def on_create(self) -> AppOutputs:
        # Create a database subresource
        db = Database(
            name=f"{self.name}-db",
            config=DatabaseConfig(size_gb=self.config.db_size_gb),
        )
        db.set_owner(self)
        await db.apply()
        await db.wait_ready(timeout=120.0)

        # Deploy the application using the database
        app_url = await self._deploy_app(db.outputs.connection_url)

        return AppOutputs(
            app_url=app_url,
            db_connection_url=db.outputs.connection_url,
        )

    async def on_update(self, previous_config: AppConfig) -> AppOutputs:
        # Subresource updates are handled automatically via dependencies
        return self.outputs

    async def on_delete(self) -> None:
        # Subresources are deleted automatically via cascading deletes
        await self._undeploy_app()

    async def _deploy_app(self, db_url: str) -> str:
        # Deploy logic here...
        return f"https://{self.name}.example.com"

    async def _undeploy_app(self) -> None:
        # Cleanup logic here...
        pass

When to Use Subresources

Use CaseApproach
Encapsulated infrastructure (DB, cache, etc.)Subresources with ownership
Shared resources across multiple parentsDependencies without ownership
User-managed separate resourcesSeparate top-level resources
Use subresources when the child resource’s lifecycle is tightly coupled to the parent. If users might want to manage the child independently, expose it as a separate resource type instead.

Testing Lifecycle Methods

Use the ProviderHarness to test your lifecycle methods without infrastructure:
from pragma_sdk.provider import ProviderHarness

async def test_create_database():
    harness = ProviderHarness()

    result = await harness.invoke_create(
        Database,
        name="test-db",
        config=DatabaseConfig(name="test-db", size_gb=20),
    )

    assert result.success
    assert result.outputs.database_id is not None


async def test_update_database():
    harness = ProviderHarness()

    # First create
    create_result = await harness.invoke_create(
        Database,
        name="test-db",
        config=DatabaseConfig(name="test-db", size_gb=10),
    )

    # Then update
    update_result = await harness.invoke_update(
        Database,
        name="test-db",
        config=DatabaseConfig(name="test-db", size_gb=20),
        previous_config=DatabaseConfig(name="test-db", size_gb=10),
        current_outputs=create_result.outputs,
    )

    assert update_result.success


async def test_delete_idempotent():
    harness = ProviderHarness()

    result = await harness.invoke_delete(
        Database,
        name="test-db",
        config=DatabaseConfig(name="test-db"),
    )

    assert result.success
See the SDK documentation for more testing patterns.