Middleware
Middleware wraps job execution in an onion-style pipeline. Each middleware can run logic before and after the job, short-circuit execution, or catch exceptions. Queuety ships five built-in middleware and supports custom middleware through the Contracts\Middleware interface.
Middleware is available for dispatchable job classes (classes implementing Contracts\Job with the Dispatchable trait).
The Contracts\Middleware interface
namespace Queuety\Contracts;
interface Middleware {
public function handle( object $job, \Closure $next ): void;
}
The $job parameter is the job instance being processed. Call $next( $job ) to pass execution to the next middleware in the pipeline (or to the job's handle() method if this is the last middleware).
Adding middleware to a job
Define a middleware() method on your job class that returns an array of middleware instances:
use Queuety\Contracts\Job;
use Queuety\Dispatchable;
use Queuety\Middleware\RateLimited;
use Queuety\Middleware\Timeout;
class SyncInventoryJob implements Job {
use Dispatchable;
public function __construct(
public readonly string $sku,
) {}
public function handle(): void {
// Sync inventory for this SKU...
}
public function middleware(): array {
return [
new Timeout( seconds: 30 ),
new RateLimited( max: 100, window: 60 ),
];
}
}
Pipeline execution order
Middleware are applied in array order, with the first middleware being the outermost layer. If your middleware() method returns [A, B, C], the execution order is:
A::handle() -> B::handle() -> C::handle() -> $job->handle()
Each middleware wraps the next. A runs first before the job and last after the job, making it ideal for timing or try/catch wrappers.
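One way such an onion pipeline can be assembled is by folding the middleware array from the inside out. The sketch below is not Queuety's actual implementation: the Middleware interface is a local stand-in for Contracts\Middleware, and run_pipeline(), Tag, and DemoJob are hypothetical names used only to make the ordering visible.

```php
<?php
declare(strict_types=1);

// Local stand-in for Queuety\Contracts\Middleware so the sketch runs on its own.
interface Middleware {
    public function handle( object $job, \Closure $next ): void;
}

// Hypothetical middleware that records when it is entered and left.
final class Tag implements Middleware {
    public function __construct( private string $name, private \ArrayObject $log ) {}

    public function handle( object $job, \Closure $next ): void {
        $this->log->append( "{$this->name}:before" );
        $next( $job );
        $this->log->append( "{$this->name}:after" );
    }
}

// Hypothetical job that only records that it ran.
final class DemoJob {
    public function __construct( private \ArrayObject $log ) {}

    public function handle(): void {
        $this->log->append( 'job' );
    }
}

// Assumed helper: wrap the job's handle() in each middleware, innermost first.
function run_pipeline( object $job, array $middleware ): void {
    $core = static function ( object $job ): void {
        $job->handle();
    };

    $pipeline = array_reduce(
        array_reverse( $middleware ),
        static fn ( \Closure $next, Middleware $m ): \Closure =>
            static function ( object $job ) use ( $m, $next ): void {
                $m->handle( $job, $next );
            },
        $core
    );

    $pipeline( $job );
}

$log = new \ArrayObject();
run_pipeline(
    new DemoJob( $log ),
    [ new Tag( 'A', $log ), new Tag( 'B', $log ), new Tag( 'C', $log ) ]
);

echo implode( ' -> ', $log->getArrayCopy() ), PHP_EOL;
// A:before -> B:before -> C:before -> job -> C:after -> B:after -> A:after
```

The "after" entries unwind in reverse, which is why the first array element is the natural place for timing or try/catch wrappers.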
Writing custom middleware
Implement the Contracts\Middleware interface:
use Queuety\Contracts\Middleware;
class LogExecution implements Middleware {
public function handle( object $job, \Closure $next ): void {
$class = get_class( $job );
error_log( "[Queuety] Starting: {$class}" );
$start = microtime( true );
try {
$next( $job );
} finally {
$elapsed = round( microtime( true ) - $start, 3 );
error_log( "[Queuety] Finished: {$class} in {$elapsed}s" );
}
}
}
Use it on a job:
public function middleware(): array {
return [
new LogExecution(),
];
}
Short-circuiting
A middleware can skip the job entirely by not calling $next():
class SkipOnMaintenanceMode implements Middleware {
public function handle( object $job, \Closure $next ): void {
if ( wp_is_maintenance_mode() ) {
return; // Job silently skipped.
}
$next( $job );
}
}
Built-in middleware
ThrottlesExceptions
Throttles exceptions from a job handler to prevent job storms. If a handler has thrown too many exceptions within a time window, subsequent jobs for that handler are rejected with a RuntimeException instead of being processed. This prevents a failing external service from burning through all retry attempts across many jobs at once.
use Queuety\Middleware\ThrottlesExceptions;
public function middleware(): array {
return [
new ThrottlesExceptions( max_attempts: 10, decay_minutes: 10 ),
];
}
| Parameter | Type | Default | Description |
|---|---|---|---|
| $max_attempts | int | 10 | Maximum exceptions allowed within the decay window |
| $decay_minutes | int | 10 | Window duration in minutes |
Exception counts are tracked per handler class in Queuety's locks table and shared across all worker processes. When the threshold is reached, the middleware throws a RuntimeException and the job is retried after a backoff delay.
Example: if an external API is down, the first 10 failures are processed normally (each incrementing the exception counter). The 11th and subsequent attempts within 10 minutes are throttled, preserving retry attempts until the API recovers.
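The counting behavior in this example can be modeled with a small in-memory sketch. It is only an illustration under stated assumptions: ExceptionThrottleSketch is a hypothetical class, it keeps failure timestamps in an array rather than in the locks table, and its handle() takes an extra $now argument (not part of Contracts\Middleware) so the run is deterministic.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory model of exception throttling; Queuety itself
// is described as keeping these counts in its locks table.
final class ExceptionThrottleSketch {
    /** @var array<string, int[]> Failure timestamps per handler class. */
    private array $failures = [];

    public function __construct(
        private int $max_attempts = 10,
        private int $decay_minutes = 10,
    ) {}

    // $now (seconds) is injected so the example does not depend on the clock.
    public function handle( object $job, \Closure $next, int $now ): void {
        $class  = get_class( $job );
        $cutoff = $now - $this->decay_minutes * 60;

        // Keep only failures that are still inside the decay window.
        $recent = array_values( array_filter(
            $this->failures[ $class ] ?? [],
            static fn ( int $t ): bool => $t > $cutoff
        ) );
        $this->failures[ $class ] = $recent;

        if ( count( $recent ) >= $this->max_attempts ) {
            throw new \RuntimeException( "Throttled: {$class}" );
        }

        try {
            $next( $job );
        } catch ( \Throwable $e ) {
            $this->failures[ $class ][] = $now; // Count the failure, then rethrow.
            throw $e;
        }
    }
}

// Usage: a handler that always fails, as if an external API were down.
$throttle = new ExceptionThrottleSketch( max_attempts: 10, decay_minutes: 10 );
$job      = new \stdClass();
$calls    = 0;
$failing  = static function ( object $job ) use ( &$calls ): void {
    $calls++;
    throw new \LogicException( 'API down' );
};

$outcomes = [];
for ( $i = 1; $i <= 12; $i++ ) {
    try {
        $throttle->handle( $job, $failing, now: 1000 + $i );
    } catch ( \RuntimeException $e ) {
        $outcomes[] = 'throttled'; // Rejected before the handler ran.
    } catch ( \LogicException $e ) {
        $outcomes[] = 'failed';    // Handler ran and threw.
    }
}
```

In this run the handler is invoked ten times; attempts 11 and 12 are rejected before reaching it.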
RateLimited
Enforces a maximum number of executions within a sliding time window. If the limit is exceeded, a RateLimitExceededException is thrown and the job is retried after the window resets.
use Queuety\Middleware\RateLimited;
public function middleware(): array {
return [
new RateLimited( max: 60, window: 60 ), // 60 per minute
];
}
| Parameter | Type | Description |
|---|---|---|
| $max | int | Maximum executions allowed in the window |
| $window | int | Window duration in seconds |
The rate limiter uses Queuety's internal RateLimiter instance, so limits are enforced across all worker processes.
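To make the sliding-window idea concrete, here is a minimal single-process sketch. SlidingWindowSketch and attempt() are hypothetical names; Queuety's internal RateLimiter is shared across workers and may work differently, and the real middleware throws RateLimitExceededException where this sketch returns false.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory sliding-window limiter, for illustration only.
final class SlidingWindowSketch {
    /** @var int[] Timestamps (seconds) of executions inside the window. */
    private array $hits = [];

    public function __construct( private int $max, private int $window ) {}

    // Returns true and records the execution if a slot is free at $now.
    public function attempt( int $now ): bool {
        $cutoff     = $now - $this->window;
        $this->hits = array_values( array_filter(
            $this->hits,
            static fn ( int $t ): bool => $t > $cutoff
        ) );

        if ( count( $this->hits ) >= $this->max ) {
            return false; // Limit reached for the current window.
        }

        $this->hits[] = $now;
        return true;
    }
}

$limiter = new SlidingWindowSketch( max: 2, window: 60 );

$first  = $limiter->attempt( 0 );  // allowed
$second = $limiter->attempt( 10 ); // allowed
$third  = $limiter->attempt( 20 ); // rejected: two executions in the last 60s
$fourth = $limiter->attempt( 61 ); // allowed: the execution at t=0 has aged out
```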
Timeout
Enforces a per-job execution timeout using pcntl_alarm. If the job exceeds the timeout, a TimeoutException is thrown and the job is retried.
use Queuety\Middleware\Timeout;
public function middleware(): array {
return [
new Timeout( seconds: 30 ),
];
}
| Parameter | Type | Description |
|---|---|---|
| $seconds | int | Maximum execution time in seconds |
Requires the pcntl PHP extension. If pcntl is not available, the middleware falls through silently (no timeout is enforced).
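The alarm-based pattern, including the silent fallback when pcntl is missing, might look roughly like the sketch below. run_with_timeout() is a hypothetical helper rather than Queuety's code, and it throws a plain RuntimeException as a stand-in for TimeoutException.

```php
<?php
declare(strict_types=1);

// Hypothetical helper showing the alarm-based pattern.
// Returns true if a timeout was enforced, false if pcntl was unavailable.
function run_with_timeout( int $seconds, callable $work ): bool {
    if ( ! function_exists( 'pcntl_alarm' ) ) {
        $work(); // No pcntl: run without enforcing a timeout.
        return false;
    }

    pcntl_async_signals( true ); // Deliver signals without declare(ticks).
    pcntl_signal( SIGALRM, static function () use ( $seconds ): void {
        throw new \RuntimeException( "Job exceeded {$seconds}s" );
    } );
    pcntl_alarm( $seconds ); // Schedule SIGALRM.

    try {
        $work();
    } finally {
        pcntl_alarm( 0 );                 // Cancel any pending alarm.
        pcntl_signal( SIGALRM, SIG_DFL ); // Restore default handling.
    }

    return true;
}

$ran      = false;
$enforced = run_with_timeout( 5, static function () use ( &$ran ): void {
    $ran = true; // Finishes well inside the limit.
} );
```

Cancelling the alarm in a finally block matters: otherwise a pending SIGALRM could fire during a later, unrelated job in the same worker process.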
UniqueJob
Prevents concurrent execution of jobs with the same key. Uses a database lock table to ensure only one instance runs at a time. If a lock is already held, the job is silently skipped (not retried).
use Queuety\Middleware\UniqueJob;
public function middleware(): array {
return [
new UniqueJob(), // Lock key defaults to the job class name
// Or pass a custom lock key instead:
// new UniqueJob( key: 'sync_inventory' ),
];
}
| Parameter | Type | Default | Description |
|---|---|---|---|
| $key | string\|null | null (uses class name) | Lock key for uniqueness |
WithoutOverlapping
Prevents overlapping execution of jobs sharing a logical key, with an automatic lock expiry so that stale locks left behind by crashed workers do not block later jobs. Similar to UniqueJob but designed for long-running operations where the lock should auto-release after a timeout.
use Queuety\Middleware\WithoutOverlapping;
public function middleware(): array {
return [
new WithoutOverlapping( key: 'import_products', release_after: 600 ),
];
}
| Parameter | Type | Default | Description |
|---|---|---|---|
| $key | string | (required) | Logical operation key |
| $release_after | int | 300 | Maximum lock duration in seconds before auto-release |
If a lock is already held, the job is silently skipped. Expired locks are cleaned up automatically before each acquisition attempt.
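The acquire-or-skip behavior with expiring locks can be sketched in memory as follows. ExpiringLockSketch and without_overlapping() are hypothetical names, the array stands in for the database lock table, and timestamps are passed in explicitly so the example is deterministic.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory lock store imitating a lock table with expiry.
final class ExpiringLockSketch {
    /** @var array<string, int> Lock key => expiry timestamp (seconds). */
    private array $locks = [];

    // Try to take the lock at time $now; returns false if it is still held.
    public function acquire( string $key, int $release_after, int $now ): bool {
        // Drop expired locks first, e.g. those left by crashed workers.
        $this->locks = array_filter(
            $this->locks,
            static fn ( int $expires ): bool => $expires > $now
        );

        if ( isset( $this->locks[ $key ] ) ) {
            return false;
        }

        $this->locks[ $key ] = $now + $release_after;
        return true;
    }

    public function release( string $key ): void {
        unset( $this->locks[ $key ] );
    }
}

// Run $work only if the lock is free; otherwise skip silently.
function without_overlapping(
    ExpiringLockSketch $locks,
    string $key,
    int $release_after,
    int $now,
    callable $work
): bool {
    if ( ! $locks->acquire( $key, $release_after, $now ) ) {
        return false;
    }
    try {
        $work();
    } finally {
        $locks->release( $key );
    }
    return true;
}

$locks = new ExpiringLockSketch();

$a = $locks->acquire( 'import_products', 600, now: 0 );   // true: lock taken
$b = $locks->acquire( 'import_products', 600, now: 300 ); // false: still held
$c = $locks->acquire( 'import_products', 600, now: 601 ); // true: old lock expired at t=600

// The lock taken at t=601 is still held at t=700, so this job is skipped.
$skipped_result = without_overlapping( $locks, 'import_products', 600, 700, static function (): void {} );
```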
Example: combining middleware
use Queuety\Contracts\Job;
use Queuety\Dispatchable;
use Queuety\Middleware\RateLimited;
use Queuety\Middleware\Timeout;
use Queuety\Middleware\WithoutOverlapping;
class ImportProductsJob implements Job {
use Dispatchable;
public function __construct(
public readonly int $supplier_id,
) {}
public function handle(): void {
// Fetch and import products from the supplier...
}
public function middleware(): array {
return [
new Timeout( seconds: 300 ),
new WithoutOverlapping(
key: "import_supplier_{$this->supplier_id}",
release_after: 600,
),
new RateLimited( max: 5, window: 60 ),
];
}
}
This job will:
- Be interrupted with a TimeoutException and retried if it runs longer than 5 minutes (Timeout)
- Be skipped if another import for the same supplier is already running (WithoutOverlapping)
- Be rate limited to 5 imports per minute across all suppliers (RateLimited)