API Reference

This section provides detailed documentation for all classes, methods, and functions in the Absurd client.

AbsurdClient Class

class absurd_client.AbsurdClient(queue_name: str | None = None, worker_id: str | None = None)[source]

Bases: object

Enhanced client for interacting with Absurd's SQL functions, with support for the full feature set (batched claims, retry strategies, events, checkpoints, and workflow tracking).

__init__(queue_name: str | None = None, worker_id: str | None = None)[source]
await_event(conn: Connection, task_id: UUID, run_id: UUID, step_name: str, event_name: str, timeout: int | None = None) tuple[bool, dict[str, Any] | None][source]

Wait for an event with timeout support.

Parameters:
  • conn – Database connection

  • task_id – Task ID

  • run_id – Run ID

  • step_name – Step name

  • event_name – Event name to wait for

  • timeout – Timeout in seconds (None for no timeout)

Returns: (should_suspend, payload)

cancel_task(conn: Connection, run_id: UUID) bool[source]

Manually cancel a pending or sleeping task.

Parameters:
  • conn – Database connection

  • run_id – Task run ID to cancel

Returns:

True if task was cancelled, False if it couldn’t be cancelled (already running, completed, or failed)

Raises:

Exception – If task not found or database error

claim_task(conn: Connection, worker_id: str | None = None, claim_timeout: int = 30, qty: int = 1) list[tuple[Any, ...]][source]

Claim tasks from the Absurd queue, with support for batch claims and claim timeouts.

Parameters:
  • conn – Database connection

  • worker_id – Worker identifier (defaults to instance worker_id)

  • claim_timeout – Claim timeout in seconds (0 for no timeout)

  • qty – Number of tasks to claim in batch (for high-throughput processing)

Returns: List of (run_id, task_id, attempt, task_name, params, retry_strategy, max_attempts, headers, wake_event, event_payload) tuples
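Each claimed row can be unpacked positionally, following the Returns description above. A minimal sketch with illustrative placeholder values (real values come from the database):

```python
# Shape of one claimed row, per claim_task's Returns description.
# All values below are illustrative placeholders.
row = (
    "run-uuid", "task-uuid", 1, "send_email",
    {"to": "user@example.com"},   # params
    {"kind": "exponential"},      # retry_strategy (key names assumed)
    3,                            # max_attempts
    {},                           # headers
    None,                         # wake_event
    None,                         # event_payload
)
(run_id, task_id, attempt, task_name, params,
 retry_strategy, max_attempts, headers, wake_event, event_payload) = row
```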

cleanup_events(conn: Connection, ttl_seconds: int, limit: int = 1000) int[source]

Clean up old events.

cleanup_tasks(conn: Connection, ttl_seconds: int, limit: int = 1000) int[source]

Clean up old completed tasks.

complete_task(conn: Connection, run_id: UUID, result: dict[str, Any] | None = None) None[source]

Mark a task as completed with state validation support.

create_queue(conn: Connection) None[source]

Create the Absurd queue if it doesn’t exist.

create_workflow_run(conn: Connection, workflow_name: str, workflow_version: str, inputs: dict[str, Any] | None = None, absurd_run_id: UUID | None = None, created_by: str | None = None, tags: dict[str, Any] | None = None, workflow_hash: str | None = None) UUID[source]

Create a new workflow_run record to track workflow execution.

Parameters:
  • conn – Database connection (transaction-aware)

  • workflow_name – Logical workflow name (must match ^[a-z][a-z0-9_]*$, no ‘__’)

  • workflow_version – Workflow version (must match ^[a-zA-Z0-9._-]+$, no ‘__’)

  • inputs – Workflow input parameters

  • absurd_run_id – Optional root Absurd run_id

  • created_by – Optional user/system identifier

  • tags – Optional key-value tags for filtering

  • workflow_hash – Optional SHA-256 hash of workflow definition

Returns:

UUID of the created workflow_run record

Return type:

UUID
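The name and version constraints above can be checked client-side before calling create_workflow_run. A minimal sketch; the helper names are hypothetical, and validation is ultimately enforced by the function itself:

```python
import re

# Documented constraints: workflow_name must match ^[a-z][a-z0-9_]*$,
# workflow_version must match ^[a-zA-Z0-9._-]+$, and neither may contain '__'.
NAME_RE = re.compile(r"^[a-z][a-z0-9_]*$")
VERSION_RE = re.compile(r"^[a-zA-Z0-9._-]+$")

def is_valid_workflow_name(name: str) -> bool:
    return bool(NAME_RE.match(name)) and "__" not in name

def is_valid_workflow_version(version: str) -> bool:
    return bool(VERSION_RE.match(version)) and "__" not in version
```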

emit_event(conn: Connection, event_name: str, payload: dict[str, Any] | None = None) None[source]

Emit an event and wake any runs waiting for it.

This implements the Absurd waker pattern:

  1. Insert the event into the events table
  2. Find all runs waiting for this event (from wait_registrations)
  3. Wake them by marking them as available with the event payload
  4. Delete the fulfilled wait registrations
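The steps above can be sketched with plain dicts standing in for the events and wait_registrations tables. This is an in-memory illustration only, not the SQL implementation:

```python
# In-memory stand-ins for the events and wait_registrations tables.
events: dict[str, dict] = {}
wait_registrations: dict[str, set] = {}
woken: dict[str, dict] = {}  # run_id -> payload delivered on wake

def emit_event_sketch(event_name: str, payload: dict) -> None:
    events[event_name] = payload                          # 1. insert event
    waiters = wait_registrations.pop(event_name, set())   # 2. find waiting runs
    for run_id in waiters:                                # 3. wake with payload
        woken[run_id] = payload
    # 4. registrations were deleted (fulfilled) by the pop() above

wait_registrations["payment_confirmed"] = {"run-1", "run-2"}
emit_event_sketch("payment_confirmed", {"amount": 42})
```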

extend_claim(conn: Connection, run_id: UUID, extend_by_seconds: int) None[source]

Extend the claim timeout for a long-running task.

This is crucial for tasks that take longer than the initial claim timeout.
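For work that outlives the initial claim, a worker can renew its claim between slices of work. A sketch of that heartbeat shape, where the extend callable stands in for a bound client.extend_claim call:

```python
def run_with_heartbeat(work_slices, extend, run_id, extend_by_seconds=30):
    """Process work in slices, renewing the claim before each slice so the
    task is not reclaimed mid-flight. `extend` stands in for
    client.extend_claim(conn, run_id, extend_by_seconds)."""
    results = []
    for piece in work_slices:
        extend(run_id, extend_by_seconds)  # renew before the slice runs
        results.append(piece * 2)          # placeholder "work"
    return results

calls = []
out = run_with_heartbeat([1, 2, 3], lambda r, s: calls.append((r, s)), "run-1")
```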

fail_task(conn: Connection, run_id: UUID, reason: str | dict[str, Any], retry_at: datetime | None = None) None[source]

Mark a task as failed with detailed error information.

Parameters:
  • conn – Database connection

  • run_id – Task run ID

  • reason – Failure reason (string or detailed dict)

  • retry_at – Optional retry timestamp (for manual retry scheduling)
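The reason argument accepts either a string or a structured dict, and retry_at schedules a manual retry. A sketch building both; the dict's field names are illustrative, not a schema Absurd requires:

```python
from datetime import datetime, timedelta, timezone

def failure_reason(exc: Exception, attempt: int) -> dict:
    # Structured form of fail_task's `reason`; field names are illustrative.
    return {
        "error_type": type(exc).__name__,
        "message": str(exc),
        "attempt": attempt,
    }

def retry_in(minutes: int) -> datetime:
    # A value suitable for fail_task's retry_at argument.
    return datetime.now(timezone.utc) + timedelta(minutes=minutes)

reason = failure_reason(ValueError("upstream 503"), attempt=2)
```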

get_all_checkpoints(conn: Connection, task_id: UUID, run_id: UUID) list[dict[str, Any]][source]

Get all checkpoints for a task.

get_checkpoint(conn: Connection, task_id: UUID, step_name: str, include_pending: bool = False) dict[str, Any] | None[source]

Get a checkpoint for a task.

get_checkpoints_for_run(conn: Connection, run_id: UUID) dict[str, Any][source]

Get all checkpoints for a specific run.

LOW PRIORITY FIX (Issue #18): Direct table query with locking. While Absurd provides absurd.get_task_checkpoint_states(), it requires a task_id. This method is called with only run_id (from DurableContext initialization), so we must query the checkpoints table directly by owner_run_id, using a FOR SHARE lock to ensure we read committed data.

Note: This is a read-only operation with low impact on data consistency.

get_run_checkpoint(conn: Connection, run_id: UUID, step_name: str) Any[source]

Get a specific checkpoint for a run.

get_run_status(conn: Connection, run_id: UUID) dict[str, Any] | None[source]

Get detailed run status information.

LOW PRIORITY FIX (Issue #17): Direct table query with locking. Absurd does not provide a stored procedure for retrieving run status, so we must query the runs table directly, using a FOR SHARE lock to ensure we read committed data and prevent phantom reads.

Note: This is a read-only operation with low impact on data consistency.

get_task_status(conn: Connection, task_id: UUID) dict[str, Any] | None[source]

Get detailed task status information.

LOW PRIORITY FIX (Issue #16): Direct table query with locking. Absurd does not provide a stored procedure for retrieving task status, so we must query the tasks table directly, using a FOR SHARE lock to ensure we read committed data and prevent phantom reads.

Note: This is a read-only operation with low impact on data consistency.

save_checkpoint_for_run(conn: Connection, run_id: UUID, step_name: str, data: Any, task_id: UUID | None = None) None[source]

Set a checkpoint for a specific step (convenience wrapper for DurableContext).

schedule_task(conn: Connection, run_id: UUID, wake_at: datetime) None[source]

Schedule a task to run at a specific time.

Useful for delayed execution or rate limiting.

set_checkpoint(conn: Connection, task_id: UUID, step_name: str, state: dict[str, Any], owner_run: UUID, extend_claim_by: int | None = None) None[source]

Set a checkpoint for a task with optional claim extension.

Parameters:
  • conn – Database connection

  • task_id – Task ID

  • step_name – Checkpoint step name

  • state – Checkpoint state data

  • owner_run – Run ID that owns this checkpoint

  • extend_claim_by – Optional claim extension in seconds

set_run_sleeping(conn: Connection, run_id: UUID, event_name: str) None[source]

Mark a run as SLEEPING waiting for an event.

Called by the orchestrator after catching AbsurdSleepError.

sleep(conn: Connection, run_id: UUID, duration_seconds: int) None[source]

Durable sleep that survives crashes and restarts.

spawn_task(conn: Connection, task_name: str, params: dict[str, Any], options: dict[str, Any] | None = None, headers: dict[str, Any] | None = None, retry_strategy: dict[str, Any] | None = None, max_attempts: int | None = None, cancellation: dict[str, Any] | None = None, workflow_run_id: UUID | None = None) tuple[UUID, UUID, UUID][source]

Spawn a new task in the Absurd queue with full feature support.

Parameters:
  • conn – Database connection (psycopg3)

  • task_name – Name of the task

  • params – Task parameters

  • options – Legacy options dict (for backward compatibility)

  • headers – Task headers for metadata

  • retry_strategy – Retry strategy configuration

  • max_attempts – Maximum retry attempts

  • cancellation – Cancellation rules (max_delay, max_duration)

  • workflow_run_id – Optional workflow run ID for tracking

Returns: (task_id, run_id, workflow_run_id)
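An illustrative argument set for spawn_task. The cancellation keys (max_delay, max_duration) are documented above; the retry_strategy key names are an assumption modeled on spawn_retry_task's parameters:

```python
# Illustrative spawn_task arguments; see the hedges in the text above.
headers = {"trace_id": "abc123", "tenant": "acme"}
retry_strategy = {
    "kind": "exponential",   # key names assumed from spawn_retry_task
    "base_seconds": 30,
    "factor": 2.0,
}
cancellation = {
    "max_delay": 3600,       # cancel if not started within an hour
    "max_duration": 600,     # cancel if running longer than ten minutes
}
# task_id, run_id, workflow_run_id = client.spawn_task(
#     conn, "send_email", {"to": "user@example.com"},
#     headers=headers, retry_strategy=retry_strategy,
#     max_attempts=3, cancellation=cancellation,
# )
```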

update_workflow_run_status(conn: Connection, workflow_run_id: UUID, status: str, result: dict[str, Any] | None = None, error: dict[str, Any] | None = None, started_at: datetime | None = None, completed_at: datetime | None = None, task_count: int | None = None) None[source]

Update workflow_run status and metadata.

Parameters:
  • conn – Database connection

  • workflow_run_id – UUID of workflow_run to update

  • status – New status (pending, running, completed, failed, cancelled)

  • result – Optional final result

  • error – Optional error details

  • started_at – Optional start timestamp

  • completed_at – Optional completion timestamp

  • task_count – Optional task count

wait_for_event(conn: Connection, run_id: UUID, event_name: str, timeout_seconds: int | None = None, task_id: UUID | None = None, step_name: str | None = None) Any[source]

Wait for an event using Absurd’s sleep/wake mechanism.

This implements the proper Absurd pattern:

  1. Check whether the event already exists (fast path: return immediately)
  2. Check whether this run has been woken with an event payload
  3. If not, register a wait and mark the run as SLEEPING
  4. The orchestrator frees the worker thread and processes other tasks
  5. When the event is emitted, the run is woken and resumes here

Parameters:
  • conn – Database connection

  • run_id – The Absurd run ID (NOT workflow run ID!)

  • event_name – Name of event to wait for

  • timeout_seconds – Optional timeout (default 24 hours)

  • task_id – The Absurd task ID (required for registering wait)

  • step_name – The step name (required for registering wait)

Returns:

Event payload when event is received

Raises:

TimeoutError – If timeout expires before event received
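The control flow described above can be sketched with dicts standing in for the underlying tables. This illustrates the fast-path/woken/register decisions only; suspending and waking belong to the orchestrator:

```python
def wait_for_event_sketch(events, woken, waits, run_id, event_name):
    """Return ("resume", payload) or ("sleep", None)."""
    if event_name in events:             # 1. fast path: already emitted
        return ("resume", events[event_name])
    if run_id in woken:                  # 2. woken with a payload
        return ("resume", woken.pop(run_id))
    waits.setdefault(event_name, set()).add(run_id)  # 3. register wait
    return ("sleep", None)               # caller marks the run SLEEPING

events, woken, waits = {}, {}, {}
first = wait_for_event_sketch(events, woken, waits, "run-1", "approved")
events["approved"] = {"by": "alice"}
second = wait_for_event_sketch(events, woken, waits, "run-1", "approved")
```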

Exception Classes

class absurd_client.AbsurdSleepError(message: str, run_id: UUID | None = None, event_name: str | None = None)[source]

Bases: Exception

Raised when a run enters SLEEPING state to wait for an event.

This signals to the orchestrator that the worker thread should be freed to process other tasks while this run waits.

__init__(message: str, run_id: UUID | None = None, event_name: str | None = None) None[source]
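Orchestrator-side handling can be sketched as follows. The local AbsurdSleepError class mirrors the signature above so the snippet is self-contained; the real class comes from absurd_client:

```python
class AbsurdSleepError(Exception):
    # Local stand-in mirroring absurd_client.AbsurdSleepError's signature.
    def __init__(self, message, run_id=None, event_name=None):
        super().__init__(message)
        self.run_id = run_id
        self.event_name = event_name

def run_step(step) -> bool:
    """Run one step; True if it completed, False if the run suspended."""
    try:
        step()
        return True
    except AbsurdSleepError as exc:
        # Free this worker thread; the real orchestrator would now call
        # client.set_run_sleeping(conn, exc.run_id, exc.event_name).
        return False

def sleeping_step():
    raise AbsurdSleepError("waiting", run_id="run-1", event_name="approved")
```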

Helper Functions

absurd_client.spawn_retry_task(client: AbsurdClient, conn: Connection, task_name: str, params: dict[str, Any], max_attempts: int = 3, retry_kind: str = 'exponential', base_seconds: int = 30, factor: float = 2.0, max_seconds: int | None = None) tuple[UUID, UUID, UUID][source]

Spawn a task with retry strategy.

absurd_client.spawn_cancellable_task(client: AbsurdClient, conn: Connection, task_name: str, params: dict[str, Any], max_delay_seconds: int | None = None, max_duration_seconds: int | None = None) tuple[UUID, UUID, UUID][source]

Spawn a task with cancellation rules.

absurd_client.get_absurd_client(queue_name: str | None = None, worker_id: str | None = None) AbsurdClient[source]

Get the singleton AbsurdClient instance.

Parameters:
  • queue_name – Optional queue name (only used on first call)

  • worker_id – Optional worker ID (only used on first call)

Returns:

Shared AbsurdClient instance