Documentation Index: Fetch the complete documentation index at https://docs.pragmatiks.io/llms.txt
Use this file to discover all available pages before exploring further.
The Pragmatiks SDK provides Python clients for managing resources programmatically. It offers both synchronous and asynchronous clients, typed responses, and utilities for building providers.
Installation
pip install pragmatiks-sdk
Quick Start
from pragma_sdk import PragmaClient
# Client auto-discovers credentials from CLI login or environment
with PragmaClient () as client :
# List all resources
resources = client . list_resources ()
# Filter by provider
gcp_resources = client . list_resources (provider = "gcp" )
# Apply a resource
result = client . apply_resource ({
"provider" : "gcp" ,
"resource" : "storage" ,
"name" : "my-bucket" ,
"config" : { "location" : "EU" }
})
Authentication
The SDK discovers credentials in this order:
1. Explicit token - Pass auth_token to the client
2. Context-specific environment variable - PRAGMA_AUTH_TOKEN_{CONTEXT}, where {CONTEXT} is the context name in uppercase (e.g., PRAGMA_AUTH_TOKEN_PRODUCTION)
3. Generic environment variable - PRAGMA_AUTH_TOKEN
4. CLI credentials file - Tokens stored by pragma auth login
# Auto-discovery (recommended)
client = PragmaClient ()
# Explicit token
client = PragmaClient (auth_token = "your-token-here" )
# Named context
client = PragmaClient (context = "production" )
# Require authentication (raises ValueError if no token found)
client = PragmaClient (require_auth = True )
# Disable authentication
client = PragmaClient (auth_token = None )
Contexts
Contexts allow switching between environments (e.g., development, staging, production):
# Uses PRAGMA_AUTH_TOKEN_PRODUCTION or credentials file
client = PragmaClient (context = "production" )
Set the default context via environment variable:
export PRAGMA_CONTEXT=production
PragmaClient
Synchronous client for the Pragmatiks API. Use as a context manager to ensure connections are closed.
Constructor
PragmaClient (
base_url: str | None = None , # API URL (default: PRAGMA_API_URL or localhost:8000)
timeout: float = 10.0 , # Request timeout in seconds
auth_token: str | None = ..., # Bearer token (omit for auto-discovery)
context: str | None = None , # Named context for token lookup
require_auth: bool = False , # Raise if no token found
)
Context Manager
with PragmaClient () as client :
resources = client . list_resources ()
# Connection automatically closed
# Or manually close:
client = PragmaClient ()
try :
resources = client . list_resources ()
finally :
client . close ()
Health Check
Check if the API is reachable and healthy.
with PragmaClient () as client :
if client . is_healthy ():
print ( "API is available" )
else :
print ( "API is unavailable" )
Get Current User
Get information about the authenticated user.
from pragma_sdk import PragmaClient , UserInfo
with PragmaClient () as client :
user = client . get_me ()
print ( f "User: { user.email } " )
print ( f "Organization: { user.organization_name } " )
Returns: UserInfo with user_id, email, organization_id, and organization_name.
Raises: httpx.HTTPStatusError if not authenticated (401).
Resource Operations
list_resources
def list_resources (
provider : str | None = None , # Filter by provider name
resource : str | None = None , # Filter by resource type
tags : list [ str ] | None = None , # Filter by tags (must match all)
* ,
model : type [ ResourceT ] | None = None , # Typed response class
) -> list [ ResourceT ] | list [ dict [ str , Any ]]
List resources with optional filters. Returns dictionaries by default, or typed instances if model is provided.
# List all resources as dictionaries
resources = client . list_resources ()
# Filter by provider
gcp_resources = client . list_resources (provider = "gcp" )
# Filter by resource type
warehouses = client . list_resources (provider = "gcp" , resource = "warehouse" )
# Filter by tags
production = client . list_resources (tags = [ "env:production" ])
# Typed response (see "Typed Responses" section)
from myapp . models import Database
databases = client . list_resources (provider = "mycloud" , resource = "database" , model = Database)
list_resource_types
def list_resource_types (
provider : str | None = None , # Filter by provider
) -> list [ dict [ str , Any ]]
List available resource types from deployed providers.
# List all resource types
types = client . list_resource_types ()
for rt in types :
print ( f " { rt[ 'provider' ] } / { rt[ 'resource' ] } : { rt[ 'description' ] } " )
# Filter by provider
gcp_types = client . list_resource_types (provider = "gcp" )
Returns: List of resource definitions with provider, resource, schema, and description.
get_resource
def get_resource (
provider : str , # Provider that manages the resource
resource : str , # Resource type name
name : str , # Resource instance name
* ,
model : type [ ResourceT ] | None = None , # Typed response class
) -> ResourceT | dict [ str , Any ]
Get a single resource by its identifier.
# Get as dictionary
bucket = client . get_resource ( "gcp" , "storage" , "my-bucket" )
print (bucket[ "lifecycle_state" ]) # "ready"
# Typed response
from myapp . models import Bucket
bucket = client . get_resource ( "gcp" , "storage" , "my-bucket" , model = Bucket)
print (bucket.lifecycle_state) # LifecycleState.READY
Raises: httpx.HTTPStatusError if resource not found (404).
apply_resource
def apply_resource (
resource : ResourceT | dict [ str , Any ], # Resource to create or update
* ,
model : type [ ResourceT ] | None = None , # Typed response class
) -> ResourceT | dict [ str , Any ]
Create a new resource or update an existing one.
# Apply from dictionary
result = client . apply_resource ({
"provider" : "gcp" ,
"resource" : "storage" ,
"name" : "my-bucket" ,
"config" : { "location" : "EU" , "storage_class" : "STANDARD" },
"tags" : [ "env:production" ],
})
# Apply typed resource
from myapp . models import Bucket , BucketConfig
bucket = Bucket (
name = "my-bucket" ,
config = BucketConfig (location = "EU" , storage_class = "STANDARD" ),
tags = [ "env:production" ],
)
result = client . apply_resource (bucket, model = Bucket)
delete_resource
def delete_resource (
provider : str ,
resource : str ,
name : str ,
) -> None
Delete a resource. The resource enters the deletion lifecycle and is removed after cleanup.
client . delete_resource ( "gcp" , "storage" , "my-bucket" )
Raises: httpx.HTTPStatusError if resource not found (404).
Dead Letter Operations
Dead letter events are failed lifecycle events that can be retried or discarded.
list_dead_letter_events
def list_dead_letter_events (
provider : str | None = None , # Filter by provider
) -> list [ dict [ str , Any ]]
List failed events awaiting manual intervention.
# List all dead letter events
events = client . list_dead_letter_events ()
# Filter by provider
gcp_events = client . list_dead_letter_events (provider = "gcp" )
for event in events :
print ( f " { event[ 'resource_id' ] } : { event[ 'error' ] } " )
get_dead_letter_event
def get_dead_letter_event ( event_id : str ) -> dict [ str , Any ]
Get details of a specific dead letter event.
event = client . get_dead_letter_event ( "evt_abc123" )
print (event[ "error" ])
print (event[ "payload" ])
retry_dead_letter_event
def retry_dead_letter_event ( event_id : str ) -> None
Retry a single failed event.
client . retry_dead_letter_event ( "evt_abc123" )
retry_all_dead_letter_events
def retry_all_dead_letter_events () -> int
Retry all dead letter events. Returns the count of events retried.
count = client . retry_all_dead_letter_events ()
print ( f "Retried { count } events" )
delete_dead_letter_event
def delete_dead_letter_event ( event_id : str ) -> None
Discard a dead letter event without retrying.
client . delete_dead_letter_event ( "evt_abc123" )
delete_dead_letter_events
def delete_dead_letter_events (
provider : str | None = None , # Delete for this provider only
* ,
all : bool = False , # Delete all events
) -> int
Bulk delete dead letter events. Must specify either provider or all=True.
# Delete all events for a provider
count = client . delete_dead_letter_events (provider = "gcp" )
# Delete all events
count = client . delete_dead_letter_events (all = True )
Raises: ValueError if neither provider nor all=True is specified.
Provider Deployment
Deploy custom providers to the platform.
list_providers
def list_providers () -> list [ ProviderInfo ]
List all providers for the current tenant.
from pragma_sdk import PragmaClient
with PragmaClient () as client :
providers = client . list_providers ()
for p in providers :
print ( f " { p.provider_id } : { p.deployment_status or 'not deployed' } " )
if p . current_version :
print ( f " Version: { p.current_version } " )
Returns: List of ProviderInfo with provider_id, current_version, deployment_status, and updated_at.
list_builds
def list_builds ( provider_id : str ) -> list [ BuildInfo ]
List builds for a provider (newest first, up to 10).
from pragma_sdk import BuildStatus
builds = client . list_builds ( "my-provider" )
for build in builds :
print ( f " { build.version } : { build.status } " )
if build . status == BuildStatus . FAILED :
print ( f " Error: { build.error_message } " )
Returns: List of BuildInfo with provider_id, version, status, error_message, and created_at.
push_provider
def push_provider (
provider_id : str , # Provider identifier
tarball : bytes , # Gzipped tarball of source code
) -> PushResult
Upload provider source code and start a build.
import tarfile
import io
# Create tarball from provider directory
buffer = io . BytesIO ()
with tarfile . open (fileobj = buffer, mode = "w:gz" ) as tar :
tar . add ( "./my-provider" , arcname = "." )
tarball = buffer . getvalue ()
# Push and start build
result = client . push_provider ( "my-provider" , tarball)
print ( f "Build version: { result.version } " )
print ( f "Status: { result.status } " ) # BuildStatus.PENDING
Returns: PushResult with version, status, and message.
get_build_status
def get_build_status (
provider_id : str ,
version : str , # CalVer version (e.g., "20250115.120000")
) -> BuildInfo
Check the status of a build by version.
from pragma_sdk import BuildStatus
result = client . get_build_status ( "my-provider" , "20250115.120000" )
if result . status == BuildStatus . SUCCESS :
print ( "Build successful!" )
elif result . status == BuildStatus . FAILED :
print ( f "Build failed: { result.error_message } " )
elif result . status == BuildStatus . BUILDING :
print ( "Build in progress..." )
Returns: BuildInfo with provider_id, version, status, error_message (on failure), and created_at.
stream_build_logs
def stream_build_logs (
provider_id : str ,
version : str , # CalVer version (e.g., "20250115.120000")
) -> ContextManager [ httpx . Response ]
Stream build logs in real-time.
with client . stream_build_logs ( "my-provider" , "20250115.120000" ) as response :
for line in response . iter_lines ():
print (line)
deploy_provider
def deploy_provider (
provider_id : str ,
version : str | None = None , # CalVer version (e.g., "20250115.120000")
) -> ProviderStatus
Deploy a provider to a specific version. If no version is specified, deploys the latest successful build.
from pragma_sdk import DeploymentStatus
# Deploy latest version
result = client . deploy_provider ( "my-provider" )
print ( f "Version: { result.version } " )
print ( f "Status: { result.status } " ) # DeploymentStatus.PROGRESSING
print ( f "Healthy: { result.healthy } " )
# Deploy specific version (useful for rollbacks)
result = client . deploy_provider ( "my-provider" , version = "20250114.120000" )
Returns: ProviderStatus with status, version, updated_at, and healthy.
get_deployment_status
def get_deployment_status ( provider_id : str ) -> ProviderStatus
Check deployment status.
from pragma_sdk import DeploymentStatus
result = client . get_deployment_status ( "my-provider" )
if result . status == DeploymentStatus . AVAILABLE :
print ( f "Provider is healthy: { result.healthy } " )
print ( f "Version: { result.version } " )
elif result . status == DeploymentStatus . FAILED :
print ( "Deployment failed" )
Returns: ProviderStatus with status, version, updated_at, and healthy.
AsyncPragmaClient
Asynchronous client with the same API as PragmaClient. All methods are async.
import asyncio
from pragma_sdk import AsyncPragmaClient
async def main():
    """Show that every call on the async client is awaited.

    Lists gcp resources, fetches one storage bucket, and applies a new
    bucket definition, all through a single client context.
    """
    new_bucket_spec = {
        "provider": "gcp",
        "resource": "storage",
        "name": "new-bucket",
        "config": {"location": "EU"},
    }

    async with AsyncPragmaClient() as client:
        # Each client method is awaited
        resources = await client.list_resources(provider="gcp")
        bucket = await client.get_resource("gcp", "storage", "my-bucket")
        result = await client.apply_resource(new_bucket_spec)
