Sequential Chains
The most common workflow pattern is a linear chain of steps. Each step runs after the previous one completes, with the accumulated state passed forward.
The ->then() method
Use ->then() to add sequential steps to a workflow:
use Queuety\Queuety;
Queuety::workflow( 'onboarding' )
    ->then( CreateAccountHandler::class )
    ->then( SendWelcomeEmailHandler::class )
    ->then( NotifyAdminHandler::class )
    ->dispatch( [ 'email' => 'user@example.com' ] );
Steps execute in the order they are added. The workflow only advances when the current step completes successfully.
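To make the ordering rule concrete, here is a minimal sketch of a sequential runner in plain PHP. It is not Queuety's implementation: `run_chain()` is a hypothetical helper, the steps are plain closures rather than handler classes, and folding each step's output into the state with `array_merge()` is an assumption made for illustration.

```php
<?php
// Hypothetical stand-in for a workflow runner; not Queuety's actual code.
// Each step is a callable that takes the state and returns new keys.
function run_chain( array $steps, array $state ): array {
    foreach ( $steps as $index => $step ) {
        try {
            $output = $step( $state );
        } catch ( Throwable $e ) {
            // A failed step halts the chain; later steps never run.
            return [ 'completed' => $index, 'state' => $state, 'error' => $e->getMessage() ];
        }
        // Assumption: the returned keys are merged on top of the existing state.
        $state = array_merge( $state, $output );
    }
    return [ 'completed' => count( $steps ), 'state' => $state, 'error' => null ];
}

$result = run_chain(
    [
        fn( array $s ) => [ 'user_id' => 42 ],
        function ( array $s ): array {
            throw new RuntimeException( 'mail server unavailable' );
        },
        fn( array $s ) => [ 'admin_notified' => true ],
    ],
    [ 'email' => 'user@example.com' ]
);
```

In this run the first step succeeds, the second throws, and the third is never called, so `$result['completed']` is 1 and the state holds only `email` and `user_id`.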
The Step interface
Workflow step handlers implement the Queuety\Step interface:
use Queuety\Step;
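class CreateAccountHandler implements Step {
    public function handle( array $state ): array {
        // wp_create_user() takes ( $username, $password, $email ); the email doubles as the username here.
        $user_id = wp_create_user( $state['email'], wp_generate_password(), $state['email'] );
        if ( is_wp_error( $user_id ) ) {
            // Assumption: throwing marks the step as failed so it can be retried.
            throw new \RuntimeException( $user_id->get_error_message() );
        }
        return [ 'user_id' => $user_id ];
    }
    public function config(): array {
        return [
            'needs_wordpress' => true,
            'max_attempts' => 3,
        ];
    }
}
Key differences from the Handler interface: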
| | Handler (jobs) | Step (workflows) |
|---|---|---|
| Method signature | handle(array $payload): void | handle(array $state): array |
| Input | Job payload | Accumulated workflow state |
| Output | Nothing (void) | Data to merge into state |
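The contrast in the table can be sketched with two local stand-in interfaces. `JobHandlerSketch`, `WorkflowStepSketch`, and both example classes are hypothetical names used only here; they mirror the `handle()` signatures from the table and leave out `config()` for brevity.

```php
<?php
// Local stand-ins that mirror the two handle() signatures in the table.
// They are illustrative only and are not the library's interfaces.
interface JobHandlerSketch {
    public function handle( array $payload ): void;
}

interface WorkflowStepSketch {
    public function handle( array $state ): array;
}

// A job handler only produces side effects; nothing is returned.
class LogSignupJob implements JobHandlerSketch {
    public array $log = [];

    public function handle( array $payload ): void {
        $this->log[] = "signup: {$payload['email']}";
    }
}

// A workflow step returns new keys for the steps that follow it.
class NormalizeEmailStep implements WorkflowStepSketch {
    public function handle( array $state ): array {
        return [ 'email_normalized' => strtolower( trim( $state['email'] ) ) ];
    }
}

$job = new LogSignupJob();
$job->handle( [ 'email' => 'User@Example.com' ] );

$step   = new NormalizeEmailStep();
$output = $step->handle( [ 'email' => ' User@Example.com ' ] );
```

The job leaves its result in a side effect (`$job->log`), while the step hands back an array that a workflow could merge into its state.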
State flow
Each step receives the full accumulated state and returns an array of new key-value pairs. The returned data is merged into the state for subsequent steps:
class FetchUserHandler implements Step {
    public function handle( array $state ): array {
        // $state contains: ['user_id' => 42]
        $user = get_user_by( 'ID', $state['user_id'] );
        return [ 'user_name' => $user->display_name, 'user_email' => $user->user_email ];
    }
    public function config(): array {
        return [ 'needs_wordpress' => true ];
    }
}
class GenerateReportHandler implements Step {
    public function handle( array $state ): array {
        // $state contains: ['user_id' => 42, 'user_name' => 'Alice', 'user_email' => 'alice@example.com']
        $report = build_report( $state['user_name'] );
        return [ 'report_url' => $report->url ];
    }
    public function config(): array {
        return [];
    }
}
Step configuration
The config() method supports the following keys:
| Key | Type | Default | Description |
|---|---|---|---|
| needs_wordpress | bool | false | Boot WordPress before executing this step |
| max_attempts | int | 3 | Maximum retry attempts for this step |
| backoff | string | exponential | Backoff strategy for retries |
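One way to picture how these keys behave is the sketch below. It assumes that keys returned from `config()` override the defaults listed in the table, and that `exponential` doubles the wait on each retry. Both helper functions, the one-second base delay, and the constant fallback are hypothetical; the source does not specify Queuety's actual delay schedule.

```php
<?php
// Defaults copied from the configuration table.
const STEP_CONFIG_DEFAULTS = [
    'needs_wordpress' => false,
    'max_attempts'    => 3,
    'backoff'         => 'exponential',
];

// Hypothetical helper: keys returned by config() override the defaults.
function resolve_step_config( array $config ): array {
    return array_merge( STEP_CONFIG_DEFAULTS, $config );
}

// Hypothetical delay schedule: with "exponential", the wait doubles per retry.
// The 1-second base and the constant fallback are assumptions for illustration.
function retry_delay_seconds( string $backoff, int $attempt, int $base = 1 ): int {
    if ( 'exponential' === $backoff ) {
        return $base * ( 2 ** ( $attempt - 1 ) );
    }
    return $base;
}

$resolved = resolve_step_config( [ 'needs_wordpress' => true ] );
$delays   = array_map(
    fn( int $attempt ) => retry_delay_seconds( $resolved['backoff'], $attempt ),
    range( 1, $resolved['max_attempts'] )
);
```

Under these assumptions, a step that sets only `needs_wordpress` still gets three attempts with exponential backoff, and the sketched delays for attempts 1 to 3 are 1, 2, and 4 seconds.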
LLM pipeline example
A common use case is chaining LLM API calls with data processing:
use Queuety\Step;
class BuildPromptHandler implements Step {
    public function handle( array $state ): array {
        $user = get_user_by( 'ID', $state['user_id'] );
        $posts = get_posts( [ 'author' => $user->ID, 'numberposts' => 10 ] );
        $context = implode( "\n", array_map( fn( $p ) => $p->post_content, $posts ) );
        return [
            'prompt' => "Summarize this author's recent work:\n\n{$context}",
        ];
    }
    public function config(): array {
        return [ 'needs_wordpress' => true ];
    }
}
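class CallOpenAIHandler implements Step {
    public function handle( array $state ): array {
        $response = wp_remote_post( 'https://api.openai.com/v1/chat/completions', [
            'timeout' => 120,
            'headers' => [
                'Authorization' => 'Bearer ' . OPENAI_API_KEY,
                'Content-Type' => 'application/json',
            ],
            'body' => wp_json_encode( [
                'model' => 'gpt-4',
                'messages' => [ [ 'role' => 'user', 'content' => $state['prompt'] ] ],
            ] ),
        ] );
        // wp_remote_post() returns a WP_Error on timeouts and transport failures.
        // Assumption: throwing marks the step as failed so the worker can retry it.
        if ( is_wp_error( $response ) ) {
            throw new \RuntimeException( $response->get_error_message() );
        }
        $body = json_decode( wp_remote_retrieve_body( $response ), true );
        if ( ! isset( $body['choices'][0]['message']['content'] ) ) {
            throw new \RuntimeException( 'Unexpected response from the OpenAI API.' );
        }
        return [ 'summary' => $body['choices'][0]['message']['content'] ];
    }
    public function config(): array {
        return [ 'needs_wordpress' => true, 'max_attempts' => 5 ];
    }
}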
class SaveSummaryHandler implements Step {
    public function handle( array $state ): array {
        $post_id = wp_insert_post( [
            'post_title' => "Summary for user #{$state['user_id']}",
            'post_content' => $state['summary'],
            'post_status' => 'publish',
        ] );
        return [ 'summary_post_id' => $post_id ];
    }
    public function config(): array {
        return [ 'needs_wordpress' => true ];
    }
}
Dispatch the pipeline:
Queuety::workflow( 'summarize_author' )
    ->then( BuildPromptHandler::class )
    ->then( CallOpenAIHandler::class )
    ->then( SaveSummaryHandler::class )
    ->on_queue( 'llm' )
    ->max_attempts( 5 )
    ->dispatch( [ 'user_id' => 42 ] );
If the OpenAI call times out, the worker retries only that step. The prompt built in step 0 is already safely stored in the workflow state.
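The retry behavior described above can be simulated in plain PHP. The following is only a sketch: the closures stand in for the three handlers, the call counters and the simulated timeout are invented for illustration, and the loop is a hypothetical model of a worker rather than Queuety's own code.

```php
<?php
// Hypothetical simulation of per-step retries; not Queuety's worker code.
$calls = [ 'build' => 0, 'call_api' => 0, 'save' => 0 ];

$steps = [
    'build'    => function ( array $state ) use ( &$calls ): array {
        $calls['build']++;
        return [ 'prompt' => "Summarize user #{$state['user_id']}" ];
    },
    'call_api' => function ( array $state ) use ( &$calls ): array {
        $calls['call_api']++;
        if ( $calls['call_api'] < 3 ) {
            throw new RuntimeException( 'request timed out' ); // Simulated timeout.
        }
        return [ 'summary' => 'ok: ' . $state['prompt'] ];
    },
    'save'     => function ( array $state ) use ( &$calls ): array {
        $calls['save']++;
        return [ 'saved' => true ];
    },
];

$state        = [ 'user_id' => 42 ];
$max_attempts = 5;

foreach ( $steps as $name => $step ) {
    for ( $attempt = 1; ; $attempt++ ) {
        try {
            // State persists between attempts, so earlier output is reused.
            $state = array_merge( $state, $step( $state ) );
            break;
        } catch ( RuntimeException $e ) {
            if ( $attempt >= $max_attempts ) {
                throw $e; // Attempts exhausted: the workflow fails here.
            }
        }
    }
}
```

Here the simulated API call fails twice and succeeds on the third attempt. The prompt-building closure runs exactly once, because its output is already in `$state` when the API step is retried.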