clj-r2dbc.impl.execute.stream
Streaming row execution for clj-r2dbc.
Provides:
stream* - Missionary flow emitting one value per row as a RowCursor or
built value (when :builder-fn supplied), or one vector per
:chunk-size batch (when :chunk-size supplied). Renamed from plan*.
Note: RowCursor is a shared mutable object. The same instance is mutated for
every row emitted by stream*. Retaining a reference to the cursor beyond the
current m/?> boundary silently returns data from a later row - no exception
is thrown. Callers that need to keep row data must either:
(a) supply :builder-fn (recommended) to materialize an immutable value, or
(b) call (cursor-row c) and (cursor-cache c) immediately within the same
reduce step and pass them to impl/sql/row's row->map before the next
row arrives.
Backpressure architecture:
stream* delegates row delivery to lifecycle/result-rows-flow, which bridges
R2DBC's push-based Publisher<Result> into a demand-driven Missionary discrete
flow via Missionary's reactive-streams adapter (m/subscribe). Reactive demand
is one row at a time; database round-trip batching is controlled independently
by Statement.fetchSize (applied in stmt/prepare!).
Backpressure chain:
consumer m/reduce -> m/?> result-rows-flow -> m/subscribe(Publisher<Row>)
-> Subscription.request(1) -> R2DBC driver -> database cursor
Chunking (:chunk-size) batches at the Reactive Streams layer via
lifecycle/result-chunks-flow (publisher/buffer-pub) so the consumer performs
O(batches) transfers; flyweight mode maps each row onto a shared RowCursor.
No eager collection - rows stream from database to consumer.
This namespace is an implementation detail; do not use from application code.stream*
(stream* db sql params opts)
Return a Missionary flow that emits one value per row returned by sql.
Renamed from plan*; semantics are identical.
Without :builder-fn - emits a shared mutable RowCursor (flyweight). The same
instance is updated in-place for each row. Do NOT retain references across
m/?> boundaries; silent data corruption otherwise. Materialize data
immediately using cursor-row and cursor-cache within the same reduce step.
With :builder-fn (fn [Row RowMetadata] -> value) - applied per row inside
result-row-pub's BiFunction, emitting an immutable value safe for retention.
Use clj-r2dbc.row/kebab-maps for standard kebab-case row maps.
With :chunk-size - requires :builder-fn; emits one vector of up to chunk-size
built values per emission (the final vector may be shorter). Batching happens
at the Reactive Streams layer (lifecycle/result-chunks-flow via buffer-pub) so
the consumer performs O(batches) transfers.
Args:
db - ConnectionFactory, Connection, or ConnectableWithOpts.
sql - SQL string.
params - sequential bind parameters.
opts - options map:
:builder-fn - 2-arity (Row, RowMetadata) -> value; skips RowCursor entirely.
:chunk-size - rows per emitted vector; requires :builder-fn.
:qualifier - column keyword mode used when no :builder-fn is supplied.
:fetch-size - rows per database round-trip batch (default 128),
applied to Statement.fetchSize. Clamped to 1 in
flyweight mode (no :builder-fn): the R2DBC SPI permits
drivers to recycle Row objects after onNext returns.
:returning - calls Statement.returnGeneratedValues.
Connection lifecycle:
When db is a ConnectionFactory, stream* acquires a connection at flow start
and closes it on completion, error, or cancellation (via close-conn-flow on
the flow terminator). When db is already a Connection, lifecycle is owned by
the caller; Connection.close() is NOT called.
Backpressure:
Row delivery is demand-driven through Missionary's m/subscribe bridge (one
row per reactive request). Database fetch batching is governed by
Statement.fetchSize, independently of reactive demand. No eager collection -
true end-to-end streaming from database to consumer.
Zero-copy:
With :builder-fn, row-xf materializes values inside result-row-pub's
BiFunction.apply() while the underlying ByteBuf is guaranteed live, so
emitted builder values are safe to hold across deref boundaries.
In flyweight mode (no :builder-fn), row-xf is identity and fetch-size is
clamped to 1; each row is materialized into the cursor within the same
consumer step it is delivered.stream-dispatch*
(stream-dispatch* db sql opts)
Dispatch stream execution from validated opts.
Resolves the builder mode (:builder-fn present vs. absent) and chunking
(:chunk-size present vs. absent), then delegates to stream*.
Args:
db - ConnectionFactory, Connection, or ConnectableWithOpts.
sql - non-blank SQL string.
opts - validated options map from stream-opts.
Returns a Missionary discrete flow emitting one value per row.