clj-r2dbc.impl.connection.lifecycle

Connection lifecycle management for streaming plan flows.

When db is a ConnectionFactory, the connection is acquired on flow start and
closed on termination (normal completion, error, or cancellation).
When db is already a Connection, the caller owns the lifecycle and
Connection.close() is NOT called.

This namespace owns stream connection lifecycle transitions and the
Publisher<Result> -> row flow bridge (result-rows-flow), built on Missionary's
reactive-streams adapter (m/subscribe). Chunking and flyweight transforms are
applied by the caller (impl/execute/stream) via m/eduction.

This namespace is an implementation detail; do not use from application code.

result-chunks-flow

(result-chunks-flow result-pub row-xf chunk-size)
Like result-rows-flow, but emits one vector of up to chunk-size transformed
rows per value instead of one row.

Batching happens at the Reactive Streams layer via pub/buffer-pub, so the
downstream m/subscribe performs O(batches) transfers rather than O(rows),
which recovers the per-batch scheduling savings. Batches are per-Result: a
partial final batch is emitted at each Result boundary. For the common
single-Result case this is identical to per-stream batching.

Args:
  result-pub - Publisher<Result> from Statement.execute().
  row-xf     - 1-arity fn [Row] -> value applied to each row.
  chunk-size - positive integer; maximum number of rows per emitted vector.
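
The per-Result batching rule can be illustrated with plain collections. This
is only a sketch of the emitted shapes, not the library's implementation
(which batches at the Reactive Streams layer). per-result-chunks and
per-stream-chunks are hypothetical helpers, and each inner vector of the
input stands in for the rows of one Result.

```clojure
(defn per-result-chunks
  "Chunk each Result's rows separately, so a partial final chunk is
  emitted at every Result boundary. Hypothetical helper."
  [chunk-size results]
  (into [] (mapcat #(into [] (partition-all chunk-size) %)) results))

(defn per-stream-chunks
  "Chunk the flattened row stream, ignoring Result boundaries.
  Hypothetical helper, shown only for contrast."
  [chunk-size results]
  (into [] (comp cat (partition-all chunk-size)) results))

;; Two Results with 5 and 2 rows, chunk-size 2.
(per-result-chunks 2 [[1 2 3 4 5] [6 7]])
;; => [[1 2] [3 4] [5] [6 7]]

(per-stream-chunks 2 [[1 2 3 4 5] [6 7]])
;; => [[1 2] [3 4] [5 6] [7]]

;; With a single Result the two strategies agree.
(= (per-result-chunks 2 [[1 2 3]])
   (per-stream-chunks 2 [[1 2 3]]))
;; => true
```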

result-rows-flow

(result-rows-flow result-pub row-xf)
Return a Missionary discrete flow emitting transformed rows from every Result
produced by result-pub, flattened in order.

Delegates row delivery to Missionary's reactive-streams bridge (m/subscribe):
the outer Publisher<Result> is consumed one Result at a time (m/?> defaults to
parallelism 1), and each Result's Publisher<Row> (from result-row-pub) is
consumed one row at a time. Demand-driven backpressure, cancellation, and
termination are handled by m/subscribe. Database round-trip batching is
governed independently by Statement.fetchSize (see statement/apply-opts!), so
one-row-at-a-time reactive demand does not change network fetch granularity.

Subscriptions are dispatched on m/blk (pub/async-subscribe-pub) to keep driver
callbacks off the consumer's ForkJoinPool threads and to avoid synchronous
re-entrancy during flow construction.

row-xf is applied inside result-row-pub's BiFunction while the Row's backing
ByteBuf is guaranteed live; see publisher/result-row-pub for the safety
argument.

Args:
  result-pub - Publisher<Result> from Statement.execute().
  row-xf     - 1-arity fn [Row] -> value applied to each row.
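
The nested consumption described above can be approximated as follows. This
is a minimal sketch, assuming Missionary and the R2DBC SPI are on the
classpath; it is not the namespace's actual code. rows-flow-sketch and
row-pub-sketch are hypothetical names, and the sketch omits the m/blk
dispatch (pub/async-subscribe-pub) and the safety guarantees of
publisher/result-row-pub.

```clojure
(require '[missionary.core :as m])
(import '(io.r2dbc.spi Result)
        '(java.util.function BiFunction))

(defn- row-pub-sketch
  "Map one Result to a Publisher of transformed rows. row-xf runs inside
  the BiFunction, i.e. while the driver still holds the Row."
  [^Result result row-xf]
  (let [^BiFunction f (reify BiFunction
                        (apply [_ row _metadata] (row-xf row)))]
    (.map result f)))

(defn rows-flow-sketch
  "Discrete flow of transformed rows from every Result, in order."
  [result-pub row-xf]
  (m/ap
    ;; Outer fork: one Result at a time (m/?> defaults to parallelism 1).
    (let [result (m/?> (m/subscribe result-pub))]
      ;; Inner fork: one transformed row at a time from that Result.
      (m/?> (m/subscribe (row-pub-sketch result row-xf))))))
```

A consumer could drain such a flow with (m/reduce conj flow), which yields a
task that resolves to a vector of the transformed rows.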

streaming-plan-flow

(streaming-plan-flow db sql params opts row-xf)
Return a Missionary discrete flow emitting transformed rows with end-to-end
demand-driven backpressure.

Connection lifecycle: when db is a ConnectionFactory, the connection is
acquired at flow start (via conn/acquire-connection, which subscribes
non-blockingly and parks on the m/blk executor, not ForkJoinPool) and
closed on completion/error/cancellation. When db is already a Connection,
the caller owns the lifecycle.

Structure: an m/ap whose let-bindings (connection acquire, statement prep,
row flow construction) evaluate once before the first m/?> emission. When the
connection is owned, the inner flow is wrapped by close-conn-flow, so
Connection.close() runs exactly once, from the wrapper's AtomicBoolean-gated
terminator, when the inner flow terminates. Cleanup rides the terminator
rather than a try/finally inside m/ap, whose finally would re-run per emission.
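
The AtomicBoolean gate mentioned above is a common run-once idiom. The sketch
below shows the idiom in isolation; it is not close-conn-flow itself. once,
close-count, and close-once! are hypothetical names, and the counter stands
in for Connection.close().

```clojure
(import '(java.util.concurrent.atomic AtomicBoolean))

(defn once
  "Wrap zero-arity f so that only the first call runs it."
  [f]
  (let [done (AtomicBoolean. false)]
    (fn []
      ;; compareAndSet succeeds for exactly one caller, even when a
      ;; completion signal and a cancellation race each other.
      (when (.compareAndSet done false true)
        (f)))))

;; Stand-in for Connection.close(): count how often cleanup ran.
(def close-count (atom 0))
(def close-once! (once #(swap! close-count inc)))

(close-once!) ; e.g. the completion path
(close-once!) ; e.g. a late cancellation
@close-count
;; => 1
```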

The inner flow is result-rows-flow (one row per value) or, when opts carries
:chunk-size, result-chunks-flow (one vector of up to :chunk-size rows per
value). Both bridge the Publisher<Result> via m/subscribe. Statement.fetchSize
(applied from opts in stmt/prepare!) controls database fetch batching.

Args:
  db     - ConnectionFactory or Connection (ConnectableWithOpts must be
           unwrapped by the caller via conn/resolve-connectable).
  sql    - SQL string to execute.
  params - sequential collection of bind parameters.
  opts   - options map (see execute for supported keys); :chunk-size selects
           chunked emission.
  row-xf - 1-arity fn applied to each raw R2DBC Row to produce the emitted
           value (identity in flyweight mode).