Workflows
Workflows break long-running work into discrete steps. Each step persists its output to a shared state bag in the database. If PHP dies mid-step, the worker retries that step with all prior state intact.
What are workflows?
A workflow is an ordered sequence of steps. Each step is a PHP class that receives the accumulated state from all previous steps and returns new data to merge into the state. The workflow advances one step at a time, with each transition wrapped in an atomic MySQL transaction.
use Queuety\Queuety;
$workflow_id = Queuety::workflow( 'generate_report' )
->then( FetchDataHandler::class )
->then( CallLLMHandler::class )
->then( FormatOutputHandler::class )
->dispatch( [ 'user_id' => 42 ] );State accumulation
The initial payload you pass to dispatch() becomes the starting state. Each step's return value is merged into the state via array_merge:
dispatch(['user_id' => 42])
Step 0: FetchDataHandler -> returns ['user_name' => 'Alice']
state: {user_id: 42, user_name: 'Alice'}
Step 1: CallLLMHandler -> returns ['llm_response' => '...']
state: {user_id: 42, user_name: 'Alice', llm_response: '...'}
Step 2: FormatOutputHandler -> returns ['report_url' => '/reports/42.pdf']
state: {user_id: 42, user_name: 'Alice', llm_response: '...', report_url: '/reports/42.pdf'}Every step sees the full accumulated state. This makes it easy to pass data between steps without any external storage.
Resumability
Workflows are durable because the state is persisted to the database after every step. If the worker crashes during step 1, the workflow restarts from step 1 with step 0's output already in the state.
The step boundary is a single MySQL transaction that atomically:
- Updates the workflow state with the step's output
- Marks the current step's job as completed
- Enqueues the next step's job
This guarantees that no state is ever lost, even in the face of crashes, timeouts, or PHP fatal errors.
Workflow lifecycle
dispatch() -> running -> step 0 -> step 1 -> ... -> step N -> completed
| |
| +-> failed -> retry -> running (from failed step)
|
+-> paused -> resumed -> running
|
+-> waiting_signal -> signal received -> running
|
+-> cancelled (via cancel_workflow)| Status | Description |
|---|---|
running | Actively processing steps |
completed | All steps finished successfully |
failed | A step failed after exhausting retries |
paused | Manually paused, no steps will execute |
waiting_signal | Waiting for an external signal via Queuety::signal() |
cancelled | Explicitly cancelled via Queuety::cancel_workflow(). Pending jobs are buried and an optional cleanup handler runs. |
Managing workflows
Check status:
$state = Queuety::workflow_status( $workflow_id );
echo $state->status->value; // 'running'
echo $state->current_step; // 1
echo $state->total_steps; // 3Pause and resume:
Queuety::pause_workflow( $workflow_id );
Queuety::resume_workflow( $workflow_id );Retry a failed workflow from its failed step:
Queuety::retry_workflow( $workflow_id );Cancel a workflow and run cleanup handlers:
Queuety::cancel_workflow( $workflow_id );See Cancellation for details on cleanup handlers and what happens when a workflow is cancelled.
Workflow options
Set queue, priority, and retry behavior for all steps in a workflow:
use Queuety\Enums\Priority;
Queuety::workflow( 'important_pipeline' )
->on_queue( 'pipelines' )
->with_priority( Priority::High )
->max_attempts( 5 )
->on_cancel( PipelineCleanupHandler::class )
->prune_state_after( 3 )
->then( StepA::class )
->then( StepB::class )
->dispatch( $payload );The on_cancel() method registers a cleanup handler that runs when the workflow is cancelled. See Cancellation.
The prune_state_after() method enables automatic removal of old step outputs to keep the workflow state bounded. See State Pruning.
Step types
Queuety supports seven types of steps within a workflow:
- Sequential chains using
->then()for one-at-a-time execution - Parallel groups using
->parallel()for concurrent execution - Conditional branching using named steps and
_goto - Sub-workflows using
->sub_workflow()for nested composition - Durable timers using
->sleep()for time-based delays - Signals using
->wait_for_signal()for external event triggers - Streaming steps using the
StreamingStepinterface for durable chunk-based output