clj-r2dbc.impl.execute.stream

Streaming row execution for clj-r2dbc.

Provides:
  stream*  - Missionary flow emitting one value per row, either a RowCursor
             or a built value (when :builder-fn is supplied), or one ArrayList
             per fetch-size batch (when :chunk-size is supplied). Renamed from
             plan*.

Note: RowCursor is a shared mutable object. The same instance is mutated for
every row emitted by stream*. A reference to the cursor retained beyond the
current m/?> boundary silently yields data from a later row; no exception is
thrown. Callers that need to keep row data must either:
  (a) supply :builder-fn (recommended) to materialize an immutable value, or
  (b) call (cursor-row c) and (cursor-cache c) immediately within the same
      reduce step and pass them to impl/sql/row's row->map before the next
      row arrives.
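
The aliasing hazard described above can be reproduced without a database. The
following is a minimal sketch, assuming a volatile as a stand-in for the
flyweight; emit-rows is a hypothetical helper and is not the library's
RowCursor or stream* implementation.

```clojure
;; Hypothetical stand-in for a flyweight cursor: one mutable holder that is
;; overwritten for every row. This is NOT the library's RowCursor type.
(defn emit-rows
  "Calls (f acc cursor) once per row, reusing the same cursor each time."
  [f init rows]
  (let [cursor (volatile! nil)]
    (reduce (fn [acc row]
              (vreset! cursor row)   ; overwrite in place, like a flyweight
              (f acc cursor))
            init
            rows)))

(def rows [{:id 1} {:id 2} {:id 3}])

;; Wrong: the step retains the cursor itself, so all three entries alias the
;; same holder and read the last row once the reduction has finished.
(def retained
  (mapv deref (emit-rows conj [] rows)))

;; Right: the step copies the value out before the next row overwrites it.
(def materialized
  (emit-rows (fn [acc cursor] (conj acc @cursor)) [] rows))
```

In this sketch retained holds three copies of the last row while materialized
holds each row once, which mirrors why option (a) or (b) is required.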

Backpressure architecture:
  stream* uses r2dbc-row-flow, a custom Missionary discrete flow that bridges
  R2DBC's push-based Publisher<Row> into demand-driven per-row emission.
  Rows are buffered in an ArrayDeque of capacity fetch-size. When the buffer
  empties, Subscription.request(fetch-size) is issued to pull the next batch.

  Backpressure chain:
    consumer m/reduce -> m/?> row-flow -> r2dbc-row-flow.deref()
      -> Subscription.request(fetch-size) -> R2DBC driver -> database cursor

  m/observe is not used: it is push-based and provides no backpressure.
  m/subscribe is not used: it risks reentrancy under Reactor scalar fusion.
  Nothing is collected eagerly; rows stream one by one from database to
  consumer.
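
The refill-on-empty behavior can be sketched in plain Clojure. This is an
illustration under stated assumptions, not the r2dbc-row-flow source:
make-source and pull-all are hypothetical names, the source delivers rows
synchronously, and exhaustion is detected by an empty refill, whereas a real
Publisher signals it through onComplete.

```clojure
(import '(java.util ArrayDeque))

(defn make-source
  "Hypothetical stand-in for a Subscription. (request! n on-next) pushes at
  most n remaining items to on-next and records the size of each request."
  [items]
  (let [remaining (atom (seq items))
        requests  (atom [])]
    {:requests requests
     :request! (fn [n on-next]
                 (swap! requests conj n)
                 (let [batch (vec (take n @remaining))]
                   (swap! remaining #(seq (drop n %)))
                   (run! on-next batch)))}))

(defn pull-all
  "Drains the source one row at a time and requests fetch-size more rows only
  when the buffer is empty. Returns the rows in arrival order."
  [{:keys [request!]} fetch-size]
  (let [buf (ArrayDeque. (int fetch-size))]
    (loop [out []]
      (when (.isEmpty buf)
        (request! fetch-size #(.addLast buf %)))   ; demand the next batch
      (if (.isEmpty buf)
        out                                        ; refill delivered nothing
        (recur (conj out (.pollFirst buf)))))))    ; hand over a single row

(def source (make-source (range 7)))
(def pulled (pull-all source 3))
```

With seven rows and a fetch size of 3, the sketch issues four requests of 3:
three that deliver rows and a final one that finds the source exhausted.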

This namespace is an implementation detail; do not use from application code.

stream*

(stream* db sql params opts)
Return a Missionary flow that emits one value per row returned by sql.
Renamed from plan*; semantics are identical.

Without :builder-fn - emits a shared mutable RowCursor (flyweight). The same
instance is updated in place for each row. Do NOT retain references across
m/?> boundaries; a retained cursor silently reads data from a later row.
Materialize data immediately using cursor-row and cursor-cache within the
same reduce step.

With :builder-fn (fn [Row RowMetadata] -> value) - applied per row inside the
fetch loop, emitting an immutable value safe for retention. Use
clj-r2dbc.row/kebab-maps for standard kebab-case row maps.

Args:
  db     - ConnectionFactory, Connection, or ConnectableWithOpts.
  sql    - SQL string.
  params - sequential bind parameters.
  opts   - options map:
             :builder-fn - 2-arity (Row, RowMetadata) -> value; skips RowCursor entirely.
             :qualifier  - column keyword mode used when no :builder-fn is supplied.
             :fetch-size - rows per demand-driven batch (default 128).
             :returning  - calls Statement.returnGeneratedValues.
             :chunk-size - when supplied, emits one ArrayList per batch instead of one value per row.

Connection lifecycle:
  When db is a ConnectionFactory, stream* acquires a connection at flow start
  and closes it via try/finally on completion, error, or cancellation.
  When db is already a Connection, lifecycle is owned by the caller;
  Connection.close() is NOT called.
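
The ownership rule above can be sketched as follows. This is a simplified,
synchronous illustration: with-owned-connection and make-conn are hypothetical
names, the factory is modeled as a zero-argument function, and the connection
as a java.io.Closeable. In the R2DBC SPI both ConnectionFactory.create() and
Connection.close() return Publishers, so the real code path is asynchronous.

```clojure
(defn with-owned-connection
  "Hypothetical helper. When db is a factory (modeled as a zero-arg fn), opens
  a connection and closes it in finally. When db is already a connection,
  leaves closing to the caller."
  [db f]
  (if (fn? db)
    (let [^java.io.Closeable conn (db)]
      (try
        (f conn)
        (finally (.close conn))))
    (f db)))

(defn make-conn
  "Stand-in connection that records whether close was called."
  [closed?]
  (reify java.io.Closeable
    (close [_] (reset! closed? true))))

(def factory-closed? (atom false))
(def caller-closed?  (atom false))

;; Factory path: the connection is closed even though the body throws.
(def factory-result
  (try
    (with-owned-connection #(make-conn factory-closed?)
      (fn [_] (throw (ex-info "query failed" {}))))
    (catch clojure.lang.ExceptionInfo _ :failed)))

;; Connection path: the connection is left open for the caller to close.
(def caller-result
  (with-owned-connection (make-conn caller-closed?) (fn [_] :ok)))
```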

Backpressure:
  Both paths (with and without :builder-fn) use r2dbc-row-flow, a custom
  Missionary discrete flow that bridges Publisher<Row> with demand-driven
  fetch-size batching. Each deref() returns one transformed row. When the
  internal buffer empties, Subscription.request(fetch-size) pulls the next
  batch from the driver. Nothing is collected eagerly, so rows stream end to
  end from database to consumer.

Zero-copy:
  Row data is captured in onNext while the Row is still valid. Values stored
  in the internal buffer are fully captured before the driver recycles the
  Row object, so they are safe to hold across deref boundaries.

stream-dispatch*

(stream-dispatch* db sql opts)
Dispatch stream execution from validated opts.

Resolves the builder mode (:builder-fn present vs. absent) and chunking
(:chunk-size present vs. absent), then delegates to stream*.

Args:
  db   - ConnectionFactory, Connection, or ConnectableWithOpts.
  sql  - non-blank SQL string.
  opts - validated options map from stream-opts.

Returns a Missionary discrete flow emitting one value per row, or one
ArrayList per batch when :chunk-size is present.
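
The two switches that stream-dispatch* resolves give four combinations. A
minimal sketch of that classification, assuming presence is tested with
contains?; resolve-mode and its result keywords are hypothetical and do not
appear in the library.

```clojure
(defn resolve-mode
  "Hypothetical helper: classifies opts by the two keys the dispatcher
  inspects, :builder-fn and :chunk-size."
  [opts]
  {:emit  (if (contains? opts :builder-fn) :built-value :row-cursor)
   :shape (if (contains? opts :chunk-size) :array-list-per-batch :one-per-row)})

;; No :builder-fn and no :chunk-size: one shared RowCursor per row.
(def default-mode (resolve-mode {:fetch-size 128}))

;; Both present: built values, grouped into one ArrayList per batch.
(def chunked-built-mode
  (resolve-mode {:builder-fn (fn [row metadata] row) :chunk-size 64}))
```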