PHP API
Everything in Queuety is accessible through the Queuety\Queuety static class. It is the single entry point for dispatching jobs, building workflows, managing queues, and accessing internal subsystems.
use Queuety\Queuety;Jobs
Queuety::dispatch( $handler, $payload )
Dispatch a job. Accepts either a handler name string or a Contracts\Job instance. Returns a PendingJob builder for chaining options.
// Handler name + payload (original API)
Queuety::dispatch( 'send_email', [ 'to' => 'user@example.com' ] );
// Contracts\Job instance (v0.6.0+)
Queuety::dispatch( new SendEmailJob( to: 'user@example.com' ) );
// With options
Queuety::dispatch( 'send_email', [ 'to' => 'user@example.com' ] )
->on_queue( 'emails' )
->with_priority( Priority::High )
->delay( 300 )
->max_attempts( 5 )
->rate_limit( 10, 60 )
->unique()
->after( $other_job_id );
// Get the job ID
$job_id = Queuety::dispatch( 'send_email', $payload )->id();When a Contracts\Job instance is passed, the serializer extracts the FQCN as the handler name and public properties as the payload. The $payload parameter is ignored when $handler is a Job instance.
PendingJob methods
| Method | Description |
|---|---|
on_queue( string $queue ) | Set the target queue |
with_priority( Priority $priority ) | Set the priority level |
delay( int $seconds ) | Delay before the job becomes available |
max_attempts( int $max ) | Maximum retry attempts |
rate_limit( int $max, int $window ) | Rate limit: max executions per window seconds |
unique() | Prevent duplicate dispatches |
after( int $job_id ) | Wait for another job to complete first |
id() | Force dispatch and return the job ID |
Queuety::dispatch_sync( $handler, $payload )
Execute a job synchronously in the current process without adding it to the queue.
// With a Job instance
Queuety::dispatch_sync( new SendEmailJob( to: 'user@example.com', subject: 'Hi', body: 'Hello' ) );
// With a handler name and payload
Queuety::dispatch_sync( 'send_email', [ 'to' => 'user@example.com' ] );The job's handle() method is called directly. No database row is created, no worker is involved, and middleware is not applied.
Queuety::batch( $jobs )
Dispatch multiple jobs in a single multi-row INSERT.
$job_ids = Queuety::batch( [
[ 'handler' => 'send_email', 'payload' => [ 'to' => 'a@example.com' ], 'queue' => 'emails' ],
[ 'handler' => 'send_email', 'payload' => [ 'to' => 'b@example.com' ], 'priority' => 2 ],
] );Queuety::create_batch( $jobs )
Create a BatchBuilder for dispatching a group of jobs with callbacks and progress tracking. See Batching for full details.
$batch = Queuety::create_batch( [
new ImportUsersJob( $chunk_1 ),
new ImportUsersJob( $chunk_2 ),
] )
->name( 'Import users' )
->then( ImportCompleteHandler::class )
->catch( ImportFailedHandler::class )
->finally( ImportCleanupHandler::class )
->allow_failures()
->on_queue( 'imports' )
->dispatch();BatchBuilder methods
| Method | Returns | Description |
|---|---|---|
name( string $name ) | self | Set a name for the batch |
on_queue( string $queue ) | self | Set the queue for all jobs in the batch |
then( string $handler_class ) | self | Handler called when the batch completes successfully |
catch( string $handler_class ) | self | Handler called when the first job in the batch fails |
finally( string $handler_class ) | self | Handler called when the batch finishes (success or failure) |
allow_failures() | self | Allow the then callback to fire even with failures |
dispatch() | Batch | Create the batch row and all jobs, returns the Batch value object |
Queuety::find_batch( $id )
Find a batch by ID. Returns a Batch value object or null.
$batch = Queuety::find_batch( $batch_id );
if ( null !== $batch ) {
echo $batch->progress() . '% complete';
}Batch value object
| Method/Property | Type | Description |
|---|---|---|
$id | int | Batch ID |
$name | string|null | Optional batch name |
$total_jobs | int | Total number of jobs in the batch |
$pending_jobs | int | Jobs still pending |
$failed_jobs | int | Number of failed jobs |
$failed_job_ids | array | IDs of failed jobs |
$created_at | DateTimeImmutable | When the batch was created |
$finished_at | DateTimeImmutable|null | When the batch finished |
$cancelled_at | DateTimeImmutable|null | When the batch was cancelled |
progress() | int | Completion percentage (0-100) |
finished() | bool | Whether the batch has finished |
has_failures() | bool | Whether any job failed |
cancelled() | bool | Whether the batch was cancelled |
Queuety::chain( $jobs )
Create a ChainBuilder for sequential job execution. Each job depends on the previous one completing successfully.
$first_job_id = Queuety::chain( [
new FetchDataJob( $url ),
new ProcessDataJob(),
new NotifyCompleteJob(),
] )
->on_queue( 'pipeline' )
->catch( ChainFailedHandler::class )
->dispatch();ChainBuilder methods
| Method | Returns | Description |
|---|---|---|
on_queue( string $queue ) | self | Set the queue for all jobs in the chain |
catch( string $handler_class ) | self | Handler called when any job in the chain fails |
dispatch() | int | Create all jobs with sequential dependencies, returns the first job ID |
Queuety::retry( $job_id )
Retry a specific job. Resets it to pending status.
Queuety::retry( 42 );Queuety::retry_buried()
Retry all buried jobs. Returns the count of jobs retried.
$count = Queuety::retry_buried();Queuety::purge( $older_than_days )
Purge completed jobs. Returns the count of jobs purged.
$count = Queuety::purge(); // uses QUEUETY_RETENTION_DAYS
$count = Queuety::purge( 30 ); // purge jobs older than 30 daysQueues
Queuety::stats( $queue )
Get job counts grouped by status.
$stats = Queuety::stats(); // all queues
$stats = Queuety::stats( 'emails' ); // specific queue
// ['pending' => 12, 'processing' => 3, 'completed' => 150, 'failed' => 2, 'buried' => 1]Queuety::buried( $queue )
Get all buried jobs.
$jobs = Queuety::buried();
$jobs = Queuety::buried( 'emails' );Queuety::pause( $queue )
Pause a queue so workers skip it.
Queuety::pause( 'emails' );Queuety::resume( $queue )
Resume a paused queue.
Queuety::resume( 'emails' );Queuety::is_paused( $queue )
Check if a queue is paused.
if ( Queuety::is_paused( 'emails' ) ) {
// Queue is paused
}Workflows
Queuety::workflow( $name )
Start building a workflow. Returns a WorkflowBuilder.
$workflow_id = Queuety::workflow( 'generate_report' )
->then( FetchDataHandler::class )
->then( CallLLMHandler::class )
->then( FormatOutputHandler::class )
->dispatch( [ 'user_id' => 42 ] );WorkflowBuilder methods
| Method | Description |
|---|---|
then( string $class, ?string $name ) | Add a sequential step. Optional name for conditional branching. |
parallel( array $classes ) | Add a parallel step group. |
sub_workflow( string $name, WorkflowBuilder $builder ) | Add a sub-workflow step. |
sleep( int $seconds, int $minutes, int $hours, int $days ) | Add a durable timer step. All durations are summed. |
wait_for_signal( string $name ) | Add a signal wait step. Pauses until the named signal is received. |
on_queue( string $queue ) | Set the queue for all steps. |
with_priority( Priority $priority ) | Set the priority for all steps. |
max_attempts( int $max ) | Set max retry attempts per step. |
on_cancel( string $class ) | Register a cleanup handler class for cancellation. See Cancellation. |
prune_state_after( int $steps = 2 ) | Enable automatic state pruning. Removes step outputs older than the threshold. See State Pruning. |
dispatch( array $payload ) | Dispatch the workflow. Returns the workflow ID. |
Queuety::workflow_status( $workflow_id )
Get workflow status. Returns a WorkflowState object or null.
$state = Queuety::workflow_status( $workflow_id );
echo $state->name; // 'generate_report'
echo $state->status->value; // 'running'
echo $state->current_step; // 1
echo $state->total_steps; // 3
echo $state->state; // accumulated state arrayQueuety::retry_workflow( $workflow_id )
Retry a failed workflow from its failed step.
Queuety::retry_workflow( $workflow_id );Queuety::pause_workflow( $workflow_id )
Pause a running workflow.
Queuety::pause_workflow( $workflow_id );Queuety::resume_workflow( $workflow_id )
Resume a paused workflow.
Queuety::resume_workflow( $workflow_id );Queuety::cancel_workflow( $workflow_id )
Cancel a workflow and run any registered cleanup handlers. Sets the status to cancelled and buries all pending and processing jobs for the workflow.
Queuety::cancel_workflow( $workflow_id );Throws RuntimeException if the workflow is already completed or cancelled. See Cancellation for details.
Queuety::signal( $workflow_id, $name, $data )
Send a signal to a workflow. If the workflow is currently waiting for this signal, it resumes immediately. Otherwise, the signal is stored and picked up when the workflow reaches the corresponding wait_for_signal step.
Queuety::signal( $workflow_id, 'approved' );
// With data merged into workflow state
Queuety::signal( $workflow_id, 'approved', [
'approved_by' => 'admin@example.com',
] );See Workflow Signals for details.
Workflow templates
Queuety::define_workflow( $name )
Create a workflow builder for use as a template.
$builder = Queuety::define_workflow( 'onboarding' )
->then( CreateAccountHandler::class )
->then( SendWelcomeEmailHandler::class );Queuety::register_workflow_template( $builder )
Register a workflow template.
Queuety::register_workflow_template( $builder );Queuety::run_workflow( $name, $payload )
Dispatch a registered workflow template by name. Returns the workflow ID.
$workflow_id = Queuety::run_workflow( 'onboarding', [ 'email' => 'user@example.com' ] );Throws RuntimeException if the template is not registered.
Queuety::workflow_templates()
Get the workflow template registry.
$registry = Queuety::workflow_templates();
$template = $registry->get( 'onboarding' ); // WorkflowTemplate or nullScheduling
Queuety::schedule( $handler, $payload )
Create a recurring schedule. Returns a PendingSchedule builder.
Queuety::schedule( 'cleanup_handler' )
->every( '1 hour' )
->on_queue( 'maintenance' );
Queuety::schedule( 'nightly_report' )
->cron( '0 3 * * *' )
->on_queue( 'reports' );PendingSchedule methods
| Method | Description |
|---|---|
every( string $interval ) | Set an interval expression (e.g. 1 hour, 30 minutes) |
cron( string $expression ) | Set a 5-field cron expression |
overlap( OverlapPolicy $policy ) | Set the overlap policy (default: Allow). See Scheduling. |
on_queue( string $queue ) | Set the target queue (default: default) |
id() | Force dispatch and return the schedule ID |
Queuety::scheduler()
Get the internal Scheduler instance.
$scheduler = Queuety::scheduler();
$schedules = $scheduler->list();
$scheduler->remove( 'handler_name' );
$count = $scheduler->tick(); // manually triggerHandler registration
Queuety::register( $name, $class )
Register a handler class under a name.
Queuety::register( 'send_email', SendEmailHandler::class );Queuety::discover_handlers( $directory, $namespace )
Auto-discover and register handler classes from a directory. Returns the count of handlers registered.
$count = Queuety::discover_handlers( __DIR__ . '/handlers', 'MyPlugin\\Handlers' );Queuety::registry()
Get the handler registry.
$registry = Queuety::registry();Observability
Queuety::logger()
Get the Logger instance for querying log entries.
$logger = Queuety::logger();
$logs = $logger->for_job( 42 );
$logs = $logger->for_workflow( 7 );
$logs = $logger->for_handler( 'send_email', 50 );
$logs = $logger->for_event( LogEvent::Failed, 50 );
$logs = $logger->since( new \DateTimeImmutable( '-1 hour' ), 100 );
$count = $logger->purge( 30 );Queuety::metrics()
Get the Metrics instance.
$stats = Queuety::metrics()->handler_stats( 60 ); // last 60 minutesQueuety::webhook_notifier()
Get the WebhookNotifier instance.
$notifier = Queuety::webhook_notifier();
$id = $notifier->register( 'job.buried', 'https://hooks.slack.com/...' );
$webhooks = $notifier->list();
$notifier->remove( $id );Queuety::rate_limiter()
Get the RateLimiter instance.
$limiter = Queuety::rate_limiter();
$limiter->register( 'call_openai', 60, 60 );Cache
Queuety::cache()
Get the active cache backend instance.
$cache = Queuety::cache();
$cache->set( 'my_key', 'my_value', 60 );
$value = $cache->get( 'my_key' );Queuety::set_cache( $cache )
Override the cache backend. Call this before Queuety::init() to use a custom implementation.
Queuety::set_cache( new RedisCache() );
Queuety::init( $connection );QUEUETY_CACHE_TTL
Default cache TTL in seconds (default: 5). Set this constant in wp-config.php to change the default TTL used by internal cache operations.
define( 'QUEUETY_CACHE_TTL', 10 );See Caching for details on backends and auto-detection.
Workflow event log
Queuety::workflow_events()
Get the WorkflowEventLog instance.
$log = Queuety::workflow_events();Queuety::workflow_timeline( $workflow_id )
Get the full timeline of events for a workflow. Returns an array of event rows ordered by ID.
$events = Queuety::workflow_timeline( $workflow_id );Each event row contains: id, workflow_id, step_index, handler, event, state_snapshot, step_output, error_message, duration_ms, created_at.
Queuety::workflow_state_at( $workflow_id, $step_index )
Get the state snapshot at a specific workflow step. Returns the full workflow state array as it existed after the given step completed, or null if not found.
$state = Queuety::workflow_state_at( $workflow_id, 2 );See Workflow Event Log for use cases and CLI commands.
Internal instances
These methods return internal instances for advanced use cases.
| Method | Returns | Description |
|---|---|---|
Queuety::queue() | Queue | Queue operations (claim, release, bury) |
Queuety::worker() | Worker | Worker process control |
Queuety::workflow_manager() | Workflow | Workflow orchestration |
Queuety::connection() | Connection | PDO database connection |
Queuety::batch_manager() | BatchManager | Batch lifecycle management (cancel, prune) |
Queuety::chunk_store() | ChunkStore | Chunk persistence for streaming steps |
Queuety::workflow_events() | WorkflowEventLog | Workflow event timeline and state snapshots |
Queuety::cache() | Cache | Cache backend instance |
Testing
Queuety::fake()
Replace the queue with an in-memory fake for testing. Returns a QueueFake instance that records all dispatched jobs.
$fake = Queuety::fake();
SendEmailJob::dispatch( 'user@example.com', 'Hello', 'Hi!' );
$fake->assert_pushed( SendEmailJob::class );Queuety::reset()
Reset all internal state, including the queue fake. Call in tearDown().
Queuety::reset();QueueFake methods
| Method | Description |
|---|---|
assert_pushed( string $class, ?Closure $callback ) | Assert a job class was pushed. Optional callback filters by job data. |
assert_pushed_times( string $class, int $count ) | Assert a job was pushed exactly N times. |
assert_not_pushed( string $class ) | Assert a job was never pushed. |
assert_nothing_pushed() | Assert no jobs were pushed at all. |
assert_batched( ?Closure $callback ) | Assert a batch was dispatched. Optional callback filters by batch data. |
pushed( string $class ) | Get all recorded dispatches for a class. Returns an array. |
batches() | Get all recorded batches. Returns an array. |
reset() | Clear all recorded jobs and batches. |
See Testing for full examples.
Interfaces
Handler
Interface for simple job handlers.
interface Handler {
public function handle( array $payload ): void;
public function config(): array;
}Step
Interface for workflow step handlers.
interface Step {
public function handle( array $state ): array;
public function config(): array;
}Contracts\Job
Interface for self-contained dispatchable job classes. Job classes encapsulate their payload as public properties and execution logic in handle().
namespace Queuety\Contracts;
interface Job {
public function handle(): void;
}Job classes can optionally define a middleware() method returning an array of Contracts\Middleware instances.
Contracts\Cache
Interface for cache backends. See Caching for built-in implementations and custom backends.
namespace Queuety\Contracts;
interface Cache {
public function get( string $key ): mixed;
public function set( string $key, mixed $value, int $ttl = 0 ): void;
public function delete( string $key ): void;
public function has( string $key ): bool;
public function add( string $key, mixed $value, int $ttl = 0 ): bool;
public function flush(): void;
}| Method | Returns | Description |
|---|---|---|
get( $key ) | mixed | Retrieve a value. Returns null if not found or expired. |
set( $key, $value, $ttl ) | void | Store a value. TTL of 0 means no expiry. |
delete( $key ) | void | Remove a value. |
has( $key ) | bool | Check if a key exists and is not expired. |
add( $key, $value, $ttl ) | bool | Atomic set-if-not-exists. Returns true if the value was set. |
flush() | void | Remove all items from the cache. |
WorkflowEventLog
Records workflow step transitions with full state snapshots. Access via Queuety::workflow_events().
| Method | Returns | Description |
|---|---|---|
record_step_started( int $workflow_id, int $step_index, string $handler ) | void | Record a step_started event |
record_step_completed( int $workflow_id, int $step_index, string $handler, array $state_snapshot, array $step_output, int $duration_ms ) | void | Record a step_completed event with full state snapshot |
record_step_failed( int $workflow_id, int $step_index, string $handler, string $error, int $duration_ms ) | void | Record a step_failed event |
get_timeline( int $workflow_id ) | array | Get all events for a workflow, ordered by ID |
get_state_at_step( int $workflow_id, int $step_index ) | array|null | Get the state snapshot after a specific step completed |
prune( int $older_than_days ) | int | Delete events older than the given number of days. Returns rows deleted. |
Contracts\StreamingStep
Interface for workflow step handlers that produce a stream of chunks. Each yielded value is persisted to the database immediately. See Streaming Steps.
namespace Queuety\Contracts;
interface StreamingStep {
public function stream( array $state, array $existing_chunks = [] ): \Generator;
public function on_complete( array $chunks, array $state ): array;
public function config(): array;
}| Method | Description |
|---|---|
stream( $state, $existing_chunks ) | Generator that yields string chunks. Each yield triggers an immediate DB write. On retry, $existing_chunks contains previously persisted chunks. |
on_complete( $chunks, $state ) | Called after the stream finishes. Returns data to merge into workflow state. |
config() | Optional configuration. Supports needs_wordpress and max_attempts. |
ChunkStore
Manages chunk persistence for streaming steps. Access via Queuety::chunk_store().
| Method | Returns | Description |
|---|---|---|
get_chunks( int $job_id ) | string[] | Fetch all chunks for a job, ordered by chunk index |
append_chunk( int $job_id, int $chunk_index, string $content, ?int $workflow_id, ?int $step_index ) | void | Append a single chunk |
clear_chunks( int $job_id ) | void | Delete all chunks for a job |
chunk_count( int $job_id ) | int | Count chunks for a job |
get_accumulated( int $job_id ) | string | Concatenate all chunks into one string |
Heartbeat
Static helper for sending activity heartbeats from inside step handlers and streaming steps. See Heartbeats.
| Method | Description |
|---|---|
Heartbeat::init( int $job_id, Connection $conn ) | Set the job context. Called automatically by the worker. |
Heartbeat::beat( array $progress = [] ) | Send a heartbeat. Updates reserved_at and optionally stores progress data in heartbeat_data. |
Heartbeat::clear() | Clear the job context. Called automatically by the worker. |
Heartbeat::current_job_id() | Get the current job ID (for testing). |
Contracts\Middleware
Interface for job middleware. Middleware wraps job execution in an onion-style pipeline.
namespace Queuety\Contracts;
interface Middleware {
public function handle( object $job, \Closure $next ): void;
}Traits
Dispatchable
Provides static dispatch methods on job classes implementing Contracts\Job. Constructor arguments are passed through to create the job instance.
use Queuety\Contracts\Job;
use Queuety\Dispatchable;
class SendEmailJob implements Job {
use Dispatchable;
// ...
}
// Dispatches a new SendEmailJob instance
SendEmailJob::dispatch( 'user@example.com', 'Subject', 'Body' );| Method | Returns | Description |
|---|---|---|
dispatch( ...$args ) | PendingJob | Create an instance and dispatch it |
dispatch_if( bool $condition, ...$args ) | PendingJob|null | Dispatch only if $condition is true |
dispatch_unless( bool $condition, ...$args ) | PendingJob|null | Dispatch only if $condition is false |
with_chain( array $jobs ) | ChainBuilder | Start a chain from this job class |
// Conditional dispatch
SendEmailJob::dispatch_if( $user->wants_email, $user->email, 'Hello', 'Hi!' );
SendEmailJob::dispatch_unless( $maintenance_mode, $user->email, 'Hello', 'Hi!' );
// Chain from a job class
FetchDataJob::with_chain( [
new ProcessDataJob(),
new NotifyCompleteJob(),
] )->dispatch();Job properties
Job classes can declare public properties that the worker reads via reflection to override retry and timeout behavior.
| Property | Type | Description |
|---|---|---|
$tries | int | Maximum number of attempts before the job is buried |
$timeout | int | Maximum execution time in seconds (requires pcntl) |
$backoff | array | Escalating delay values in seconds for each retry attempt |
class ImportProductsJob implements Job {
use Dispatchable;
public int $tries = 5;
public int $timeout = 120;
public array $backoff = [ 10, 30, 60, 120 ];
// ...
}failed() hook
When a job is buried after exhausting all retries, the worker calls a failed() method on the job instance if defined.
public function failed( \Throwable $exception ): void {
error_log( "Job failed: {$exception->getMessage()}" );
}Built-in middleware
All middleware classes implement Contracts\Middleware and are located in the Queuety\Middleware namespace.
| Class | Description |
|---|---|
RateLimited( int $max, int $window ) | Enforce a rate limit of $max executions per $window seconds |
Timeout( int $seconds ) | Kill the job after $seconds seconds (requires pcntl) |
UniqueJob( ?string $key ) | Prevent concurrent execution via a database lock |
WithoutOverlapping( string $key, int $release_after ) | Prevent overlapping execution with automatic lock expiry |
ThrottlesExceptions( int $max_attempts, int $decay_minutes ) | Throttle exceptions to prevent job storms when an external service is down |
See Middleware for usage details and examples.
Enums
Priority
enum Priority: int {
case Low = 0;
case Normal = 1;
case High = 2;
case Urgent = 3;
}JobStatus
enum JobStatus: string {
case Pending = 'pending';
case Processing = 'processing';
case Completed = 'completed';
case Failed = 'failed';
case Buried = 'buried';
}WorkflowStatus
enum WorkflowStatus: string {
case Running = 'running';
case Completed = 'completed';
case Failed = 'failed';
case Paused = 'paused';
case WaitingSignal = 'waiting_signal';
case Cancelled = 'cancelled';
}BackoffStrategy
enum BackoffStrategy: string {
case Fixed = 'fixed';
case Linear = 'linear';
case Exponential = 'exponential';
}LogEvent
enum LogEvent: string {
case Started = 'started';
case Completed = 'completed';
case Failed = 'failed';
case Buried = 'buried';
case Retried = 'retried';
case WorkflowStarted = 'workflow_started';
case WorkflowCompleted = 'workflow_completed';
case WorkflowFailed = 'workflow_failed';
case WorkflowPaused = 'workflow_paused';
case WorkflowResumed = 'workflow_resumed';
case WorkflowCancelled = 'workflow_cancelled';
case Debug = 'debug';
}OverlapPolicy
enum OverlapPolicy: string {
case Allow = 'allow';
case Skip = 'skip';
case Buffer = 'buffer';
}See Scheduling for details on how each policy works.
Attributes
#[QueuetyHandler]
PHP 8 attribute for auto-registration. See PHP Attributes.
use Queuety\Attributes\QueuetyHandler;
#[QueuetyHandler( name: 'send_email', queue: 'emails', max_attempts: 5, needs_wordpress: true )]
class SendEmailHandler implements Handler {
// ...
}