Python SDK

Use this SDK when your discovery workflow lives in notebooks, scripts, or Python services. It wraps CDE's discovery surfaces with typed models, retries, and polling helpers so you can focus on the workflow instead of request plumbing.

CDEClient (sync)

Blocking client for notebooks, scripts, and batch jobs—explicit methods for every resource with automatic pagination, polling helpers, and consistent error types.

AsyncCDEClient

Async-first interface for services and high-concurrency pipelines—compose concurrent runs, streaming artifacts, and cancellation that respects server-side state.

Typed models

Requests and responses map to Pydantic models generated from the OpenAPI spec—IDE autocompletion and type checkers catch mistakes before they reach production.

Retry and backoff

Automatic retries with exponential backoff for transient failures (429, 5xx), aligned with platform rate headers. Non-retryable errors raise immediately.
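The retry schedule can be sketched in plain Python. This is an illustration of the "exponential backoff + jitter" behavior described here, not the SDK's internal code; the base delay and cap are assumptions.

```python
import random

def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Delay before retry `attempt` (0-indexed): exponential growth, capped,
    with full jitter so concurrent clients don't retry in lockstep."""
    exp = min(cap, base * (2 ** attempt))
    return random.uniform(0, exp)

# Illustrative schedule for three retries
for attempt in range(3):
    delay = backoff_delay(attempt)
    assert 0 <= delay <= 0.5 * 2 ** attempt
```

When the API returns a `Retry-After` header (as on 429s), the SDK honors it instead of the computed delay.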

Installation

The SDK requires Python 3.9+ and is distributed on PyPI. Install with pip or any PEP 517-compatible installer.

# Core SDK
pip install cde-sdk

# With async support (adds httpx[http2]) — quote extras so zsh doesn't expand the brackets
pip install "cde-sdk[async]"

# With notebook extras (adds rich display + pandas)
pip install "cde-sdk[notebook]"

# Everything
pip install "cde-sdk[async,notebook]"

Python: 3.9, 3.10, 3.11, 3.12, 3.13

Dependencies: httpx, pydantic ≥ 2.0, typing-extensions

Optional: pandas, matplotlib (notebook extras)

[Figure: Python SDK architecture showing the client, models, and transport layers]

Quick start

From pip install to your first claim in 10 lines. Set the CDE_API_KEY environment variable or pass it directly to the client.

from cde import CDEClient

client = CDEClient(api_key="cde_key_sk_live_7f3a9b2c...")

project = client.projects.create(name="My First Discovery")
dataset = client.datasets.upload(project_id=project.id, file_path="data.parquet")
campaign = client.campaigns.create(project_id=project.id, name="Initial sweep")
run = client.runs.create(project_id=project.id, campaign_id=campaign.id, mode="symbolic", dataset_id=dataset.id)
run = client.runs.wait(run.id, poll_interval=5.0)

for claim in client.claims.list(project_id=project.id, run_id=run.id):
    print(f"[{claim.type}] {claim.summary} (score: {claim.score})")

Client configuration

Both CDEClient and AsyncCDEClient accept the same configuration options. Environment variables are used as defaults — explicit arguments take precedence.

from cde import CDEClient

client = CDEClient(
    api_key="cde_key_sk_live_7f3a9b2c...",     # or CDE_API_KEY env var
    base_url="https://api.cde.vareon.com/v1",   # or CDE_BASE_URL env var
    timeout=60.0,                                 # request timeout in seconds (default: 30)
    max_retries=5,                                # retries for transient failures (default: 3)
    headers={"X-Custom-Header": "value"},         # additional headers on every request
)

api_key

Your CDE API key. Falls back to CDE_API_KEY.

base_url

API base URL. Defaults to https://api.cde.vareon.com/v1.

timeout

Per-request timeout in seconds. Default: 30. Dataset uploads use 5x this value automatically.

max_retries

Number of retries for 429 and 5xx responses with exponential backoff + jitter. Default: 3.

headers

Additional HTTP headers merged into every request. Useful for tracing or custom metadata.

Environment

CDE_API_KEY, CDE_BASE_URL, CDE_TIMEOUT, CDE_MAX_RETRIES.
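The precedence rule — explicit argument first, then environment variable, then built-in default — can be sketched in plain Python. The `resolve_setting` helper below is illustrative, not the SDK's actual resolution code.

```python
import os

def resolve_setting(explicit, env_var: str, default):
    """Explicit argument wins; otherwise fall back to the env var, then the default."""
    if explicit is not None:
        return explicit
    value = os.environ.get(env_var)
    return value if value is not None else default

os.environ["CDE_TIMEOUT"] = "60"
assert resolve_setting(None, "CDE_TIMEOUT", 30.0) == "60"   # env var used as default
assert resolve_setting(15.0, "CDE_TIMEOUT", 30.0) == 15.0   # explicit argument wins
```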

Full API surface

client.projects.*

# List all projects (returns paginated iterator)
projects = client.projects.list(status="active")
for project in projects:
    print(f"{project.id}: {project.name}")

# Create a new project with governance settings
project = client.projects.create(
    name="Fluid Dynamics Study",
    description="Discover governing equations for turbulent flow",
    settings={
        "default_mode": "neuro_symbolic",
        "governance": {
            "promotion_ceiling": "validate",
            "require_negative_controls": True,
            "truth_dial_threshold": 0.85
        }
    }
)

# Retrieve a specific project
project = client.projects.get("proj_tds_8f2a1b")

# Update project settings
project = client.projects.update(
    "proj_tds_8f2a1b",
    description="Updated description",
    settings={"governance": {"truth_dial_threshold": 0.90}}
)

client.datasets.*

# Upload a dataset from a local file (parquet, CSV, HDF5)
dataset = client.datasets.upload(
    project_id="proj_tds_8f2a1b",
    file_path="turbulence_re5000.parquet",
    name="Turbulence Re=5000",
    variables=["velocity_x", "velocity_y", "pressure", "time"]
)
print(f"Uploaded: {dataset.id} ({dataset.size_bytes} bytes, {dataset.rows} rows)")

# Get automated profile (statistics, quality, distributions)
profile = client.datasets.profile(
    project_id="proj_tds_8f2a1b",
    dataset_id="ds_turb_001"
)
for col in profile.columns:
    print(f"  {col.name}: mean={col.mean:.3f}, std={col.std:.3f}, nulls={col.null_count}")

# List datasets in a project
datasets = client.datasets.list(project_id="proj_tds_8f2a1b", status="ready")

client.campaigns.*

# Create a campaign with budget and governance
campaign = client.campaigns.create(
    project_id="proj_tds_8f2a1b",
    name="Reynolds Number Sweep",
    budget={"compute_units": 2500, "max_runs": 50},
    governance={
        "promotion_ceiling": "validate",
        "require_negative_controls": True,
        "auto_promote_to": "explore"
    }
)

# List campaigns with filtering
campaigns = client.campaigns.list(
    project_id="proj_tds_8f2a1b",
    status="active"
)

# Get campaign details including budget usage
campaign = client.campaigns.get(
    project_id="proj_tds_8f2a1b",
    campaign_id="camp_rns_4e8f2a"
)
print(f"Budget: {campaign.budget.compute_units_used}/{campaign.budget.compute_units} CU")

client.runs.*

Submit discovery runs in any of CDE's four modes: symbolic, neural, neuro_symbolic, and causal.

# Symbolic discovery — find closed-form governing equations
run_sym = client.runs.create(
    project_id="proj_tds_8f2a1b",
    campaign_id="camp_rns_4e8f2a",
    mode="symbolic",
    dataset_id="ds_turb_001",
    parameters={"constraints": ["conservation_of_energy"], "max_complexity": 5}
)

# Neural discovery — learn latent representations
run_neural = client.runs.create(
    project_id="proj_tds_8f2a1b",
    campaign_id="camp_rns_4e8f2a",
    mode="neural",
    dataset_id="ds_turb_001",
    parameters={"config": "default"}
)

# Neuro-Symbolic — combine neural embeddings with symbolic regression
run_ns = client.runs.create(
    project_id="proj_tds_8f2a1b",
    campaign_id="camp_rns_4e8f2a",
    mode="neuro_symbolic",
    dataset_id="ds_turb_001",
    parameters={
        "config": "default",
        "max_complexity": 5,
        "cross_validate": True
    }
)

# Causal mode — causal dynamics discovery
run_cde = client.runs.create(
    project_id="proj_tds_8f2a1b",
    campaign_id="camp_rns_4e8f2a",
    mode="causal",
    dataset_id="ds_turb_001",
    parameters={"config": "default"}
)

# Wait for a run to complete (blocks with polling)
completed = client.runs.wait(run_sym.id, poll_interval=5.0, timeout=3600.0)
print(f"Status: {completed.status}, Claims: {completed.claims_count}")

# Check status without blocking
status = client.runs.status(run_ns.id)
print(f"Stage: {status.pipeline.current_stage}, Progress: {status.pipeline.progress}")

# Cancel a queued or running run
client.runs.cancel(run_neural.id)

# List runs in a campaign
runs = client.runs.list(
    project_id="proj_tds_8f2a1b",
    campaign_id="camp_rns_4e8f2a",
    status="completed",
    mode="symbolic"
)

client.claims.*

# List claims with filters
claims = client.claims.list(
    project_id="proj_tds_8f2a1b",
    run_id="run_ns_7a3b2c",
    type="law",                  # "law", "causal", "conservation"
    tier="explore",              # "explore", "validate", "publish"
    min_score=0.8,
    sort="score",
    order="desc"
)

for claim in claims:
    print(f"[{claim.type}] {claim.summary}")
    print(f"  Score: {claim.score}, Tier: {claim.tier}")
    print(f"  Scope: {claim.scope}")

# Get a specific claim with full detail
claim = client.claims.get(project_id="proj_tds_8f2a1b", claim_id="clm_law_9d4e5f")
print(f"Expression: {claim.expression}")
print(f"Evidence: r²={claim.evidence.r_squared}, controls={claim.evidence.negative_controls_passed}")

# Promote a claim through Truth Dial tiers
promoted = client.claims.promote(
    project_id="proj_tds_8f2a1b",
    claim_id="clm_law_9d4e5f",
    target_tier="validate",
    rationale="Passed all negative controls and cross-dataset validation."
)
print(f"Promoted to {promoted.tier} (from {promoted.previous_tier})")

# Export claims to JSON or CSV
client.claims.export(
    project_id="proj_tds_8f2a1b",
    run_id="run_ns_7a3b2c",
    format="csv",
    output_path="claims_export.csv"
)

client.ledger.*

# Query ledger entries for a specific claim
entries = client.ledger.query(
    project_id="proj_tds_8f2a1b",
    claim_id="clm_law_9d4e5f",
    event_type="claim_promoted"
)
for entry in entries:
    print(f"[{entry.timestamp}] {entry.event_type} by {entry.actor}")
    print(f"  Details: {entry.details}")

# Verify ledger integrity (chain validation)
verification = client.ledger.verify(project_id="proj_tds_8f2a1b")
print(f"Integrity: {verification.status}")  # "valid" or "corrupted"
print(f"Entries verified: {verification.entries_checked}")

# Export full ledger for external auditing
client.ledger.export(
    project_id="proj_tds_8f2a1b",
    format="json",
    output_path="ledger_audit.json"
)

client.artifacts.*

# List artifacts produced by a run
artifacts = client.artifacts.list(
    project_id="proj_tds_8f2a1b",
    run_id="run_ns_7a3b2c"
)
for artifact in artifacts:
    print(f"{artifact.name} ({artifact.type}, {artifact.size_bytes} bytes)")

# Download an artifact to disk
client.artifacts.download(
    project_id="proj_tds_8f2a1b",
    artifact_id="art_model_weights_01",
    output_path="model_weights.pt"
)

# Stream large artifacts without loading into memory
with client.artifacts.stream(
    project_id="proj_tds_8f2a1b",
    artifact_id="art_embeddings_01"
) as stream:
    for chunk in stream:
        process(chunk)

AsyncCDEClient

The async client shares the same method signatures but returns awaitables. Built on httpx async transport, it integrates cleanly with asyncio for FastAPI services, Celery workers, and concurrent pipelines.

from cde import AsyncCDEClient
import asyncio

async def parallel_discovery():
    async with AsyncCDEClient() as client:
        # Launch all four modes concurrently
        runs = await asyncio.gather(
            client.runs.create(
                project_id="proj_tds_8f2a1b", campaign_id="camp_rns_4e8f2a",
                mode="symbolic", dataset_id="ds_turb_001",
                parameters={"constraints": ["conservation_of_energy"], "max_complexity": 5}
            ),
            client.runs.create(
                project_id="proj_tds_8f2a1b", campaign_id="camp_rns_4e8f2a",
                mode="neural", dataset_id="ds_turb_001",
                parameters={"config": "default"}
            ),
            client.runs.create(
                project_id="proj_tds_8f2a1b", campaign_id="camp_rns_4e8f2a",
                mode="neuro_symbolic", dataset_id="ds_turb_001",
                parameters={"config": "default"}
            ),
            client.runs.create(
                project_id="proj_tds_8f2a1b", campaign_id="camp_rns_4e8f2a",
                mode="causal", dataset_id="ds_turb_001",
                parameters={"config": "default"}
            ),
        )

        # Wait for all to complete
        completed = await asyncio.gather(*[client.runs.wait(r.id) for r in runs])

        # Collect and compare claims across modes
        for run in completed:
            claims = await client.claims.list(project_id="proj_tds_8f2a1b", run_id=run.id)
            async for claim in claims:
                print(f"[{run.mode}] {claim.summary} (score: {claim.score})")

        # Stream a large artifact
        async with client.artifacts.stream(
            project_id="proj_tds_8f2a1b", artifact_id="art_embeddings_01"
        ) as stream:
            async for chunk in stream:
                await process(chunk)

asyncio.run(parallel_discovery())

The async with context manager ensures the underlying HTTP connection pool is closed when your workflow completes, even if exceptions occur. You can also instantiate the client without a context manager and call await client.close() manually.
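Without the context manager, the equivalent cleanup is a try/finally around close(). The sketch below uses a stand-in class (FakeAsyncClient is hypothetical, purely to make the pattern runnable) rather than the real AsyncCDEClient.

```python
import asyncio

class FakeAsyncClient:
    """Stand-in for AsyncCDEClient, just to illustrate the cleanup pattern."""
    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True

async def main():
    client = FakeAsyncClient()
    try:
        pass  # your workflow goes here
    finally:
        await client.close()  # runs even if the workflow raised
    return client

client = asyncio.run(main())
assert client.closed
```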

Error handling

All exceptions inherit from CDEError. The SDK retries transient failures (429, 5xx) automatically—non-retryable errors raise immediately with the structured error body from the API.

Exception hierarchy

CDEError
├── AuthenticationError (401)
├── PermissionError (403)
├── PolicyViolationError (403, governance)
├── NotFoundError (404)
├── ValidationError (400, 422)
├── ConflictError (409)
├── RateLimitError (429)
├── BudgetExhaustedError (budget depleted)
├── TimeoutError (request or poll timeout)
└── ServerError (500, 503)

Exception attributes

.status_code — HTTP status code from the API
.error_code — Machine-readable error code string
.message — Human-readable error message
.details — Dict with error-specific metadata
.request_id — Request ID for support tickets
.constraint — (PolicyViolationError) the violated constraint
.retry_after — (RateLimitError) seconds to wait

from cde import CDEClient
from cde.exceptions import (
    CDEError, PolicyViolationError, NotFoundError,
    RateLimitError, BudgetExhaustedError, ValidationError
)

client = CDEClient(max_retries=3, timeout=30.0)

try:
    promoted = client.claims.promote(
        project_id="proj_tds_8f2a1b",
        claim_id="clm_law_9d4e5f",
        target_tier="publish",
        rationale="Passed all negative controls and cross-dataset validation."
    )
    print(f"Promoted to {promoted.tier}")

except PolicyViolationError as e:
    print(f"Governance blocked promotion: {e.constraint}")
    print(f"Policy ceiling: {e.details['policy_ceiling']}")
    print(f"Truth Dial score: {e.details['truth_dial_score']}")

except NotFoundError:
    print("Claim not found — it may have been archived.")

except BudgetExhaustedError as e:
    print(f"Campaign budget depleted: {e.details['compute_units_used']}/{e.details['compute_units']}")

except ValidationError as e:
    print(f"Invalid request: {e.message}")
    for field_error in e.details.get("field_errors", []):
        print(f"  {field_error['field']}: {field_error['message']}")

except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after}s")

except CDEError as e:
    print(f"Unexpected error [{e.status_code}]: {e.message}")
    print(f"Request ID for support: {e.request_id}")

Notebook integration

The SDK is built for Jupyter. The synchronous client avoids event loop conflicts in notebook kernels. Model objects implement _repr_html_ for rich rendering, and list results convert to pandas DataFrames with a single call.

from cde import CDEClient

client = CDEClient()

# Rich display — just evaluate the object in a cell
claim = client.claims.get(project_id="proj_tds_8f2a1b", claim_id="clm_law_9d4e5f")
claim  # renders as a formatted card with type, scope, score, provenance

# Convert claims to a pandas DataFrame
claims = client.claims.list(project_id="proj_tds_8f2a1b", run_id="run_ns_7a3b2c")
df = claims.to_dataframe()
df.groupby("type")["score"].describe()

# Inline plotting of run progress
status = client.runs.status("run_ns_7a3b2c")
status.plot()  # renders a pipeline progress bar in the cell

# Export claims for downstream analysis
df.to_csv("claims_analysis.csv", index=False)

[Figure: Notebook integration showing rich claim display and DataFrame output]

Testing and mocking

The SDK provides a MockCDEClient for unit tests. It records calls, returns configurable responses, and validates request shapes against the same Pydantic models used in production.

from cde.testing import MockCDEClient, mock_claim, mock_run
import pytest

@pytest.fixture
def cde_client():
    client = MockCDEClient()

    client.runs.mock_create(return_value=mock_run(
        id="run_test_001", mode="symbolic", status="completed"
    ))
    client.runs.mock_wait(return_value=mock_run(
        id="run_test_001", mode="symbolic", status="completed", claims_count=2
    ))
    client.claims.mock_list(return_value=[
        mock_claim(id="clm_001", type="law", summary="F = ma", score=0.95),
        mock_claim(id="clm_002", type="causal", summary="Force causes acceleration", score=0.88),
    ])

    return client

def test_discovery_pipeline(cde_client):
    run = cde_client.runs.create(
        project_id="proj_test", campaign_id="camp_test",
        mode="symbolic", dataset_id="ds_test"
    )
    assert run.status == "completed"

    claims = list(cde_client.claims.list(project_id="proj_test", run_id=run.id))
    assert len(claims) == 2
    assert claims[0].type == "law"
    assert claims[0].score > 0.9

    # Verify the client recorded the right calls
    assert cde_client.runs.create_called_with(mode="symbolic")
    assert cde_client.claims.list_called_with(run_id="run_test_001")

Type system

Every claim type carries domain-specific fields. Models are generated from CDE's OpenAPI spec and validated with Pydantic. Type checkers (mypy, pyright) catch structural errors at development time.

LawClaim

.id: str
.type: "law"
.summary: str
.expression: str
.score: float
.tier: Tier
.scope: Scope
.evidence: Evidence
.ledger_entry_id: str
.run_id: str
.created_at: datetime

CausalClaim

.id: str
.type: "causal"
.summary: str
.cause: str
.effect: str
.direction: str
.temporal_scope: str
.evidence_markers: list[str]
.score: float
.tier: Tier
.scope: Scope
.evidence: CausalEvidence

ConservationClaim

.id: str
.type: "conservation"
.summary: str
.conserved_quantity: str
.system_boundaries: list[str]
.expression: str
.confidence: float
.score: float
.tier: Tier
.scope: Scope
.evidence: ConservationEvidence

All claim types include Scope (variables, domain, constraints satisfied), Evidence (statistical measures, negative control results), and governance metadata (ledger references, Truth Dial scores). Models support both attribute access and dict-style access.

from cde.types import LawClaim, CausalClaim, ConservationClaim

claim = client.claims.get(project_id="proj_tds_8f2a1b", claim_id="clm_law_9d4e5f")

# Type narrowing
if isinstance(claim, LawClaim):
    print(f"Governing equation: {claim.expression}")
    print(f"R²: {claim.evidence.r_squared}")
elif isinstance(claim, CausalClaim):
    print(f"Cause: {claim.cause} → Effect: {claim.effect}")
    print(f"Causal confidence: {claim.evidence.causal_confidence}")
elif isinstance(claim, ConservationClaim):
    print(f"Conserved: {claim.conserved_quantity}")
    print(f"Boundaries: {claim.system_boundaries}")

# All claims share these fields
print(f"Score: {claim.score}, Tier: {claim.tier}")
print(f"Ledger: {claim.ledger_entry_id}")
print(f"Scope: {claim.scope.domain}")
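The dict-style access mentioned above can be illustrated with a small stand-in. ClaimStub below is a pattern sketch, not the generated model code: dict access simply delegates to the attribute of the same name.

```python
from dataclasses import dataclass

@dataclass
class ClaimStub:
    """Stand-in showing attribute + dict-style access on one object."""
    score: float
    tier: str

    def __getitem__(self, key: str):
        # Dict-style access delegates to the attribute of the same name
        return getattr(self, key)

claim = ClaimStub(score=0.95, tier="explore")
assert claim.score == claim["score"] == 0.95
assert claim["tier"] == "explore"
```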

The SDK wraps the same resources available through the REST API and the MCP server. Use the SDK for scheduled jobs, CI validation, and services that coordinate many runs programmatically.
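A CI validation job might, for example, list claims and gate the build on their scores. The gate helper below is a hypothetical illustration of that pattern — the claim dicts and the 0.85 bar are assumptions, and a real job would fetch claims via client.claims.list.

```python
def ci_gate(claims: list[dict], min_score: float = 0.85) -> bool:
    """Pass the gate only if every publish-tier claim meets the score bar."""
    published = [c for c in claims if c["tier"] == "publish"]
    return all(c["score"] >= min_score for c in published)

# Explore-tier claims don't block the gate; publish-tier claims must clear the bar
assert ci_gate([{"tier": "publish", "score": 0.90}, {"tier": "explore", "score": 0.20}])
assert not ci_gate([{"tier": "publish", "score": 0.50}])
```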

All three interfaces share the same authentication, resource model, and governance stack.