Skip to main content
The Pragmatiks SDK provides Python clients for managing resources programmatically. It offers both synchronous and asynchronous clients, typed responses, and utilities for building providers.

Installation

uv add pragmatiks-sdk

Quick Start

from pragma_sdk import PragmaClient

# Client auto-discovers credentials from CLI login or environment
with PragmaClient() as client:
    # List all resources
    resources = client.list_resources()

    # Filter by provider
    gcp_resources = client.list_resources(provider="gcp")

    # Apply a resource
    result = client.apply_resource({
        "provider": "gcp",
        "resource": "storage",
        "name": "my-bucket",
        "config": {"location": "EU"}
    })

Authentication

The SDK discovers credentials in this order:
  1. Explicit token - Pass auth_token to the client
  2. Context-specific environment variable - PRAGMA_AUTH_TOKEN_{CONTEXT}, where {CONTEXT} is the context name in uppercase (e.g., PRAGMA_AUTH_TOKEN_PRODUCTION)
  3. Generic environment variable - PRAGMA_AUTH_TOKEN
  4. CLI credentials file - Tokens stored by pragma auth login
# Auto-discovery (recommended)
client = PragmaClient()

# Explicit token
client = PragmaClient(auth_token="your-token-here")

# Named context
client = PragmaClient(context="production")

# Require authentication (raises ValueError if no token found)
client = PragmaClient(require_auth=True)

# Disable authentication
client = PragmaClient(auth_token=None)

Contexts

Contexts allow switching between environments (e.g., development, staging, production):
# Uses PRAGMA_AUTH_TOKEN_PRODUCTION or credentials file
client = PragmaClient(context="production")
Set the default context via the PRAGMA_CONTEXT environment variable:
export PRAGMA_CONTEXT=production

PragmaClient

Synchronous client for the Pragmatiks API. Use as a context manager to ensure connections are closed.

Constructor

PragmaClient(
    base_url: str | None = None,      # API URL (default: PRAGMA_API_URL or localhost:8000)
    timeout: float = 10.0,            # Request timeout in seconds
    auth_token: str | None = ...,     # Bearer token (omit for auto-discovery)
    context: str | None = None,       # Named context for token lookup
    require_auth: bool = False,       # Raise if no token found
)

Context Manager

with PragmaClient() as client:
    resources = client.list_resources()
# Connection automatically closed

# Or manually close:
client = PragmaClient()
try:
    resources = client.list_resources()
finally:
    client.close()

Health Check

def is_healthy() -> bool
Check if the API is reachable and healthy.
with PragmaClient() as client:
    if client.is_healthy():
        print("API is available")
    else:
        print("API is unavailable")

Get Current User

def get_me() -> UserInfo
Get information about the authenticated user.
from pragma_sdk import PragmaClient, UserInfo

with PragmaClient() as client:
    user = client.get_me()
    print(f"User: {user.email}")
    print(f"Organization: {user.organization_name}")
Returns: UserInfo with user_id, email, organization_id, and organization_name.
Raises: httpx.HTTPStatusError if not authenticated (401).

Resource Operations

list_resources

def list_resources(
    provider: str | None = None,      # Filter by provider name
    resource: str | None = None,      # Filter by resource type
    tags: list[str] | None = None,    # Filter by tags (must match all)
    *,
    model: type[ResourceT] | None = None,  # Typed response class
) -> list[ResourceT] | list[dict[str, Any]]
List resources with optional filters. Returns dictionaries by default, or typed instances if model is provided.
# List all resources as dictionaries
resources = client.list_resources()

# Filter by provider
gcp_resources = client.list_resources(provider="gcp")

# Filter by resource type
warehouses = client.list_resources(provider="gcp", resource="warehouse")

# Filter by tags
production = client.list_resources(tags=["env:production"])

# Typed response (see "Typed Responses" section)
from myapp.models import Database
databases = client.list_resources(provider="mycloud", resource="database", model=Database)

