absurd_client package
Python client for the Absurd SQL-based durable execution workflow system.
This module provides a client interface to interact with the Absurd workflow engine, which is built on PostgreSQL. It allows you to spawn tasks, claim and process them, handle events, manage checkpoints, and track workflow runs.
- class absurd_client.AbsurdClient(queue_name: str | None = None, worker_id: str | None = None)[source]
Bases:
objectEnhanced client for interacting with Absurd’s SQL functions with full feature utilization.
- await_event(conn: Connection, task_id: UUID, run_id: UUID, step_name: str, event_name: str, timeout: int | None = None) tuple[bool, dict[str, Any] | None][source]
Wait for an event with timeout support.
- Parameters:
conn – Database session
task_id – Task ID
run_id – Run ID
step_name – Step name
event_name – Event name to wait for
timeout – Timeout in seconds (None for no timeout)
Returns: (should_suspend, payload)
- cancel_task(conn: Connection, run_id: UUID) bool[source]
Manually cancel a pending or sleeping task.
- Parameters:
conn – Database connection
run_id – Task run ID to cancel
- Returns:
True if task was cancelled, False if it couldn’t be cancelled (already running, completed, or failed)
- Raises:
Exception – If task not found or database error
- claim_task(conn: Connection, worker_id: str | None = None, claim_timeout: int = 30, qty: int = 1) list[tuple[Any, ...]][source]
Claim tasks from the Absurd queue with advanced features.
- Parameters:
conn – Database session
worker_id – Worker identifier (defaults to instance worker_id)
claim_timeout – Claim timeout in seconds (0 for no timeout)
qty – Number of tasks to claim in batch (for high-throughput processing)
- Returns: List of (run_id, task_id, attempt, task_name, params, retry_strategy,
max_attempts, headers, wake_event, event_payload)
- cleanup_events(conn: Connection, ttl_seconds: int, limit: int = 1000) int[source]
Clean up old events.
- cleanup_tasks(conn: Connection, ttl_seconds: int, limit: int = 1000) int[source]
Clean up old completed tasks.
- complete_task(conn: Connection, run_id: UUID, result: dict[str, Any] | None = None) None[source]
Mark a task as completed with state validation support.
- create_workflow_run(conn: Connection, workflow_name: str, workflow_version: str, inputs: dict[str, Any] | None = None, absurd_run_id: UUID | None = None, created_by: str | None = None, tags: dict[str, Any] | None = None, workflow_hash: str | None = None) UUID[source]
Create a new workflow_run record to track workflow execution.
- Parameters:
conn – Database connection (transaction-aware)
workflow_name – Logical workflow name (must match
^[a-z][a-z0-9_]*$, no ‘__’)workflow_version – Workflow version (must match
^[a-zA-Z0-9._-]+$, no ‘__’)inputs – Workflow input parameters
absurd_run_id – Optional root Absurd run_id
created_by – Optional user/system identifier
tags – Optional key-value tags for filtering
workflow_hash – Optional SHA-256 hash of workflow definition
- Returns:
UUID of the created workflow_run record
- Return type:
workflow_run_id
- emit_event(conn: Connection, event_name: str, payload: dict[str, Any] | None = None) None[source]
Emit an event and wake any runs waiting for it.
This implements the Absurd waker pattern: 1. Insert event into events table 2. Find all runs waiting for this event (from wait_registrations) 3. Wake them up by marking as available with event payload 4. Delete wait registrations (fulfilled)
- extend_claim(conn: Connection, run_id: UUID, extend_by_seconds: int) None[source]
Extend the claim timeout for a long-running task.
This is crucial for tasks that take longer than the initial claim timeout.
- fail_task(conn: Connection, run_id: UUID, reason: str | dict[str, Any], retry_at: datetime | None = None) None[source]
Mark a task as failed with detailed error information.
- Parameters:
conn – Database session
run_id – Task run ID
reason – Failure reason (string or detailed dict)
retry_at – Optional retry timestamp (for manual retry scheduling)
- get_all_checkpoints(conn: Connection, task_id: UUID, run_id: UUID) list[dict[str, Any]][source]
Get all checkpoints for a task.
- get_checkpoint(conn: Connection, task_id: UUID, step_name: str, include_pending: bool = False) dict[str, Any] | None[source]
Get a checkpoint for a task.
- get_checkpoints_for_run(conn: Connection, run_id: UUID) dict[str, Any][source]
Get all checkpoints for a specific run.
LOW PRIORITY FIX (Issue #18): Direct table query with locking. While Absurd provides absurd.get_task_checkpoint_states(), it requires a task_id. This method is called with only run_id (from DurableContext initialization), so we must query the checkpoints table directly by owner_run_id. Using FOR SHARE lock to ensure we read committed data.
Note: This is a read-only operation with low impact on data consistency.
- get_run_checkpoint(conn: Connection, run_id: UUID, step_name: str) Any[source]
Get a specific checkpoint for a run.
- get_run_status(conn: Connection, run_id: UUID) dict[str, Any] | None[source]
Get detailed run status information.
LOW PRIORITY FIX (Issue #17): Direct table query with locking. Absurd does not provide a stored procedure for retrieving run status, so we must query the runs table directly. Using FOR SHARE lock to ensure we read committed data and prevent phantom reads.
Note: This is a read-only operation with low impact on data consistency.
- get_task_status(conn: Connection, task_id: UUID) dict[str, Any] | None[source]
Get detailed task status information.
LOW PRIORITY FIX (Issue #16): Direct table query with locking. Absurd does not provide a stored procedure for retrieving task status, so we must query the tasks table directly. Using FOR SHARE lock to ensure we read committed data and prevent phantom reads.
Note: This is a read-only operation with low impact on data consistency.
- save_checkpoint_for_run(conn: Connection, run_id: UUID, step_name: str, data: Any, task_id: UUID | None = None) None[source]
Set a checkpoint for a specific step (convenience wrapper for DurableContext).
- schedule_task(conn: Connection, run_id: UUID, wake_at: datetime) None[source]
Schedule a task to run at a specific time.
Useful for delayed execution or rate limiting.
- set_checkpoint(conn: Connection, task_id: UUID, step_name: str, state: dict[str, Any], owner_run: UUID, extend_claim_by: int | None = None) None[source]
Set a checkpoint for a task with optional claim extension.
- Parameters:
conn – Database session
task_id – Task ID
step_name – Checkpoint step name
state – Checkpoint state data
owner_run – Run ID that owns this checkpoint
extend_claim_by – Optional claim extension in seconds
- set_run_sleeping(conn: Connection, run_id: UUID, event_name: str) None[source]
Mark a run as SLEEPING waiting for an event.
Called by orchestrator after catching AbsurdSleepException.
- sleep(conn: Connection, run_id: UUID, duration_seconds: int) None[source]
Durable sleep that survives crashes and restarts.
- spawn_task(conn: Connection, task_name: str, params: dict[str, Any], options: dict[str, Any] | None = None, headers: dict[str, Any] | None = None, retry_strategy: dict[str, Any] | None = None, max_attempts: int | None = None, cancellation: dict[str, Any] | None = None, workflow_run_id: UUID | None = None) tuple[UUID, UUID, UUID][source]
Spawn a new task in the Absurd queue with full feature support.
- Parameters:
conn – Database connection (psycopg3)
task_name – Name of the task
params – Task parameters
options – Legacy options dict (for backward compatibility)
headers – Task headers for metadata
retry_strategy – Retry strategy configuration
max_attempts – Maximum retry attempts
cancellation – Cancellation rules (max_delay, max_duration)
workflow_run_id – Optional workflow run ID for tracking
Returns: (task_id, run_id, workflow_run_id)
- update_workflow_run_status(conn: Connection, workflow_run_id: UUID, status: str, result: dict[str, Any] | None = None, error: dict[str, Any] | None = None, started_at: datetime | None = None, completed_at: datetime | None = None, task_count: int | None = None) None[source]
Update workflow_run status and metadata.
- Parameters:
conn – Database connection
workflow_run_id – UUID of workflow_run to update
status – New status (pending, running, completed, failed, cancelled)
result – Optional final result
error – Optional error details
started_at – Optional start timestamp
completed_at – Optional completion timestamp
task_count – Optional task count
- wait_for_event(conn: Connection, run_id: UUID, event_name: str, timeout_seconds: int | None = None, task_id: UUID | None = None, step_name: str | None = None) Any[source]
Wait for an event using Absurd’s sleep/wake mechanism.
This implements the proper Absurd pattern: 1. Check if event already exists (fast path - return immediately) 2. Check if this run has been woken with event payload 3. If not, register wait and mark run as SLEEPING 4. Orchestrator will free worker thread and process other tasks 5. When event is emitted, run is woken and will resume here
- Parameters:
conn – Database session
run_id – The Absurd run ID (NOT workflow run ID!)
event_name – Name of event to wait for
timeout_seconds – Optional timeout (default 24 hours)
task_id – The Absurd task ID (required for registering wait)
step_name – The step name (required for registering wait)
- Returns:
Event payload when event is received
- Raises:
TimeoutError – If timeout expires before event received
- exception absurd_client.AbsurdSleepError(message: str, run_id: UUID | None = None, event_name: str | None = None)[source]
Bases:
ExceptionRaised when a run enters SLEEPING state to wait for an event.
This signals to the orchestrator that the worker thread should be freed to process other tasks while this run waits.
- absurd_client.get_absurd_client(queue_name: str | None = None, worker_id: str | None = None) AbsurdClient[source]
Get the singleton AbsurdClient instance.
- Parameters:
queue_name – Optional queue name (only used on first call)
worker_id – Optional worker ID (only used on first call)
- Returns:
Shared AbsurdClient instance
- absurd_client.spawn_cancellable_task(client: AbsurdClient, conn: Connection, task_name: str, params: dict[str, Any], max_delay_seconds: int | None = None, max_duration_seconds: int | None = None) tuple[UUID, UUID, UUID][source]
Spawn a task with cancellation rules.
- absurd_client.spawn_retry_task(client: AbsurdClient, conn: Connection, task_name: str, params: dict[str, Any], max_attempts: int = 3, retry_kind: str = 'exponential', base_seconds: int = 30, factor: float = 2.0, max_seconds: int | None = None) tuple[UUID, UUID, UUID][source]
Spawn a task with retry strategy.