clj-r2dbc.impl.execute.stream
Streaming row execution for clj-r2dbc.
Provides:
stream* - Missionary flow emitting one value per row as a RowCursor or
built value (when :builder-fn supplied), or one ArrayList per
fetch-size batch (when :chunk-size supplied). Renamed from plan*.
Note: RowCursor is a shared mutable object. The same instance is mutated for
every row emitted by stream*. Retaining a reference to the cursor beyond the
current m/?> boundary silently returns data from a later row - no exception
is thrown. Callers that need to keep row data must either:
(a) supply :builder-fn (recommended) to materialize an immutable value, or
(b) call (cursor-row c) and (cursor-cache c) immediately within the same
reduce step and pass them to impl/sql/row's row->map before the next
row arrives.
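The aliasing hazard described above can be reproduced without a database. The sketch below is an illustration only: it uses an AtomicReference as a hypothetical stand-in for RowCursor (the real cursor API is not shown here), and emit-with-shared-cursor is an invented helper that mimics a flow mutating one instance per row.

```clojure
(import '(java.util.concurrent.atomic AtomicReference))

(def rows [{:id 1} {:id 2} {:id 3}])

(defn emit-with-shared-cursor
  "Calls (f acc cursor) once per row, mutating the SAME cursor each time.
  Hypothetical stand-in for a flow that emits a flyweight cursor."
  [f init rows]
  (let [cursor (AtomicReference.)]
    (reduce (fn [acc row]
              (.set cursor row)   ; overwrite in place, as a flyweight would
              (f acc cursor))
            init
            rows)))

;; Wrong: the reduce step keeps the cursor itself, so every retained
;; element aliases the one instance, which ends up holding the last row.
(def retained
  (mapv (fn [^AtomicReference c] (.get c))
        (emit-with-shared-cursor conj [] rows)))

;; Right: copy the data out inside the same reduce step.
(def materialized
  (emit-with-shared-cursor
    (fn [acc ^AtomicReference c] (conj acc (.get c)))
    []
    rows))
```

Here retained holds three copies of the last row, while materialized holds the three distinct rows. Supplying :builder-fn avoids the problem altogether because no cursor is exposed.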
Backpressure architecture:
stream* uses r2dbc-row-flow, a custom Missionary discrete flow that bridges
R2DBC's push-based Publisher<Row> into demand-driven per-row emission.
Rows are buffered in an ArrayDeque of capacity fetch-size. When the buffer
empties, Subscription.request(fetch-size) is issued to pull the next batch.
Backpressure chain:
consumer m/reduce -> m/?> row-flow -> r2dbc-row-flow.deref()
-> Subscription.request(fetch-size) -> R2DBC driver -> database cursor
m/observe is not used: it is push-based and provides no backpressure.
m/subscribe is not used: it carries a Reactor scalar-fusion reentrancy hazard.
No eager collection - rows stream one-by-one from database to consumer.
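The refill-on-empty behaviour can be sketched in plain Clojure. This is a simplified, single-threaded model and not the r2dbc-row-flow implementation: make-row-source and pull-all are hypothetical names, and the request! function stands in for Subscription.request by delivering rows synchronously into the buffer.

```clojure
(import '(java.util ArrayDeque))

(defn make-row-source
  "Hypothetical stand-in for a driver subscription. `request!` delivers up
  to n of the remaining rows into `sink` and counts how often it is called."
  [rows]
  (let [remaining (atom (seq rows))
        requests  (atom 0)]
    {:requests requests
     :request! (fn [n ^ArrayDeque sink]
                 (swap! requests inc)
                 (let [batch (take n @remaining)]
                   (swap! remaining #(seq (drop n %)))
                   (doseq [r batch] (.addLast sink r))))}))

(defn pull-all
  "Consumes rows one at a time, requesting fetch-size more rows only when
  the buffer is empty. Stops when a request delivers nothing."
  [rows fetch-size]
  (let [buf (ArrayDeque. (int fetch-size))
        {:keys [requests request!]} (make-row-source rows)]
    (loop [out []]
      (when (.isEmpty buf)
        (request! fetch-size buf))
      (if (.isEmpty buf)
        {:rows out :requests @requests}
        (recur (conj out (.pollFirst buf)))))))

(def demo (pull-all (range 10) 4))
```

With ten rows and a fetch size of 4, the model issues three requests that deliver rows and a fourth that finds the source exhausted. A real Publisher signals completion through onComplete rather than an empty batch, so the final request is an artifact of this model.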
This namespace is an implementation detail; do not use from application code.
stream*
(stream* db sql params opts)
Return a Missionary flow that emits one value per row returned by sql.
Renamed from plan*; semantics are identical.
Without :builder-fn - emits a shared mutable RowCursor (flyweight). The same
instance is updated in-place for each row. Do NOT retain references across
m/?> boundaries: a retained cursor silently reads a later row's data. Materialize data
immediately using cursor-row and cursor-cache within the same reduce step.
With :builder-fn (fn [Row RowMetadata] -> value) - applied per row inside the
fetch loop, emitting an immutable value safe for retention. Use
clj-r2dbc.row/kebab-maps for standard kebab-case row maps.
Args:
db - ConnectionFactory, Connection, or ConnectableWithOpts.
sql - SQL string.
params - sequential bind parameters.
opts - options map:
:builder-fn - 2-arity (Row, RowMetadata) -> value; skips RowCursor entirely.
:qualifier - column keyword mode used when no :builder-fn is supplied.
:fetch-size - rows per demand-driven batch (default 128).
:returning - calls Statement.returnGeneratedValues.
:chunk-size - when supplied, emits one ArrayList per fetch-size batch instead of one value per row.
Connection lifecycle:
When db is a ConnectionFactory, stream* acquires a connection at flow start
and closes it via try/finally on completion, error, or cancellation.
When db is already a Connection, lifecycle is owned by the caller;
Connection.close() is NOT called.
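The ownership rule can be illustrated with a small sketch. Everything here is hypothetical: db values are modelled as plain maps tagged with :kind, and acquire, close!, and run-with-connection are invented stand-ins for the real connection handling. Cancellation of a Missionary flow is not modelled.

```clojure
;; Records every connection that gets closed, for inspection.
(def closed (atom []))

(defn acquire
  "Hypothetical: obtain a connection from a factory."
  [factory]
  {:kind :connection :from (:name factory)})

(defn close!
  "Hypothetical: close a connection."
  [conn]
  (swap! closed conj conn))

(defn run-with-connection
  "Runs (f conn). Closes the connection only when it was acquired here."
  [db f]
  (if (= :factory (:kind db))
    (let [conn (acquire db)]
      (try
        (f conn)
        (finally (close! conn))))   ; runs on normal return and on error
    (f db)))                        ; caller-owned: never closed here

(def from-factory
  (run-with-connection {:kind :factory :name "pool"} :from))

(def on-error
  (try
    (run-with-connection {:kind :factory :name "pool"}
                         (fn [_] (throw (ex-info "boom" {}))))
    (catch clojure.lang.ExceptionInfo _ :failed)))

(def caller-owned
  (run-with-connection {:kind :connection :from "caller"} :from))
```

After these three calls, closed contains two entries, one per factory-acquired connection, and the caller-supplied connection is untouched.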
Backpressure:
Both paths use r2dbc-row-flow, a custom Missionary discrete flow that
bridges Publisher<Row> with demand-driven fetch-size batching. Each
deref() returns one transformed row. When the internal buffer empties,
Subscription.request(fetch-size) pulls the next batch from the driver.
No eager collection - true end-to-end streaming from database to consumer.
Zero-copy:
Row data is captured in onNext while Row is valid. Values stored in the
internal buffer are fully captured before the driver recycles the Row
object - safe to hold across deref boundaries.
stream-dispatch*
(stream-dispatch* db sql opts)
Dispatch stream execution from validated opts.
Resolves the builder mode (:builder-fn present vs. absent) and chunking
(:chunk-size present vs. absent), then delegates to stream*.
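The two decisions can be pictured as a pure function over the opts map. This is a sketch under assumptions, not the actual dispatch code: resolve-stream-mode and the keys of its return value are hypothetical, and presence is taken to mean a non-nil value.

```clojure
(defn resolve-stream-mode
  "Hypothetical helper: derives the builder mode and chunking flag from opts."
  [opts]
  {:builder  (if (some? (:builder-fn opts)) :builder-fn :row-cursor)
   :chunked? (some? (:chunk-size opts))})

(def cursor-mode  (resolve-stream-mode {:fetch-size 128}))
(def builder-mode (resolve-stream-mode {:builder-fn (fn [row md] row)}))
(def chunked-mode (resolve-stream-mode {:chunk-size 64}))
```

Keeping this step free of side effects means the four combinations can be checked without opening a connection.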
Args:
db - ConnectionFactory, Connection, or ConnectableWithOpts.
sql - non-blank SQL string.
opts - validated options map from stream-opts.
Returns a Missionary discrete flow emitting one value per row, or one ArrayList per batch when :chunk-size is supplied.