list_resource_types

def list_resource_types(
    provider: str | None = None,  # Filter by provider
) -> list[dict[str, Any]]
List available resource types from deployed providers.
# List all resource types
types = client.list_resource_types()

for rt in types:
    print(f"{rt['provider']}/{rt['resource']}: {rt['description']}")

# Filter by provider
gcp_types = client.list_resource_types(provider="gcp")
Returns: List of resource definitions with provider, resource, schema, and description.

get_resource

def get_resource(
    provider: str,                    # Provider that manages the resource
    resource: str,                    # Resource type name
    name: str,                        # Resource instance name
    *,
    model: type[ResourceT] | None = None,  # Typed response class
) -> ResourceT | dict[str, Any]
Get a single resource by its identifier.
# Get as dictionary
bucket = client.get_resource("gcp", "storage", "my-bucket")
print(bucket["lifecycle_state"])  # "ready"

# Typed response
from myapp.models import Bucket
bucket = client.get_resource("gcp", "storage", "my-bucket", model=Bucket)
print(bucket.lifecycle_state)  # LifecycleState.READY
Raises: httpx.HTTPStatusError if resource not found (404).

apply_resource

def apply_resource(
    resource: ResourceT | dict[str, Any],  # Resource to create or update
    *,
    model: type[ResourceT] | None = None,  # Typed response class
) -> ResourceT | dict[str, Any]
Create a new resource or update an existing one.
# Apply from dictionary
result = client.apply_resource({
    "provider": "gcp",
    "resource": "storage",
    "name": "my-bucket",
    "config": {"location": "EU", "storage_class": "STANDARD"},
    "tags": ["env:production"],
})

# Apply typed resource
from myapp.models import Bucket, BucketConfig
bucket = Bucket(
    name="my-bucket",
    config=BucketConfig(location="EU", storage_class="STANDARD"),
    tags=["env:production"],
)
result = client.apply_resource(bucket, model=Bucket)

delete_resource

def delete_resource(
    provider: str,
    resource: str,
    name: str,
) -> None
Delete a resource. The resource enters the deletion lifecycle and is removed after cleanup.
client.delete_resource("gcp", "storage", "my-bucket")
Raises: httpx.HTTPStatusError if resource not found (404).

Dead Letter Operations

Dead letter events are failed lifecycle events that can be retried or discarded.

list_dead_letter_events

def list_dead_letter_events(
    provider: str | None = None,  # Filter by provider
) -> list[dict[str, Any]]
List failed events awaiting manual intervention.
# List all dead letter events
events = client.list_dead_letter_events()

# Filter by provider
gcp_events = client.list_dead_letter_events(provider="gcp")

for event in events:
    print(f"{event['resource_id']}: {event['error']}")

get_dead_letter_event

def get_dead_letter_event(event_id: str) -> dict[str, Any]
Get details of a specific dead letter event.
event = client.get_dead_letter_event("evt_abc123")
print(event["error"])
print(event["payload"])

retry_dead_letter_event

def retry_dead_letter_event(event_id: str) -> None
Retry a single failed event.
client.retry_dead_letter_event("evt_abc123")

retry_all_dead_letter_events

def retry_all_dead_letter_events() -> int
Retry all dead letter events. Returns the count of events retried.
count = client.retry_all_dead_letter_events()
print(f"Retried {count} events")

delete_dead_letter_event

def delete_dead_letter_event(event_id: str) -> None
Discard a dead letter event without retrying.
client.delete_dead_letter_event("evt_abc123")

delete_dead_letter_events

def delete_dead_letter_events(
    provider: str | None = None,  # Delete for this provider only
    *,
    all: bool = False,            # Delete all events
) -> int
Bulk delete dead letter events. Must specify either provider or all=True.
# Delete all events for a provider
count = client.delete_dead_letter_events(provider="gcp")

# Delete all events
count = client.delete_dead_letter_events(all=True)
Raises: ValueError if neither provider nor all=True is specified.

