Queuety
Workflows

Sequential Chains

The most common workflow pattern is a linear chain of steps. Each step runs after the previous one completes, with the accumulated state passed forward.

The ->then() method

Use ->then() to add sequential steps to a workflow:

use Queuety\Queuety;

Queuety::workflow( 'onboarding' )
    ->then( CreateAccountHandler::class )
    ->then( SendWelcomeEmailHandler::class )
    ->then( NotifyAdminHandler::class )
    ->dispatch( [ 'email' => 'user@example.com' ] );

Steps execute in the order they are added. The workflow only advances when the current step completes successfully.

The Step interface

Workflow step handlers implement the Queuety\Step interface:

use Queuety\Step;

class CreateAccountHandler implements Step {
    public function handle( array $state ): array {
        $user_id = wp_create_user( $state['email'], wp_generate_password() );
        return [ 'user_id' => $user_id ];
    }

    public function config(): array {
        return [
            'needs_wordpress' => true,
            'max_attempts'    => 3,
        ];
    }
}

Key differences from the Handler interface:

Handler (jobs)Step (workflows)
Method signaturehandle(array $payload): voidhandle(array $state): array
InputJob payloadAccumulated workflow state
OutputNothing (void)Data to merge into state

State flow

Each step receives the full accumulated state and returns an array of new key-value pairs. The returned data is merged into the state for subsequent steps:

class FetchUserHandler implements Step {
    public function handle( array $state ): array {
        // $state contains: ['user_id' => 42]
        $user = get_user_by( 'ID', $state['user_id'] );
        return [ 'user_name' => $user->display_name, 'user_email' => $user->user_email ];
    }

    public function config(): array {
        return [ 'needs_wordpress' => true ];
    }
}

class GenerateReportHandler implements Step {
    public function handle( array $state ): array {
        // $state contains: ['user_id' => 42, 'user_name' => 'Alice', 'user_email' => 'alice@example.com']
        $report = build_report( $state['user_name'] );
        return [ 'report_url' => $report->url ];
    }

    public function config(): array {
        return [];
    }
}

Step configuration

The config() method supports the following keys:

KeyTypeDefaultDescription
needs_wordpressboolfalseBoot WordPress before executing this step
max_attemptsint3Maximum retry attempts for this step
backoffstringexponentialBackoff strategy for retries

LLM pipeline example

A common use case is chaining LLM API calls with data processing:

use Queuety\Step;

class BuildPromptHandler implements Step {
    public function handle( array $state ): array {
        $user = get_user_by( 'ID', $state['user_id'] );
        $posts = get_posts( [ 'author' => $user->ID, 'numberposts' => 10 ] );
        $context = implode( "\n", array_map( fn( $p ) => $p->post_content, $posts ) );

        return [
            'prompt' => "Summarize this author's recent work:\n\n{$context}",
        ];
    }

    public function config(): array {
        return [ 'needs_wordpress' => true ];
    }
}

class CallOpenAIHandler implements Step {
    public function handle( array $state ): array {
        $response = wp_remote_post( 'https://api.openai.com/v1/chat/completions', [
            'timeout' => 120,
            'headers' => [
                'Authorization' => 'Bearer ' . OPENAI_API_KEY,
                'Content-Type'  => 'application/json',
            ],
            'body' => json_encode( [
                'model'    => 'gpt-4',
                'messages' => [ [ 'role' => 'user', 'content' => $state['prompt'] ] ],
            ] ),
        ] );

        $body = json_decode( wp_remote_retrieve_body( $response ), true );
        return [ 'summary' => $body['choices'][0]['message']['content'] ];
    }

    public function config(): array {
        return [ 'needs_wordpress' => true, 'max_attempts' => 5 ];
    }
}

class SaveSummaryHandler implements Step {
    public function handle( array $state ): array {
        $post_id = wp_insert_post( [
            'post_title'   => "Summary for user #{$state['user_id']}",
            'post_content' => $state['summary'],
            'post_status'  => 'publish',
        ] );
        return [ 'summary_post_id' => $post_id ];
    }

    public function config(): array {
        return [ 'needs_wordpress' => true ];
    }
}

Dispatch the pipeline:

Queuety::workflow( 'summarize_author' )
    ->then( BuildPromptHandler::class )
    ->then( CallOpenAIHandler::class )
    ->then( SaveSummaryHandler::class )
    ->on_queue( 'llm' )
    ->max_attempts( 5 )
    ->dispatch( [ 'user_id' => 42 ] );

If the OpenAI call times out, the worker retries only that step. The prompt built in step 0 is already safely stored in the workflow state.

On this page