asyncio . run ( main ())
Async Context Manager
async with AsyncPragmaClient () as client :
resources = await client . list_resources ()
# Connection automatically closed
# Or manually close:
client = AsyncPragmaClient ()
try :
resources = await client . list_resources ()
finally :
await client . close ()
Streaming Build Logs (Async)
async with client . stream_build_logs ( "my-provider" , "20250115.120000" ) as response :
async for line in response . aiter_lines ():
print (line)
Concurrent Operations
Run multiple independent operations concurrently:
import asyncio
async def fetch_all_resources():
    """Fetch resources for gcp, aws, and azure concurrently.

    Returns:
        One list holding the gcp results, then the aws results, then the
        azure results.
    """
    provider_names = ("gcp", "aws", "azure")

    async with AsyncPragmaClient() as client:
        # One list_resources call per provider, all awaited together
        pending = [
            client.list_resources(provider=provider_name)
            for provider_name in provider_names
        ]
        gcp, aws, azure = await asyncio.gather(*pending)
        return gcp + aws + azure
Typed Responses
Use typed models for better IDE support and validation. Define models using the SDK base classes.
Resource Model
from pragma_sdk import Resource , Config , Outputs , LifecycleState
class BucketConfig ( Config ):
location : str
storage_class : str = "STANDARD"
class BucketOutputs ( Outputs ):
url : str
created_at : str
class Bucket (Resource [ BucketConfig , BucketOutputs ] ):
provider = "gcp"
resource = "storage"
Using Typed Models
# Get with typed response
bucket = client . get_resource ( "gcp" , "storage" , "my-bucket" , model = Bucket)
print (bucket.config.location) # "EU"
print (bucket.outputs.url) # "gs://my-bucket"
print (bucket.lifecycle_state) # LifecycleState.READY
# List with typed response
buckets = client . list_resources (provider = "gcp" , resource = "storage" , model = Bucket)
for bucket in buckets :
print ( f " { bucket.name } : { bucket.config.location } " )
# Apply typed resource
new_bucket = Bucket (
name = "new-bucket" ,
config = BucketConfig (location = "US" ),
)
result = client . apply_resource (new_bucket, model = Bucket)
Lifecycle State Enum
from pragma_sdk import LifecycleState
resource = client . get_resource ( "gcp" , "storage" , "my-bucket" )
match resource [ "lifecycle_state" ]:
case LifecycleState . DRAFT :
print ( "Not yet submitted" )
case LifecycleState . PENDING :
print ( "Queued for processing" )
case LifecycleState . PROCESSING :
print ( "Being created/updated/deleted" )
case LifecycleState . READY :
print ( "Successfully provisioned" )
case LifecycleState . FAILED :
print ( f "Error: { resource[ 'error' ] } " )
Dependencies
When building providers, you can declare dependencies on other resources using the Dependency[T] generic. This provides typed access to the resolved dependency’s outputs.
from pragma_sdk import Resource , Config , Outputs , Dependency
from my_provider . resources import Database
class AppConfig(Config):
    """Configuration for an app that depends on a Database resource."""

    # Declares a whole-resource dependency on a Database resource
    database: Dependency[Database]