Provider Deployment

Deploy custom providers to the platform.

list_providers

def list_providers() -> list[ProviderInfo]
List all providers for the current tenant.
from pragma_sdk import PragmaClient

with PragmaClient() as client:
    providers = client.list_providers()

    for p in providers:
        print(f"{p.provider_id}: {p.deployment_status or 'not deployed'}")
        if p.current_version:
            print(f"  Version: {p.current_version}")
Returns: List of ProviderInfo with provider_id, current_version, deployment_status, and updated_at.

list_builds

def list_builds(provider_id: str) -> list[BuildInfo]
List builds for a provider (newest first, up to 10).
from pragma_sdk import BuildStatus

builds = client.list_builds("my-provider")

for build in builds:
    print(f"{build.version}: {build.status}")
    if build.status == BuildStatus.FAILED:
        print(f"  Error: {build.error_message}")
Returns: List of BuildInfo with provider_id, version, status, error_message, and created_at.

push_provider

def push_provider(
    provider_id: str,   # Provider identifier
    tarball: bytes,     # Gzipped tarball of source code
) -> PushResult
Upload provider source code and start a build.
import tarfile
import io

# Create tarball from provider directory
buffer = io.BytesIO()
with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
    tar.add("./my-provider", arcname=".")
tarball = buffer.getvalue()

# Push and start build
result = client.push_provider("my-provider", tarball)
print(f"Build version: {result.version}")
print(f"Status: {result.status}")  # BuildStatus.PENDING
Returns: PushResult with version, status, and message.

get_build_status

def get_build_status(
    provider_id: str,
    version: str,        # CalVer version (e.g., "20250115.120000")
) -> BuildInfo
Check the status of a build by version.
from pragma_sdk import BuildStatus

result = client.get_build_status("my-provider", "20250115.120000")

if result.status == BuildStatus.SUCCESS:
    print("Build successful!")
elif result.status == BuildStatus.FAILED:
    print(f"Build failed: {result.error_message}")
elif result.status == BuildStatus.BUILDING:
    print("Build in progress...")
Returns: BuildInfo with provider_id, version, status, error_message (on failure), and created_at.

stream_build_logs

def stream_build_logs(
    provider_id: str,
    version: str,        # CalVer version (e.g., "20250115.120000")
) -> ContextManager[httpx.Response]
Stream build logs in real time. The returned context manager yields an httpx.Response whose lines can be iterated as they arrive.
with client.stream_build_logs("my-provider", "20250115.120000") as response:
    for line in response.iter_lines():
        print(line)

deploy_provider

def deploy_provider(
    provider_id: str,
    version: str | None = None,  # CalVer version (e.g., "20250115.120000")
) -> ProviderStatus
Deploy a specific build version of a provider. If no version is specified, the latest successful build is deployed.
from pragma_sdk import DeploymentStatus

# Deploy latest version
result = client.deploy_provider("my-provider")
print(f"Version: {result.version}")
print(f"Status: {result.status}")  # DeploymentStatus.PROGRESSING
print(f"Healthy: {result.healthy}")

# Deploy specific version (useful for rollbacks)
result = client.deploy_provider("my-provider", version="20250114.120000")
Returns: ProviderStatus with status, version, updated_at, and healthy.

get_deployment_status

def get_deployment_status(provider_id: str) -> ProviderStatus
Check deployment status.
from pragma_sdk import DeploymentStatus

result = client.get_deployment_status("my-provider")

if result.status == DeploymentStatus.AVAILABLE:
    print(f"Provider is healthy: {result.healthy}")
    print(f"Version: {result.version}")
elif result.status == DeploymentStatus.FAILED:
    print("Deployment failed")
Returns: ProviderStatus with status, version, updated_at, and healthy.

AsyncPragmaClient

Asynchronous client with the same API as PragmaClient. All methods are async.
import asyncio
from pragma_sdk import AsyncPragmaClient

