Queuety
Features

Middleware

Middleware wraps job execution in an onion-style pipeline. Each middleware can run logic before and after the job, short-circuit execution, or catch exceptions. Queuety ships four built-in middleware and supports custom middleware through the Contracts\Middleware interface.

Middleware is available for dispatchable job classes (classes implementing Contracts\Job with the Dispatchable trait).

The Contracts\Middleware interface

use Queuety\Contracts\Middleware;

interface Middleware {
    public function handle( object $job, \Closure $next ): void;
}

The $job parameter is the job instance being processed. Call $next( $job ) to pass execution to the next middleware in the pipeline (or to the job's handle() method if this is the last middleware).

Adding middleware to a job

Define a middleware() method on your job class that returns an array of middleware instances:

use Queuety\Contracts\Job;
use Queuety\Dispatchable;
use Queuety\Middleware\RateLimited;
use Queuety\Middleware\Timeout;

class SyncInventoryJob implements Job {
    use Dispatchable;

    public function __construct(
        public readonly string $sku,
    ) {}

    public function handle(): void {
        // Sync inventory for this SKU...
    }

    public function middleware(): array {
        return [
            new Timeout( seconds: 30 ),
            new RateLimited( max: 100, window: 60 ),
        ];
    }
}

Pipeline execution order

Middleware are applied in array order, with the first middleware being the outermost layer. If your middleware() method returns [A, B, C], the execution order is:

A::handle() -> B::handle() -> C::handle() -> $job->handle()

Each middleware wraps the next. A runs first before the job and last after the job, making it ideal for timing or try/catch wrappers.

Writing custom middleware

Implement the Contracts\Middleware interface:

use Queuety\Contracts\Middleware;

class LogExecution implements Middleware {
    public function handle( object $job, \Closure $next ): void {
        $class = get_class( $job );
        error_log( "[Queuety] Starting: {$class}" );

        $start = microtime( true );

        try {
            $next( $job );
        } finally {
            $elapsed = round( microtime( true ) - $start, 3 );
            error_log( "[Queuety] Finished: {$class} in {$elapsed}s" );
        }
    }
}

Use it on a job:

public function middleware(): array {
    return [
        new LogExecution(),
    ];
}

Short-circuiting

A middleware can skip the job entirely by not calling $next():

class SkipOnMaintenanceMode implements Middleware {
    public function handle( object $job, \Closure $next ): void {
        if ( wp_is_maintenance_mode() ) {
            return; // Job silently skipped.
        }

        $next( $job );
    }
}

Built-in middleware

ThrottlesExceptions

Throttles exceptions from a job handler to prevent job storms. If a handler has thrown too many exceptions within a time window, subsequent jobs for that handler are rejected with a RuntimeException instead of being processed. This prevents a failing external service from burning through all retry attempts across many jobs at once.

use Queuety\Middleware\ThrottlesExceptions;

public function middleware(): array {
    return [
        new ThrottlesExceptions( max_attempts: 10, decay_minutes: 10 ),
    ];
}
ParameterTypeDefaultDescription
$max_attemptsint10Maximum exceptions allowed within the decay window
$decay_minutesint10Window duration in minutes

Exception counts are tracked per handler class in Queuety's locks table and shared across all worker processes. When the threshold is reached, the middleware throws a RuntimeException and the job is retried after a backoff delay.

Example: if an external API is down, the first 10 failures are processed normally (each incrementing the exception counter). The 11th and subsequent attempts within 10 minutes are throttled, preserving retry attempts until the API recovers.

RateLimited

Enforces a maximum number of executions within a sliding time window. If the limit is exceeded, a RateLimitExceededException is thrown and the job is retried after the window resets.

use Queuety\Middleware\RateLimited;

public function middleware(): array {
    return [
        new RateLimited( max: 60, window: 60 ), // 60 per minute
    ];
}
ParameterTypeDescription
$maxintMaximum executions allowed in the window
$windowintWindow duration in seconds

The rate limiter uses Queuety's internal RateLimiter instance, so limits are enforced across all worker processes.

Timeout

Enforces a per-job execution timeout using pcntl_alarm. If the job exceeds the timeout, a TimeoutException is thrown and the job is retried.

use Queuety\Middleware\Timeout;

public function middleware(): array {
    return [
        new Timeout( seconds: 30 ),
    ];
}
ParameterTypeDescription
$secondsintMaximum execution time in seconds

Requires the pcntl PHP extension. If pcntl is not available, the middleware falls through silently (no timeout is enforced).

UniqueJob

Prevents concurrent execution of jobs with the same key. Uses a database lock table to ensure only one instance runs at a time. If a lock is already held, the job is silently skipped (not retried).

use Queuety\Middleware\UniqueJob;

public function middleware(): array {
    return [
        new UniqueJob(),                         // Lock key defaults to the job class name
        new UniqueJob( key: 'sync_inventory' ),   // Custom lock key
    ];
}
ParameterTypeDefaultDescription
$keystring|nullnull (uses class name)Lock key for uniqueness

WithoutOverlapping

Prevents overlapping execution of jobs sharing a logical key, with an automatic lock expiry to prevent dead locks from crashed workers. Similar to UniqueJob but designed for long-running operations where the lock should auto-release after a timeout.

use Queuety\Middleware\WithoutOverlapping;

public function middleware(): array {
    return [
        new WithoutOverlapping( key: 'import_products', release_after: 600 ),
    ];
}
ParameterTypeDefaultDescription
$keystring(required)Logical operation key
$release_afterint300Maximum lock duration in seconds before auto-release

If a lock is already held, the job is silently skipped. Expired locks are cleaned up automatically before each acquisition attempt.

Example: combining middleware

use Queuety\Contracts\Job;
use Queuety\Dispatchable;
use Queuety\Middleware\RateLimited;
use Queuety\Middleware\Timeout;
use Queuety\Middleware\WithoutOverlapping;

class ImportProductsJob implements Job {
    use Dispatchable;

    public function __construct(
        public readonly int $supplier_id,
    ) {}

    public function handle(): void {
        // Fetch and import products from the supplier...
    }

    public function middleware(): array {
        return [
            new Timeout( seconds: 300 ),
            new WithoutOverlapping(
                key: "import_supplier_{$this->supplier_id}",
                release_after: 600,
            ),
            new RateLimited( max: 5, window: 60 ),
        ];
    }
}

This job will:

  1. Be killed if it runs longer than 5 minutes (Timeout)
  2. Be skipped if another import for the same supplier is already running (WithoutOverlapping)
  3. Be rate limited to 5 imports per minute across all suppliers (RateLimited)

On this page