Dynamic For-Each
Use ->for_each() when the workflow does not know its branch count at build time. Queuety reads an array from workflow state, dispatches one branch job per item, and advances only when the configured completion condition is satisfied.
The ->for_each() method
use Queuety\Contracts\ForEachHandler;
use Queuety\Contracts\ForEachReducer;
use Queuety\Enums\ForEachMode;
use Queuety\Queuety;
Queuety::workflow( 'index_site' )
->then( PlanPagesStep::class )
->for_each(
items_key: 'pages',
handler_class: IndexPageStep::class,
result_key: 'page_results',
mode: ForEachMode::Quorum,
quorum: 2,
reducer_class: IndexSummaryReducer::class,
name: 'index_pages',
)
->then( PublishIndexStep::class )
->dispatch( [ 'site_id' => 42 ] );The planner step writes pages into workflow state. The for-each step reads that array and dispatches one branch job per item.
Branch handlers
For-each branch handlers implement Contracts\ForEachHandler:
use Queuety\Contracts\ForEachHandler;
final class IndexPageStep implements ForEachHandler {
public function handle_item( array $state, mixed $item, int $index ): array {
$page = is_array( $item ) ? $item : [ 'url' => (string) $item ];
return [
'url' => $page['url'],
'title' => fetch_title( $page['url'] ),
'index' => $index,
];
}
public function config(): array {
return [
'max_attempts' => 3,
'backoff' => [5, 30, 120],
];
}
}$state contains the full workflow state, $item is the individual for-each item, and $index is its zero-based position.
Completion modes
ForEachMode::All
The step only settles after every branch succeeds. Any permanent branch failure makes the for-each step fail.
ForEachMode::FirstSuccess
The step settles as soon as the first branch succeeds. Remaining active branches are buried as superseded.
ForEachMode::Quorum
The step settles when at least quorum branches succeed. If the remaining branches can no longer satisfy the quorum, the workflow fails.
Reducers
By default Queuety stores a structured aggregate under result_key:
modequorumtotalsucceededfailedwinnerresultsfailures
If you want a custom summary, pass a reducer class implementing Contracts\ForEachReducer:
use Queuety\Contracts\ForEachReducer;
final class IndexSummaryReducer implements ForEachReducer {
public function reduce( array $state, array $for_each ): array {
return [
'indexed_count' => count( $for_each['results'] ),
'winning_url' => $for_each['winner']['output']['url'] ?? null,
];
}
}Reducer output is merged into workflow state like any other step output. It may also return _next_step to branch after the for-each settles.
Retry behavior
For-each branches retry independently according to the branch handler's config(). If the for-each step fails and you call retry_workflow(), Queuety re-expands the for-each step and only requeues branches that are still missing or failed. Successful branch results stay in workflow state.
Choosing between parallel() and for_each()
Use parallel() when the branch list is fixed in code.
Use for_each() when the branch list is discovered at runtime from workflow state and you need completion modes like first-success or quorum.