async def main():
    """Demonstrate awaiting list, get and apply calls on one AsyncPragmaClient."""
    # The client is used as an async context manager for the whole example
    async with AsyncPragmaClient() as client:
        # All methods are awaited
        resources = await client.list_resources(provider="gcp")

        bucket = await client.get_resource("gcp", "storage", "my-bucket")

        result = await client.apply_resource({
            "provider": "gcp",
            "resource": "storage",
            "name": "new-bucket",
            "config": {"location": "EU"},
        })

asyncio.run(main())

Async Context Manager

async with AsyncPragmaClient() as client:
    resources = await client.list_resources()
# Connection automatically closed

# Or manually close:
client = AsyncPragmaClient()
try:
    resources = await client.list_resources()
finally:
    await client.close()

Streaming Build Logs (Async)

async with client.stream_build_logs("my-provider", "20250115.120000") as response:
    async for line in response.aiter_lines():
        print(line)

Concurrent Operations

Run multiple independent operations concurrently:
import asyncio

async def fetch_all_resources():
    """Fetch resources for gcp, aws and azure concurrently and return them as one list."""
    provider_names = ("gcp", "aws", "azure")
    async with AsyncPragmaClient() as client:
        # One list_resources call per provider, all awaited together
        per_provider = await asyncio.gather(
            *(client.list_resources(provider=name) for name in provider_names)
        )
        # Concatenate left to right, in the same provider order as requested
        merged, *remaining = per_provider
        for batch in remaining:
            merged = merged + batch
        return merged

Typed Responses

Use typed models for better IDE support and validation. Define models using the SDK base classes.

Resource Model

from pragma_sdk import Resource, Config, Outputs, LifecycleState

class BucketConfig(Config):
    location: str
    storage_class: str = "STANDARD"

class BucketOutputs(Outputs):
    url: str
    created_at: str

class Bucket(Resource[BucketConfig, BucketOutputs]):
    provider = "gcp"
    resource = "storage"

Using Typed Models

# Get with typed response
bucket = client.get_resource("gcp", "storage", "my-bucket", model=Bucket)
print(bucket.config.location)      # "EU"
print(bucket.outputs.url)          # "gs://my-bucket"
print(bucket.lifecycle_state)      # LifecycleState.READY

# List with typed response
buckets = client.list_resources(provider="gcp", resource="storage", model=Bucket)
for bucket in buckets:
    print(f"{bucket.name}: {bucket.config.location}")

# Apply typed resource
new_bucket = Bucket(
    name="new-bucket",
    config=BucketConfig(location="US"),
)
result = client.apply_resource(new_bucket, model=Bucket)

Lifecycle State Enum

from pragma_sdk import LifecycleState

resource = client.get_resource("gcp", "storage", "my-bucket")

match resource["lifecycle_state"]:
    case LifecycleState.DRAFT:
        print("Not yet submitted")
    case LifecycleState.PENDING:
        print("Queued for processing")
    case LifecycleState.PROCESSING:
        print("Being created/updated/deleted")
    case LifecycleState.READY:
        print("Successfully provisioned")
    case LifecycleState.FAILED:
        print(f"Error: {resource['error']}")

Dependencies

When building providers, you can declare dependencies on other resources using the Dependency[T] generic. This provides typed access to the resolved dependency’s outputs and config.
from pragma_sdk import Resource, Config, Outputs, Dependency
from my_provider.resources import Database

class AppConfig(Config):
    # dependency: declares a dependency on a Database resource
    database: Dependency[Database]

# NOTE(review): `mycloud` and `AppOutputs` are not defined in this snippet —
# presumably a Provider instance and an Outputs subclass declared elsewhere; confirm.
@mycloud.resource("app")
class App(Resource[AppConfig, AppOutputs]):
    async def on_create(self) -> AppOutputs:
        """Build the app's outputs from the Database dependency declared in its config."""
        # Access the resolved dependency
        db = self.config.database
        
        # Access outputs (typed)
        conn_url = db.outputs.connection_url
        
        # Access config
        db_size = db.config.size_gb  # shown for illustration; not used below
        
        return AppOutputs(url=f"Connected to {conn_url}")

