clj-r2dbc.impl.execute.stream

Streaming row execution for clj-r2dbc.

Provides:
  stream*  - Missionary flow emitting one value per row as a RowCursor or
             built value (when :builder-fn supplied), or one vector per
             :chunk-size batch (when :chunk-size supplied). Renamed from plan*.

Note: RowCursor is a shared mutable object. The same instance is mutated for
every row emitted by stream*. Retaining a reference to the cursor beyond the
current m/?> boundary silently returns data from a later row - no exception
is thrown. Callers that need to keep row data must either:
  (a) supply :builder-fn (recommended) to materialize an immutable value, or
  (b) call (cursor-row c) and (cursor-cache c) immediately within the same
      reduce step and pass them to impl/sql/row's row->map before the next
      row arrives.
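
The retention hazard described above can be reproduced with plain Clojure. The sketch below is not the library's RowCursor: flyweight-rows is a hypothetical helper that reuses one volatile as a stand-in cursor, and the rows are made-up maps. It only shows why copying inside the reduce step matters.

```clojure
;; Hypothetical stand-in for RowCursor: one mutable box reused for every row.
;; The real accessors (cursor-row, cursor-cache) are not modelled here.
(defn flyweight-rows
  "Calls (rf acc cursor) once per row, overwriting the same cursor first."
  [rf init rows]
  (let [cursor (volatile! nil)]
    (reduce (fn [acc row]
              (vreset! cursor row)   ; mutate in place, as a flyweight would
              (rf acc cursor))
            init
            rows)))

(def rows [{:id 1} {:id 2} {:id 3}])

;; Wrong: the cursor itself is retained, so all three entries alias one box
;; and every deref afterwards sees the last row.
(def retained
  (mapv deref (flyweight-rows conj [] rows)))

;; Right: the data is copied out within the same reduce step.
(def copied
  (flyweight-rows (fn [acc c] (conj acc @c)) [] rows))
```

Here retained holds three copies of the last row while copied holds the three distinct rows; no exception is raised in either case, which is what makes the mistake easy to miss.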

Backpressure architecture:
  stream* delegates row delivery to lifecycle/result-rows-flow, which bridges
  R2DBC's push-based Publisher<Result> into a demand-driven Missionary discrete
  flow via Missionary's reactive-streams adapter (m/subscribe). Reactive demand
  is one row at a time; database round-trip batching is controlled independently
  by Statement.fetchSize (applied in stmt/prepare!).

  Backpressure chain:
    consumer m/reduce -> m/?> result-rows-flow -> m/subscribe(Publisher<Row>)
      -> Subscription.request(1) -> R2DBC driver -> database cursor

  Chunking (:chunk-size, which requires :builder-fn) batches at the Reactive
  Streams layer via lifecycle/result-chunks-flow (publisher/buffer-pub) so the
  consumer performs O(batches) transfers. Flyweight mode (no :builder-fn)
  instead maps each row onto a shared RowCursor. Neither mode collects rows
  eagerly: rows stream from database to consumer.

This namespace is an implementation detail; do not use from application code.

stream*

(stream* db sql params opts)
Return a Missionary flow that emits one value per row returned by sql, or one
vector per batch when :chunk-size is supplied.
Renamed from plan*; semantics are identical.

Without :builder-fn - emits a shared mutable RowCursor (flyweight). The same
instance is updated in-place for each row. Do NOT retain references across
m/?> boundaries: a retained cursor silently reads back a later row's data.
Materialize data immediately using cursor-row and cursor-cache within the
same reduce step.

With :builder-fn (fn [Row RowMetadata] -> value) - applied per row inside
result-row-pub's BiFunction, emitting an immutable value safe for retention.
Use clj-r2dbc.row/kebab-maps for standard kebab-case row maps.

With :chunk-size - requires :builder-fn; emits one vector of up to chunk-size
built values per emission (the final vector may be shorter). Batching happens
at the Reactive Streams layer (lifecycle/result-chunks-flow via buffer-pub) so
the consumer performs O(batches) transfers.
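
The shape of chunked emissions can be illustrated with clojure.core alone. This is only an analogy: it uses the partition-all transducer rather than buffer-pub, and build-row is a hypothetical 1-arity stand-in for a real :builder-fn, which takes (Row, RowMetadata).

```clojure
(def chunk-size 3)

;; Hypothetical stand-in for :builder-fn; integers play the role of rows.
(defn build-row [raw] {:id raw})

;; Build each row first, then group the built values into vectors of up to
;; chunk-size; the last vector holds whatever is left over.
(def emissions
  (into []
        (comp (map build-row)
              (partition-all chunk-size))
        (range 7)))

(def emission-sizes (mapv count emissions))
```

Seven rows with a chunk size of 3 produce three emissions of sizes 3, 3, and 1, so a consumer handles three transfers instead of seven.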

Args:
  db     - ConnectionFactory, Connection, or ConnectableWithOpts.
  sql    - SQL string.
  params - sequential bind parameters.
  opts   - options map:
             :builder-fn - 2-arity (Row, RowMetadata) -> value; skips RowCursor entirely.
             :chunk-size - rows per emitted vector; requires :builder-fn.
             :qualifier  - column keyword mode used when no :builder-fn is supplied.
             :fetch-size - rows per database round-trip batch (default 128),
                           applied to Statement.fetchSize. Clamped to 1 in
                           flyweight mode (no :builder-fn): the R2DBC SPI permits
                           drivers to recycle Row objects after onNext returns.
             :returning  - calls Statement.returnGeneratedValues.
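
The :fetch-size rule above can be summarized as a small function. effective-fetch-size is a hypothetical name used for illustration only; the library's actual resolution logic may be structured differently.

```clojure
;; Sketch of the documented rule: default 128, but 1 whenever no :builder-fn
;; is supplied (flyweight mode).
(defn effective-fetch-size
  [opts]
  (if (:builder-fn opts)
    (:fetch-size opts 128)   ; builder mode: honour the option, default 128
    1))                      ; flyweight mode: clamped regardless of the option

(def builder-default (effective-fetch-size {:builder-fn (fn [row md] row)}))
(def builder-custom  (effective-fetch-size {:builder-fn (fn [row md] row)
                                            :fetch-size 500}))
(def flyweight       (effective-fetch-size {:fetch-size 500}))
```

Under these assumptions a requested :fetch-size of 500 takes effect only when a :builder-fn is present.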

Connection lifecycle:
  When db is a ConnectionFactory, stream* acquires a connection at flow start
  and closes it on completion, error, or cancellation (via close-conn-flow on
  the flow terminator). When db is already a Connection, lifecycle is owned by
  the caller; Connection.close() is NOT called.
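
The ownership rule can be sketched synchronously with try/finally. This is a simplification under stated assumptions: the real implementation closes via close-conn-flow on the flow terminator, and run-with-connection, the {:factory ...} / {:connection ...} maps, and close! are all hypothetical names, not part of clj-r2dbc.

```clojure
(defn run-with-connection
  "Runs (body conn). A connection opened here from (:factory db) is closed
  here; a connection passed in as (:connection db) is left to the caller."
  [db close! body]
  (if-let [factory (:factory db)]
    (let [conn (factory)]
      (try
        (body conn)
        (finally (close! conn))))   ; closes on normal return and on error
    (body (:connection db))))       ; caller-owned: never closed here

(def closed (atom []))
(defn close! [conn] (swap! closed conj conn))

(run-with-connection {:factory (fn [] :conn-a)} close! identity)
(run-with-connection {:connection :conn-b} close! identity)
```

After both calls, closed contains only :conn-a, mirroring the rule that Connection.close() is not called on a caller-supplied Connection.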

Backpressure:
  Row delivery is demand-driven through Missionary's m/subscribe bridge (one
  row per reactive request). Database fetch batching is governed by
  Statement.fetchSize, independently of reactive demand. No eager collection -
  true end-to-end streaming from database to consumer.

Zero-copy:
  With :builder-fn, row-xf materializes values inside result-row-pub's
  BiFunction.apply() while the underlying ByteBuf is guaranteed live, so
  emitted builder values are safe to hold across deref boundaries.
  In flyweight mode (no :builder-fn), row-xf is identity and fetch-size is
  clamped to 1; each row is materialized into the cursor within the same
  consumer step it is delivered.

stream-dispatch*

(stream-dispatch* db sql opts)
Dispatch stream execution from validated opts.

Resolves the builder mode (:builder-fn present vs. absent) and chunking
(:chunk-size present vs. absent), then delegates to stream*.

Args:
  db   - ConnectionFactory, Connection, or ConnectableWithOpts.
  sql  - non-blank SQL string.
  opts - validated options map from stream-opts.

Returns a Missionary discrete flow emitting one value per row, or one vector
per batch when :chunk-size is set.