A durable state-machine runner for workflow graphs with explicit blocking via WaitState. Not an agent framework, not a prompt engine — a persistent graph runner.
Every workflow step is checkpointed and journaled. Runs survive process restarts, network failures, and deployment updates. Resume exactly where you left off.
```python
# The core loop: start → tick → wait → resume
from abstractruntime import Runtime, WorkflowSpec
from abstractruntime.storage import (
    InMemoryRunStore, InMemoryLedgerStore
)

rt = Runtime(
    run_store=InMemoryRunStore(),
    ledger_store=InMemoryLedgerStore()
)

# wf is a WorkflowSpec; the quickstart further down defines one
run_id = rt.start(workflow=wf)
state = rt.tick(workflow=wf, run_id=run_id)

# Run blocked on user input
assert state.status.value == "waiting"

# Resume from any process, any time
state = rt.resume(
    workflow=wf,
    run_id=run_id,
    wait_key=state.waiting.wait_key,
    payload={"text": "yes"}
)
```

Everything needed for durable, observable workflow execution.
Workflows are defined as WorkflowSpec graphs with typed nodes. Nodes return StepPlan with effects — they never execute side effects directly.
RunState is checkpointed via RunStore, and every step is appended to an append-only LedgerStore. All vars must be JSON-serializable.
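The checkpoint-plus-journal idea can be illustrated with the standard library alone. This is a minimal sketch, not AbstractRuntime's implementation: the helper names (`save_checkpoint`, `append_ledger`, `load_checkpoint`), the file names, and the state layout are all assumptions made for the example.

```python
import json
import os
import tempfile
from pathlib import Path


def save_checkpoint(path: Path, state: dict) -> None:
    """Overwrite the checkpoint with the latest state (temp file, then rename)."""
    # json.dumps raises TypeError when the state is not JSON-serializable.
    data = json.dumps(state, sort_keys=True)
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(data, encoding="utf-8")
    os.replace(tmp, path)  # atomic when both paths are on the same filesystem


def append_ledger(path: Path, record: dict) -> None:
    """Append one step record as a JSON line; earlier lines are never rewritten."""
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record, sort_keys=True) + "\n")


def load_checkpoint(path: Path) -> dict:
    """Read the latest state back, e.g. after a process restart."""
    return json.loads(path.read_text(encoding="utf-8"))


workdir = Path(tempfile.mkdtemp())
checkpoint = workdir / "run.json"      # hypothetical file names
ledger = workdir / "ledger.jsonl"

state = {"run_id": "r1", "current_node": "ask", "vars": {}}
save_checkpoint(checkpoint, state)
append_ledger(ledger, {"node": "ask", "status": "waiting"})

state["vars"]["answer"] = {"text": "yes"}
state["current_node"] = "done"
save_checkpoint(checkpoint, state)
append_ledger(ledger, {"node": "done", "status": "completed"})

restored = load_checkpoint(checkpoint)
journal = [
    json.loads(line)
    for line in ledger.read_text(encoding="utf-8").splitlines()
]
```

The checkpoint holds only the latest state, while the journal keeps one line per step, which is why the two are stored separately in this sketch.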
Built-in effects: ASK_USER, WAIT_UNTIL, WAIT_EVENT, ANSWER_USER, EMIT_EVENT, START_SUBWORKFLOW, VARS_QUERY. Host-wired: LLM_CALL, TOOL_CALLS via AbstractCore.
Runs block with typed WaitState and resume from any process. The runtime persists wait metadata and resumes to the correct node without re-execution.
Storage backends: in-memory (tests), JSON/JSONL files (dev), and SQLite (production). ArtifactStore holds large payloads, and offloading decorators externalize them transparently.
HashChainedLedgerStore + verify_ledger_chain() for cryptographic provenance. ObservableLedgerStore for live subscriptions.
SnapshotStore captures point-in-time state for debugging, rollback, and auditing. Bookmark any run state and restore it later.
Start child runs with START_SUBWORKFLOW. Emit and wait for durable events. Cross-workflow coordination via event matching.
Effect handlers for LLM_CALL, TOOL_CALLS, MODEL_RESIDENCY. Three execution modes: local, remote, hybrid. Approval-gated tool execution with configurable policy.
Build your first durable workflow — pause it, kill the process, resume it.
```bash
pip install abstractruntime

# With AbstractCore LLM + tools integration
pip install "abstractruntime[abstractcore]"

# Full multimodal
pip install "abstractruntime[multimodal]"
```

```python
from abstractruntime import (
    Effect, EffectType, Runtime, StepPlan, WorkflowSpec
)
from abstractruntime.storage import (
    InMemoryLedgerStore, InMemoryRunStore
)


def ask(run, ctx):
    # Request user input as an effect; the answer is stored under "answer"
    return StepPlan(
        node_id="ask",
        effect=Effect(
            type=EffectType.ASK_USER,
            payload={"prompt": "Continue?"},
            result_key="answer",
        ),
        next_node="done",
    )


def done(run, ctx):
    # Read the stored answer and complete the run
    answer = run.vars.get("answer") or {}
    text = answer.get("text") if isinstance(answer, dict) else None
    return StepPlan(
        node_id="done",
        complete_output={"answer": text}
    )


wf = WorkflowSpec(
    workflow_id="demo",
    entry_node="ask",
    nodes={"ask": ask, "done": done}
)

rt = Runtime(
    run_store=InMemoryRunStore(),
    ledger_store=InMemoryLedgerStore()
)

run_id = rt.start(workflow=wf)
state = rt.tick(workflow=wf, run_id=run_id)
assert state.status.value == "waiting"

state = rt.resume(
    workflow=wf,
    run_id=run_id,
    wait_key=state.waiting.wait_key,
    payload={"text": "yes"}
)
assert state.status.value == "completed"
```

```python
from abstractruntime.integrations.abstractcore import (
    create_local_runtime
)

rt = create_local_runtime(
    provider="ollama",
    model="qwen3:4b"
)
```

```python
from abstractruntime.integrations.abstractcore import (
    ApprovalToolExecutor,
    MappingToolExecutor,
    ToolApprovalPolicy,
)

# write_file is your own tool callable; it is not defined in this snippet
tools = ApprovalToolExecutor(
    delegate=MappingToolExecutor({"write_file": write_file}),
    policy=ToolApprovalPolicy(),
)
```

Core imports, effects, and storage backends.
```python
# Core kernel
from abstractruntime import (
    Effect, EffectType, Runtime, StepPlan, WorkflowSpec
)

# Storage backends
from abstractruntime.storage import (
    InMemoryRunStore, InMemoryLedgerStore,
    JsonFileRunStore, JsonlLedgerStore,
)

# SQLite backends
from abstractruntime import SqliteRunStore, SqliteLedgerStore

# Zero-config driver wrapper
from abstractruntime import create_scheduled_runtime
```

| Effect | Behavior |
|---|---|
| ASK_USER | Blocks with WaitReason.USER and durable prompt metadata |
| WAIT_UNTIL | Blocks until ISO timestamp (time-based scheduling) |
| WAIT_EVENT | Blocks waiting for a named event from another workflow |
| ANSWER_USER | Non-blocking “send a message” to the host UI |
| EMIT_EVENT | Emits a durable event; resumes matching WAIT_EVENT runs |
| START_SUBWORKFLOW | Starts a child run (sync mode blocks parent) |
| VARS_QUERY | Read-only query into RunState.vars paths |
| LLM_CALL | Host-wired via AbstractCore (text, multimodal, media gen) |
| TOOL_CALLS | Host-wired via AbstractCore (approval-gated execution) |
| MEMORY_* | NOTE, QUERY, TAG, REHYDRATE, COMPACT: runtime memory |
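
Since WAIT_UNTIL blocks until an ISO timestamp, a host needs a reliable way to produce one. The sketch below builds a timezone-aware ISO 8601 string with the standard library. The helper `iso_deadline` and the payload key `"until"` are assumptions for illustration; check the effect's actual payload schema before relying on them.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def iso_deadline(delay: timedelta, now: Optional[datetime] = None) -> str:
    """Return an ISO 8601 UTC timestamp `delay` after `now`."""
    now = now or datetime.now(timezone.utc)
    return (now + delay).isoformat()


# A fixed clock keeps the example deterministic.
fixed_now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
deadline = iso_deadline(timedelta(minutes=30), now=fixed_now)

# Hypothetical payload shape: the key name "until" is an assumption.
wait_payload = {"until": deadline}
```

Keeping the timestamp in UTC with an explicit offset avoids ambiguity when the run is resumed by a process in another time zone.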
Fast, ephemeral storage for tests and development. InMemoryRunStore + InMemoryLedgerStore.
Human-readable persistence. JsonFileRunStore + JsonlLedgerStore for dev and single-process deployments.
Production-grade local storage. SqliteRunStore + SqliteLedgerStore with proper locking and recovery.
ArtifactStore / FileArtifactStore for large binary payloads. Offloading decorators for transparent externalization.
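
To make the offloading idea concrete, here is a standard-library sketch that moves oversized values out of a vars dict into content-addressed files and leaves a small reference behind. It does not use AbstractRuntime's ArtifactStore or its decorators: the function names, the `"$artifact"` marker, and the 64-byte threshold are all hypothetical.

```python
import hashlib
import json
import tempfile
from pathlib import Path

THRESHOLD_BYTES = 64  # hypothetical cutoff for "large"


def offload_large_values(vars_: dict, store_dir: Path,
                         threshold: int = THRESHOLD_BYTES) -> dict:
    """Replace values whose JSON encoding exceeds `threshold` with a reference."""
    out = {}
    for key, value in vars_.items():
        encoded = json.dumps(value, sort_keys=True).encode("utf-8")
        if len(encoded) <= threshold:
            out[key] = value
            continue
        digest = hashlib.sha256(encoded).hexdigest()
        (store_dir / digest).write_bytes(encoded)  # content-addressed file
        out[key] = {"$artifact": digest}
    return out


def rehydrate(vars_: dict, store_dir: Path) -> dict:
    """Load referenced artifacts back into the vars dict."""
    out = {}
    for key, value in vars_.items():
        if isinstance(value, dict) and set(value) == {"$artifact"}:
            value = json.loads((store_dir / value["$artifact"]).read_bytes())
        out[key] = value
    return out


store = Path(tempfile.mkdtemp())
original = {"answer": "yes", "transcript": "x" * 500}
compact = offload_large_values(original, store)
restored = rehydrate(compact, store)
```

Addressing artifacts by their SHA-256 digest means identical payloads are stored once, and the checkpointed state stays small regardless of payload size.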
HashChainedLedgerStore + verify_ledger_chain() for tamper-evident audit trails.
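
The tamper-evidence property comes from hash chaining: each record's hash covers the previous record's hash, so editing any earlier entry invalidates everything after it. The following is a generic sketch of that technique using `hashlib`, not the code behind HashChainedLedgerStore; the record layout and the names `append_record` and `verify_chain` are assumptions.

```python
import hashlib
import json

GENESIS = "0" * 64  # placeholder "previous hash" for the first record


def record_hash(prev_hash: str, payload: dict) -> str:
    """Hash the previous hash together with a canonical JSON encoding."""
    body = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + body).encode("utf-8")).hexdigest()


def append_record(chain: list, payload: dict) -> None:
    """Append a record that links back to the current tail of the chain."""
    prev_hash = chain[-1]["hash"] if chain else GENESIS
    chain.append({
        "prev_hash": prev_hash,
        "payload": payload,
        "hash": record_hash(prev_hash, payload),
    })


def verify_chain(chain: list) -> bool:
    """Recompute every hash in order; any edit or reordering breaks the chain."""
    prev_hash = GENESIS
    for rec in chain:
        if rec["prev_hash"] != prev_hash:
            return False
        if rec["hash"] != record_hash(prev_hash, rec["payload"]):
            return False
        prev_hash = rec["hash"]
    return True


chain = []
append_record(chain, {"node": "ask", "status": "waiting"})
append_record(chain, {"node": "done", "status": "completed"})
ok_before = verify_chain(chain)

chain[0]["payload"]["status"] = "completed"  # simulate tampering
ok_after = verify_chain(chain)
```

Here `ok_before` is `True` and `ok_after` is `False`, because the first record's stored hash no longer matches its modified payload.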
SnapshotStore captures point-in-time bookmarks for debugging, rollback, and compliance.
- Side effects are declared as Effect(type=..., payload=...) and executed by effect handlers
- A blocked run exposes its WaitState under RunState.waiting
- resume() continues from WaitState.resume_to_node; the waiting node is not re-run
- RunState.vars must be JSON-serializable; use ArtifactStore for large payloads
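
These rules can be seen working together in a toy runner written with plain dicts. It is only a sketch of the pattern and deliberately avoids the AbstractRuntime API: the plan and state shapes, the `"ask_user"` effect name, and the `tick`/`resume` functions here are simplified stand-ins. Nodes return data describing what should happen, the runner records where to continue, and the state survives a JSON round trip that stands in for a process restart.

```python
import json

calls = {"ask": 0, "done": 0}  # counts node executions for the demo


def ask(vars_):
    calls["ask"] += 1
    # The node only describes the effect; it performs no I/O itself.
    return {
        "effect": {"type": "ask_user", "prompt": "Continue?",
                   "result_key": "answer"},
        "next_node": "done",
    }


def done(vars_):
    calls["done"] += 1
    return {"output": {"answer": vars_["answer"]["text"]}}


NODES = {"ask": ask, "done": done}


def tick(state):
    """Run nodes until the run completes or blocks on a wait."""
    while state["status"] == "running":
        plan = NODES[state["node"]](state["vars"])
        if "output" in plan:
            state.update(status="completed", output=plan["output"])
        elif plan.get("effect", {}).get("type") == "ask_user":
            state.update(status="waiting", waiting={
                "result_key": plan["effect"]["result_key"],
                "resume_to_node": plan["next_node"],
            })
        else:
            state["node"] = plan["next_node"]
    return state


def resume(state, payload):
    """Store the payload and continue at resume_to_node, not the waiting node."""
    wait = state.pop("waiting")
    state["vars"][wait["result_key"]] = payload
    state.update(status="running", node=wait["resume_to_node"])
    return tick(state)


state = tick({"status": "running", "node": "ask", "vars": {}})
saved = json.dumps(state)  # stands in for a checkpoint and a restart
state = resume(json.loads(saved), {"text": "yes"})
```

After the resume, `calls` shows that `ask` ran exactly once: the runner jumped straight to the node recorded in the wait metadata instead of re-executing the node that blocked.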