Heartbeats
Long-running jobs and streaming steps can send heartbeats to prove they are still alive. Without heartbeats, the stale job detector may reclaim a job that is still actively processing because its reserved_at timestamp has not been updated.
How it works
When a worker starts processing a job, it calls Heartbeat::init() to set the current job context. Inside your handler, calling Heartbeat::beat() updates the job's reserved_at column to NOW(). The stale detector uses reserved_at to determine whether a job is stuck, so refreshing it keeps the job safe.
When the job finishes, the worker calls Heartbeat::clear() to reset the context.
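To make the role of reserved_at concrete, here is a minimal sketch of the kind of check a stale detector could perform. The is_stale() function and the strict "older than the timeout" comparison are assumptions for illustration only; Queuety's actual detector is not shown in this document.

```php
<?php
// Hypothetical helper, not part of Queuety: a job counts as stale when
// its reserved_at timestamp is older than the timeout (in seconds).
function is_stale( int $reserved_at, int $now, int $timeout = 300 ): bool {
    return ( $now - $reserved_at ) > $timeout;
}

$started = 1000; // reserved_at when the worker picked up the job

// Without heartbeats, reserved_at never moves, so 400 seconds in
// the job looks abandoned.
$without_beat = is_stale( $started, $started + 400 );

// With a heartbeat at +280 seconds, reserved_at was refreshed and only
// 120 seconds have passed since the last sign of life.
$with_beat = is_stale( $started + 280, $started + 400 );

var_dump( $without_beat ); // bool(true)
var_dump( $with_beat );    // bool(false)
```

The job is equally busy in both cases; the only difference is whether reserved_at was refreshed along the way.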
Sending heartbeats
Call Heartbeat::beat() from inside any job handler or streaming step:
use Queuety\Heartbeat;
class LongRunningStep implements \Queuety\Contracts\Step {
    public function handle( array $state ): array {
        $items = $state['items'];
        foreach ( $items as $i => $item ) {
            $this->process( $item );
            Heartbeat::beat( [ 'processed' => $i + 1, 'total' => count( $items ) ] );
        }
        return [ 'done' => true ];
    }
    public function config(): array {
        return [];
    }
    private function process( $item ): void {
        // Expensive work here.
    }
}
Progress data
The $progress parameter is an optional associative array. When provided, it is JSON-encoded and stored in the heartbeat_data column on the job row. This lets you track progress from outside the worker:
Heartbeat::beat( [
    'processed' => 450,
    'total'     => 1000,
    'percent'   => 45,
] );
When called without arguments, only reserved_at is updated:
Heartbeat::beat();
Heartbeat API
The Queuety\Heartbeat class is entirely static. You do not instantiate it.
| Method | Description |
|---|---|
| Heartbeat::init( int $job_id, Connection $conn ) | Set the job context. Called automatically by the worker before processing. |
| Heartbeat::beat( array $progress = [] ) | Send a heartbeat. Updates reserved_at and optionally stores progress data. |
| Heartbeat::clear() | Clear the job context. Called automatically by the worker after processing. |
| Heartbeat::current_job_id() | Get the current job ID (useful for testing). |
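The lifecycle implied by the methods above can be sketched with an in-memory stand-in. FakeHeartbeat and run_job() are hypothetical names used only for illustration: the stand-in omits the Connection argument and records beats in an array instead of writing to the database. The try/finally cleanup, the no-op beat when no job is set, and the null return from current_job_id() are assumptions, not confirmed Queuety behavior.

```php
<?php
// In-memory stand-in that mirrors the documented static API.
// It is NOT Queuety\Heartbeat: the real class writes to the database.
final class FakeHeartbeat {
    private static ?int $job_id = null;

    /** @var array<int, array> Progress payloads, one entry per beat. */
    public static array $beats = [];

    public static function init( int $job_id ): void {
        self::$job_id = $job_id;
    }

    public static function beat( array $progress = [] ): void {
        if ( null === self::$job_id ) {
            return; // Assumption: beating without a job context is a no-op.
        }
        self::$beats[] = $progress;
    }

    public static function clear(): void {
        self::$job_id = null;
    }

    public static function current_job_id(): ?int {
        return self::$job_id;
    }
}

// Hypothetical worker step: set the context, run the handler, and clear
// the context even if the handler throws.
function run_job( int $job_id, callable $handler ): void {
    FakeHeartbeat::init( $job_id );
    try {
        $handler();
    } finally {
        FakeHeartbeat::clear();
    }
}

run_job( 42, function (): void {
    FakeHeartbeat::beat( [ 'processed' => 1, 'total' => 2 ] );
    FakeHeartbeat::beat( [ 'processed' => 2, 'total' => 2 ] );
} );
```

Because the context is static, a handler never passes a job ID to beat(); it relies on the worker having set the context beforehand.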
Usage in streaming steps
Streaming steps should send heartbeats inside the generator loop. Each chunk yield already triggers a DB write, but the chunk INSERT does not update reserved_at. Add a Heartbeat::beat() call alongside your yields:
use Queuety\Contracts\StreamingStep;
use Queuety\Heartbeat;
class StreamAPIResponse implements StreamingStep {
    public function stream( array $state, array $existing_chunks = [] ): \Generator {
        // $this->client is assumed to be set up elsewhere (not shown here).
        $response = $this->client->stream( $state['prompt'] );
        foreach ( $response as $i => $chunk ) {
            // Refresh reserved_at before handing the chunk back to the worker.
            Heartbeat::beat( [ 'chunks' => $i + 1 ] );
            yield $chunk;
        }
    }
    public function on_complete( array $chunks, array $state ): array {
        return [ 'response' => implode( '', $chunks ) ];
    }
    public function config(): array {
        return [];
    }
}
When to use heartbeats
Use heartbeats when a single job or step runs for longer than QUEUETY_STALE_TIMEOUT seconds (default: 300). Common cases:
- LLM API calls with streaming responses
- Large file downloads or uploads
- Batch processing loops inside a single step
- Any external API call that may take minutes to respond
If your step finishes well within the timeout (for example, in under a minute with the default setting), heartbeats are unnecessary: the stale detector only acts once the configured timeout has elapsed.
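For tight loops, a heartbeat on every iteration means one database write per item. One possible way to limit that is to throttle the calls, as in the sketch below. ThrottledBeat is a hypothetical helper, not part of Queuety; the 30-second interval is an arbitrary example and should stay well below QUEUETY_STALE_TIMEOUT. The clock is injectable so the behavior can be shown without waiting.

```php
<?php
// Hypothetical helper, not part of Queuety: forwards to a beat callback
// at most once every $interval seconds.
final class ThrottledBeat {
    private $beat;
    private $clock;
    private int $interval;
    private ?int $last = null;

    public function __construct( callable $beat, int $interval = 30, ?callable $clock = null ) {
        $this->beat     = $beat;
        $this->interval = $interval;
        $this->clock    = $clock ?? 'time'; // Defaults to PHP's time().
    }

    // Returns true when a heartbeat was actually sent.
    public function tick( array $progress = [] ): bool {
        $now = ( $this->clock )();
        if ( null !== $this->last && ( $now - $this->last ) < $this->interval ) {
            return false; // Too soon since the last beat; skip the write.
        }
        $this->last = $now;
        ( $this->beat )( $progress );
        return true;
    }
}

// Demonstration with a recording callback and a fake clock.
$sent = [];
$now  = 0;
$beat = new ThrottledBeat(
    function ( array $progress ) use ( &$sent ): void {
        $sent[] = $progress;
    },
    30,
    function () use ( &$now ): int {
        return $now;
    }
);

foreach ( [ 0, 10, 29, 30, 45, 61 ] as $i => $t ) {
    $now = $t;
    $beat->tick( [ 'processed' => $i + 1 ] );
}
// $sent now holds three beats: the ones from t=0, t=30 and t=61.
```

In a real handler the callback would forward to the documented method, for example `new ThrottledBeat( [ Heartbeat::class, 'beat' ] )`. The trade-off is that progress data is only as fresh as the last beat that was sent.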