Error Handling

The SDK raises httpx.HTTPStatusError for API errors. Handle common cases:
import httpx
from pragma_sdk import PragmaClient

with PragmaClient() as client:
    try:
        resource = client.get_resource("gcp", "storage", "nonexistent")
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            print("Resource not found")
        elif e.response.status_code == 401:
            print("Authentication failed")
        elif e.response.status_code == 403:
            print("Permission denied")
        else:
            print(f"API error: {e.response.status_code}")
            raise

Authentication Errors

from pragma_sdk import PragmaClient

try:
    # Require authentication
    client = PragmaClient(require_auth=True)
except ValueError as e:
    print(f"No credentials found: {e}")
    print("Run 'pragma auth login' or set PRAGMA_AUTH_TOKEN")

Timeout Handling

import httpx

with PragmaClient(timeout=30.0) as client:
    try:
        result = client.apply_resource(large_resource)
    except httpx.TimeoutException:
        print("Request timed out - try increasing timeout")

Retry Pattern

import time
import httpx

def apply_with_retry(client, resource, max_retries=3):
    """Apply ``resource`` via ``client``, retrying on server-side (5xx) errors.

    Args:
        client: Object exposing ``apply_resource(resource)``.
        resource: Payload passed through unchanged to ``apply_resource``.
        max_retries: Total number of attempts to make (not the number of
            retries after the first attempt); must be at least 1.

    Returns:
        Whatever ``client.apply_resource(resource)`` returns on the first
        successful attempt.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        httpx.HTTPStatusError: Immediately for non-5xx responses, or for a
            5xx response on the final attempt.
    """
    if max_retries < 1:
        # Without this guard the loop body never runs and the function would
        # silently return None without applying anything.
        raise ValueError(f"max_retries must be at least 1, got {max_retries}")

    final_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            return client.apply_resource(resource)
        except httpx.HTTPStatusError as e:
            # Client errors (4xx) will not succeed on retry; the last attempt
            # has no retries left. Both propagate to the caller.
            if e.response.status_code < 500 or attempt == final_attempt:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...

Provider Authoring

The SDK also provides utilities for building providers. See Building Providers for complete documentation.

Key Exports

from pragma_sdk import (
    # Base classes for resources
    Resource,
    Config,
    Outputs,

    # Provider decorator
    Provider,

    # Enums and types
    LifecycleState,
    EventType,
    ResponseStatus,

    # Field references for dependencies
    Field,
    FieldReference,
    ResourceReference,
)

from pragma_sdk.provider import (
    # Testing harness
    ProviderHarness,
    LifecycleEvent,
    LifecycleResult,
)

Quick Example

from pragma_sdk import Provider, Resource, Config, Outputs

mycloud = Provider(name="mycloud")

class DatabaseConfig(Config):
    name: str
    size_gb: int = 10

class DatabaseOutputs(Outputs):
    connection_url: str

# Registers Database under the "database" resource name on the `mycloud` provider.
@mycloud.resource("database")
class Database(Resource[DatabaseConfig, DatabaseOutputs]):
    # NOTE(review): the on_* hooks are presumably invoked by the platform for the
    # matching lifecycle event — confirm against the Building Providers docs.
    async def on_create(self) -> DatabaseOutputs:
        """Return outputs whose connection_url embeds the configured database name."""
        # Provision database
        return DatabaseOutputs(connection_url=f"postgres://.../{self.config.name}")

    async def on_update(self, previous_config: DatabaseConfig) -> DatabaseOutputs:
        """Return the existing outputs unchanged; previous_config is not used in this example."""
        # Handle updates
        return self.outputs

    async def on_delete(self) -> None:
        """Placeholder for cleanup; this example performs no work."""
        # Clean up database
        pass

Next Steps