clj-r2dbc.impl.connection.publisher
Reactive Streams publisher bridges for clj-r2dbc.
org.reactivestreams interop for clj-r2dbc. Row delivery to Missionary flows is
handled by Missionary's reactive-streams adapter (m/subscribe, see
connection/lifecycle); this namespace provides the Publisher constructors and
the blocking/async bridges used around it.
Provides:
result-row-pub - Publisher<Row> for all rows in one R2DBC Result.
async-subscribe-pub - Publisher wrapper deferring .subscribe to m/blk.
buffer-pub - Publisher wrapper batching items into size-N vectors.
add-demand - saturating addition for demand accounting.
first-pub-blocking - blocking first-value extraction from a Publisher.
await-void-pub! - blocking drain-and-discard for void Publishers.
await-future - blocking CompletableFuture unwrap.
This namespace is an implementation detail; do not use from application code.
add-demand
(add-demand current delta)
Add delta to current demand with saturation at Long/MAX_VALUE.
Used by buffer-pub to accumulate batch demand without overflow.
Args:
current - long, existing demand.
delta - long, demand increment.
Returns a long in [current, Long/MAX_VALUE].
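The saturation rule above can be sketched in a few lines. This is a minimal illustration, not the namespace's implementation: the name saturating-add is hypothetical, and it assumes both arguments are non-negative, as Reactive Streams demand always is.

```clojure
(defn saturating-add
  "Hypothetical stand-in for add-demand: adds two non-negative longs and
  clamps the result to Long/MAX_VALUE instead of overflowing."
  ^long [^long current ^long delta]
  (let [sum (unchecked-add current delta)]
    ;; With non-negative inputs, an overflow wraps around to a negative sum.
    (if (neg? sum) Long/MAX_VALUE sum)))

(saturating-add 3 4)                     ;; => 7
(saturating-add Long/MAX_VALUE 1)        ;; => Long/MAX_VALUE
(saturating-add (- Long/MAX_VALUE 2) 10) ;; => Long/MAX_VALUE
```

Clamping is sufficient because Reactive Streams (rule 3.17) lets a publisher treat demand of Long/MAX_VALUE as effectively unbounded.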
async-subscribe-pub
(async-subscribe-pub pub)
Wrap pub so that .subscribe is dispatched on the Missionary blocking
executor (m/blk) rather than the caller's thread.
This interposes an async boundary on subscription so a synchronous publisher
(e.g. Reactor scalar fusion, or H2 in-memory emitting during subscribe)
cannot invoke a downstream Subscriber callback re-entrantly while a Missionary
flow is still being constructed. It also keeps driver-side subscribe work off
the ForkJoinPool consumer threads.
If pub's .subscribe throws synchronously on the m/blk thread, the error is
forwarded to the Subscriber's onError; otherwise it would be swallowed by the
discarded CompletableFuture, leaving a downstream m/subscribe parked forever.
Args:
pub - Publisher to subscribe to asynchronously.
Returns a Publisher that defers pub's subscription to m/blk.
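The deferral and error forwarding described above might look roughly like the sketch below. To stay runnable without extra dependencies it makes two substitutions, both assumptions: the JDK's java.util.concurrent.Flow interfaces stand in for org.reactivestreams, and a plain Executor stands in for m/blk. The names deferred-subscribe-pub, sync-pub and seen are hypothetical.

```clojure
(import '(java.util.concurrent CompletableFuture Executor Executors
                               Flow$Publisher Flow$Subscriber Flow$Subscription))

(defn deferred-subscribe-pub
  "Hypothetical sketch: returns a publisher whose subscribe call hands the
  real subscription to `executor` (a stand-in for m/blk). A synchronous
  failure in pub's subscribe is forwarded to the subscriber's onError."
  [^Flow$Publisher pub ^Executor executor]
  (reify Flow$Publisher
    (subscribe [_ subscriber]
      (CompletableFuture/runAsync
        (fn []
          (try
            (.subscribe pub subscriber)
            (catch Throwable t
              ;; Without this, the failure would vanish with the discarded future.
              (.onError ^Flow$Subscriber subscriber t))))
        executor)
      nil)))

;; A publisher that emits during subscribe, reporting the emitting thread.
(def sync-pub
  (reify Flow$Publisher
    (subscribe [_ s]
      (let [^Flow$Subscriber s s]
        (.onSubscribe s (reify Flow$Subscription (request [_ _n]) (cancel [_])))
        (.onNext s (.getName (Thread/currentThread)))
        (.onComplete s)))))

(def seen (promise))

(let [exec (Executors/newSingleThreadExecutor)]
  (.subscribe ^Flow$Publisher (deferred-subscribe-pub sync-pub exec)
    (reify Flow$Subscriber
      (onSubscribe [_ s] (.request ^Flow$Subscription s 1))
      (onNext [_ item] (deliver seen item))
      (onError [_ t] (deliver seen t))
      (onComplete [_])))
  ;; The delivered name belongs to the executor thread, not the caller.
  (deref seen 2000 :timeout)
  (.shutdown exec))
```

Because the emission happens on the executor thread, the caller's subscribe call returns before any subscriber callback can run on the caller's stack.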
await-future
(await-future fut)
Block the calling thread until fut completes, then return its result.
Used inside Reactive Streams callbacks where parking is not available.
Propagates CancellationException and ExecutionException unwrapped.
Args:
fut - CompletableFuture<T>.
Returns the future's result value.
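For reference, the sketch below shows how a plain blocking .get on a CompletableFuture surfaces results and failures. It assumes the two exception types reach the caller exactly as the JDK throws them; the docstring does not spell out whether the cause is extracted, so treat that as an assumption. The name blocking-get is hypothetical.

```clojure
(import '(java.util.concurrent CompletableFuture ExecutionException))

(defn blocking-get
  "Hypothetical stand-in for await-future: blocks on .get and lets any
  CancellationException or ExecutionException reach the caller."
  [^CompletableFuture fut]
  (.get fut))

(blocking-get (CompletableFuture/completedFuture 42)) ;; => 42

;; A failed future throws ExecutionException; the original error is its cause.
(try
  (blocking-get (CompletableFuture/failedFuture (ex-info "boom" {})))
  (catch ExecutionException e
    (ex-message (ex-cause e)))) ;; => "boom"
```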
await-void-pub!
(await-void-pub! pub)
Subscribe to a Publisher<Void> and block until it completes.
Used for fire-and-forget publishers such as Statement.execute() void
results. Production code paths prefer util/void->task (returns a
Missionary task; awaited with m/? inside m/sp or m/ap, so the park
happens via the Missionary scheduler rather than the calling thread).
Kept here for REPL examples.
Args:
pub - Publisher<Void> to drain.
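A blocking drain of this kind is commonly built on a latch. The sketch below is one way to do it, not the namespace's code: it uses the JDK Flow interfaces in place of org.reactivestreams (an assumption made so it runs without dependencies), and the names await-completion! and empty-pub are hypothetical.

```clojure
(import '(java.util.concurrent CountDownLatch
                               Flow$Publisher Flow$Subscriber Flow$Subscription))

(defn await-completion!
  "Hypothetical stand-in for await-void-pub!: subscribes, requests unbounded
  demand, discards items, and blocks until a terminal signal arrives.
  Rethrows the publisher's error on the calling thread."
  [^Flow$Publisher pub]
  (let [latch (CountDownLatch. 1)
        error (volatile! nil)]
    (.subscribe pub
      (reify Flow$Subscriber
        (onSubscribe [_ s] (.request ^Flow$Subscription s Long/MAX_VALUE))
        (onNext [_ _item])                       ; discard
        (onError [_ t] (vreset! error t) (.countDown latch))
        (onComplete [_] (.countDown latch))))
    (.await latch)
    (when-let [t @error] (throw t))
    nil))

;; A publisher that completes immediately without emitting anything.
(def empty-pub
  (reify Flow$Publisher
    (subscribe [_ s]
      (.onSubscribe ^Flow$Subscriber s
        (reify Flow$Subscription (request [_ _n]) (cancel [_])))
      (.onComplete ^Flow$Subscriber s))))

(await-completion! empty-pub) ;; => nil
```

The latch blocks the calling thread outright, which is why the docstring steers production paths toward a Missionary task instead.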
buffer-pub
(buffer-pub upstream chunk-size)
Wrap upstream so emitted items are batched into vectors of up to chunk-size.
Returns a Publisher<clojure.lang.IPersistentVector> that, on each downstream
request, pulls chunk-size items from upstream and emits them as one vector;
the final vector may be shorter. This moves batching to the Reactive Streams
layer so a downstream m/subscribe performs O(batches) transfers rather than
O(rows), restoring the per-batch scheduling win without a hand-rolled flow.
Demand-driven and RS-compliant, provided upstream is conformant (that is, it
delivers at most the requested count): each downstream request(d) collects d
batches, requesting chunk-size upstream per batch. onComplete flushes any
partial final batch.
Errors and cancellation propagate to/from upstream. Terminal signals fire once:
the lock-guarded done flag also elects the single thread that may signal
downstream termination, so no separate atomic is needed. All buf access is
serialized by the state monitor, which lets a transient vector accumulate
batches without the O(chunk-size) copy a mutable list + vec would cost.
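The batching rule itself (full vectors of chunk-size, then a shorter final vector) can be illustrated without any publisher machinery. The sketch below covers only the transient accumulation; it deliberately omits the locking, demand accounting and terminal-signal handling described above, and the names batch-step and batches are hypothetical.

```clojure
(defn batch-step
  "Hypothetical helper: adds item to the transient batch `buf`.
  Returns [next-buf full-batch-or-nil]."
  [buf item chunk-size]
  (let [buf (conj! buf item)]
    (if (= (count buf) chunk-size)
      ;; persistent! freezes the transient in place; no element-wise copy.
      [(transient []) (persistent! buf)]
      [buf nil])))

(defn batches
  "Runs batch-step over items, then flushes a partial final batch,
  mirroring what onComplete is described as doing."
  [chunk-size items]
  (let [[buf out] (reduce (fn [[buf out] item]
                            (let [[buf' full] (batch-step buf item chunk-size)]
                              [buf' (cond-> out full (conj full))]))
                          [(transient []) []]
                          items)
        tail (persistent! buf)]
    (cond-> out (seq tail) (conj tail))))

(batches 3 (range 7)) ;; => [[0 1 2] [3 4 5] [6]]
```

A transient is only safe when its use is confined to one thread at a time, which matches the docstring's point that all buf access is serialized by the state monitor.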
first-pub-blocking
(first-pub-blocking pub sub-ref)
Subscribe to pub, request one item, and return it synchronously.
Blocks the calling thread. Production code paths prefer
conn/acquire-connection, a Missionary task that parks on m/blk rather than on
the ForkJoinPool (FJP), which avoids pool starvation under concurrent stream
starts. Kept here for the REPL examples and for any future callers that
already hold a non-FJP thread.
Args:
pub - Publisher<T> to subscribe to.
sub-ref - volatile that receives the Subscription for cancellation.
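The request-one-and-block pattern can be sketched as follows. As with the other sketches here, the JDK Flow interfaces stand in for org.reactivestreams (an assumption), and the names first-blocking and one-pub are hypothetical. In this sketch a publisher error surfaces as an ExecutionException from .get, and a publisher that completes without emitting yields nil; the real function may behave differently on both counts.

```clojure
(import '(java.util.concurrent CompletableFuture
                               Flow$Publisher Flow$Subscriber Flow$Subscription))

(defn first-blocking
  "Hypothetical stand-in for first-pub-blocking: subscribes, stores the
  Subscription in the volatile `sub-ref`, requests one item, and blocks
  until that item (or a terminal signal) arrives."
  [^Flow$Publisher pub sub-ref]
  (let [result (CompletableFuture.)]
    (.subscribe pub
      (reify Flow$Subscriber
        (onSubscribe [_ s]
          (vreset! sub-ref s)                   ; expose for cancellation
          (.request ^Flow$Subscription s 1))
        (onNext [_ item] (.complete result item))
        (onError [_ t] (.completeExceptionally result t))
        (onComplete [_] (.complete result nil)))) ; no-op if already completed
    (.get result)))

;; A publisher that emits a single item once demand is signalled.
(def one-pub
  (reify Flow$Publisher
    (subscribe [_ s]
      (.onSubscribe ^Flow$Subscriber s
        (reify Flow$Subscription
          (request [_ _n]
            (.onNext ^Flow$Subscriber s :row)
            (.onComplete ^Flow$Subscriber s))
          (cancel [_]))))))

(first-blocking one-pub (volatile! nil)) ;; => :row
```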
result-row-pub
(result-row-pub result row-xf)
Return a Publisher<T> for all rows in result, each transformed by row-xf.
row-xf is applied inside BiFunction.apply(), before Result.map's finally
block calls message.release() on the underlying ByteBuf. This guarantees the
Row's backing ByteBuf is live when row-xf (e.g., builder-fn) reads column
values via Row.get(), regardless of whether the downstream Flux buffers the
row internally (demand == 0) before delivering it.
BiFunction.apply() is the only safe point at which to apply row-xf; the
downstream subscriber's onNext is too late. Once sink.next() queues a row into
Reactor's internal buffer (because demand is zero at that instant) and the
enclosing handler returns, message.release() fires and the ByteBuf is returned
to the Netty pool. Any subsequent Row.get() call on the buffered row then
touches a released buffer and throws IllegalReferenceCountException, which
escapes the subscriber's onNext without calling notifier and leaves the
Missionary reduce waiting forever.
Args:
result - R2DBC Result to map over.
row-xf - 1-arity fn [Row] -> T applied to each row inside BiFunction.apply().
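The ordering constraint can be reproduced with a toy model that needs neither R2DBC nor Netty. Everything in the sketch below is a hypothetical stand-in: fake-row plays a Row whose reads fail after release, and map-then-release mimics the apply-then-release-in-finally shape described above.

```clojure
(defn fake-row
  "Hypothetical row whose reads fail once its backing buffer is released."
  []
  (let [live? (atom true)]
    {:get     (fn [col]
                (if @live?
                  (str "value-of-" col)
                  (throw (IllegalStateException. "buffer released"))))
     :release (fn [] (reset! live? false))}))

(defn map-then-release
  "Mimics the described shape of Result.map: apply f to the row, then
  release the row's buffer in a finally block."
  [row f]
  (try
    (f row)
    (finally ((:release row)))))

;; Safe: read the column inside the mapping function, while the buffer is live.
(map-then-release (fake-row) (fn [row] ((:get row) "id")))
;; => "value-of-id"

;; Unsafe: pass the row through untouched and read it afterwards.
(let [row (map-then-release (fake-row) identity)]
  (try
    ((:get row) "id")
    (catch IllegalStateException e (ex-message e))))
;; => "buffer released"
```

The second call fails for the same reason a deferred Row.get() would: by the time the consumer reads the row, the release has already run.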