class AppOutputs(Outputs):
    """Outputs returned when the app is created."""

    url: str


@mycloud.resource("app")
class App(Resource[AppConfig, AppOutputs]):
    """App resource that reads from its Database dependency on creation."""

    async def on_create(self) -> AppOutputs:
        """Resolve the database dependency and build the app outputs.

        Returns:
            AppOutputs whose url embeds the database connection URL.
        """
        # Resolve the dependency before reading its config or outputs
        db = await self.config.database.resolve()

        # Access outputs (typed)
        conn_url = db.outputs.connection_url

        # Access config (shown for illustration; not used below)
        db_size = db.config.size_gb

        return AppOutputs(url=f"Connected to {conn_url}")
Error Handling
The SDK raises httpx.HTTPStatusError for API errors. Handle common cases:
import httpx
from pragma_sdk import PragmaClient
with PragmaClient () as client :
try :
resource = client . get_resource ( "gcp" , "storage" , "nonexistent" )
except httpx . HTTPStatusError as e :
if e . response . status_code == 404 :
print ( "Resource not found" )
elif e . response . status_code == 401 :
print ( "Authentication failed" )
elif e . response . status_code == 403 :
print ( "Permission denied" )
else :
print ( f "API error: { e.response.status_code } " )
raise
Authentication Errors
from pragma_sdk import PragmaClient
try :
# Require authentication
client = PragmaClient (require_auth = True )
except ValueError as e :
print ( f "No credentials found: { e } " )
print ( "Run 'pragma auth login' or set PRAGMA_AUTH_TOKEN" )
Timeout Handling
import httpx
with PragmaClient (timeout = 30.0 ) as client :
try :
result = client . apply_resource (large_resource)
except httpx . TimeoutException :
print ( "Request timed out - try increasing timeout" )
Retry Pattern
import time
import httpx
def apply_with_retry(client, resource, max_retries=3):
    """Apply a resource, retrying server errors with exponential backoff.

    Args:
        client: Object whose ``apply_resource`` method is called.
        resource: Value passed to ``client.apply_resource``.
        max_retries: Maximum number of attempts; must be at least 1.

    Returns:
        Whatever ``client.apply_resource`` returns on the first attempt
        that succeeds.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        httpx.HTTPStatusError: For a response below 500, or for a 5xx
            response on the final attempt.
    """
    if max_retries < 1:
        # Without this guard the loop never runs and the function would
        # return None without ever calling the API.
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            return client.apply_resource(resource)
        except httpx.HTTPStatusError as e:
            is_last_attempt = attempt == max_retries - 1
            # Only server errors are retried, and only while attempts remain
            if e.response.status_code < 500 or is_last_attempt:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff
File Uploads
Upload files to Pragmatiks storage programmatically:
# Upload a file
with open ( "training-data.jsonl" , "rb" ) as f :
result = client . upload_file (
name = "training-data" ,
content = f. read (),
content_type = "application/jsonl" ,
)
print (result[ "url" ]) # Storage URL
print (result[ "public_url" ]) # Public download URL
print (result[ "size" ]) # File size in bytes
print (result[ "checksum" ]) # Content checksum
After uploading, create a pragma/file resource to track the file:
client . apply_resource ({
"provider" : "pragma" ,
"resource" : "file" ,
"name" : "training-data" ,
"lifecycle_state" : "pending" ,
"config" : {
"content_type" : "application/jsonl" ,
},
})
Dependencies and Field References
FieldReference
Reference a specific output field from another resource:
from pragma_sdk import FieldReference
# In a resource config, a FieldReference points to another resource's output
resource = {
"provider" : "agno" ,
"resource" : "anthropic-model" ,
"name" : "claude" ,
"lifecycle_state" : "pending" ,
"config" : {
"model_id" : "claude-sonnet-4-5-20250929" ,
"api_key" : {
"provider" : "pragma" ,
"resource" : "secret" ,
"name" : "anthropic-key" ,
"field" : "outputs.api_key" ,
},
},
}
client . apply_resource (resource)
Dependency[T]
For provider authors, declare whole-resource dependencies with typed access:
from pragma_sdk import Resource , Config , Outputs , Dependency
class ModelConfig ( Config ):
model_id : str
api_key : str
class ModelOutputs ( Outputs ):
model_name : str
class Model (Resource [ ModelConfig , ModelOutputs ] ):
provider = "agno"
resource = "anthropic-model"
class AgentConfig ( Config ):
model : Dependency [ Model ]
instructions : str
class AgentOutputs ( Outputs ):
agent_id : str
@mycloud . resource ( "agent" )
class Agent (Resource [ AgentConfig , AgentOutputs ] ):
async def on_create ( self ) -> AgentOutputs:
model = await self . config . model . resolve ()
# Access typed config and outputs
model_name = model . outputs . model_name
return AgentOutputs (agent_id = f "agent-using- { model_name } " )
Owner References
Establish parent-child relationships for cascade deletion:
result = client . apply_resource ({
"provider" : "agno" ,
"resource" : "agent" ,
"name" : "team-member" ,
"lifecycle_state" : "pending" ,
"config" : { "instructions" : "You are a team member." },
"owner_references" : [
{
"provider" : "agno" ,
"resource" : "team" ,
"name" : "my-team" ,
}
],
})
When the owner resource (my-team) is deleted, owned resources (team-member) are automatically cascade-deleted.
Provider Authoring
The SDK also provides utilities for building providers. See Building Providers for complete documentation.
Key Exports
from pragma_sdk import (
# Base classes for resources
Resource ,
Config ,
Outputs ,
# Provider decorator
Provider ,
# Enums and types
LifecycleState ,
EventType ,
ResponseStatus ,
# Field references for dependencies
Field ,
FieldReference ,
ResourceReference ,
)
from pragma_sdk . provider import (
# Testing harness
ProviderHarness ,
LifecycleEvent ,
LifecycleResult ,
)
Quick Example
from pragma_sdk import Provider , Resource , Config , Outputs
mycloud = Provider (name = "mycloud" )
class DatabaseConfig ( Config ):
name : str
size_gb : int = 10
class DatabaseOutputs ( Outputs ):
connection_url : str
@mycloud . resource ( "database" )
class Database (Resource [ DatabaseConfig , DatabaseOutputs ] ):
async def on_create ( self ) -> DatabaseOutputs:
# Provision database
return DatabaseOutputs (connection_url = f "postgres://.../ { self.config.name } " )
async def on_update ( self , previous_config : DatabaseConfig) -> DatabaseOutputs:
# Handle updates
return self . outputs
async def on_delete ( self ) -> None :
# Clean up database
pass
Next Steps
Building Providers - Create custom providers with lifecycle methods.
Lifecycle Methods - Deep dive into on_create, on_update, and on_delete.
API Reference - Full REST API documentation.
Error Recovery - Handle failures and retry dead letter events.