clj-r2dbc.impl.connection.publisher
Reactive Streams publisher bridges for clj-r2dbc.
Pure org.reactivestreams interop - zero Missionary dependency in this namespace.
Three utility categories:
Fetch-size resolution: resolve-fetch-size extracts a safe positive long
from caller opts.
Demand arithmetic: add-demand performs saturating addition to accumulate
backpressure demand without overflow.
Future bridging: await-future, first-pub-blocking, and await-void-pub!
block the calling thread to synchronize with Reactive Streams callbacks
where Missionary parking is unavailable.
Provides:
result-row-pub - Publisher<T> of transformed rows from one R2DBC Result.
result-rows-pub - Publisher<T> streaming transformed rows from multiple Results sequentially.
first-pub-blocking - blocking first-value extraction from a Publisher.
await-void-pub! - blocking drain-and-discard for void Publishers.
await-future - blocking CompletableFuture unwrap.
request-async! - async Subscription.request call via ForkJoinPool.
resolve-fetch-size - safe fetch-size extraction from opts.
add-demand - saturating addition for demand accounting.
This namespace is an implementation detail; do not use from application code.
add-demand
(add-demand current delta)
Add delta to current demand with saturation at Long/MAX_VALUE.
Used to accumulate backpressure demand counts without overflow.
Args:
current - long, existing demand.
delta - long, demand increment.
Returns a long in [current, Long/MAX_VALUE], assuming delta is non-negative.
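The saturation rule can be illustrated with a minimal sketch. The name add-demand-sketch is hypothetical and this is not necessarily how the library implements add-demand; it assumes both arguments are non-negative, as Reactive Streams demand counts are.

```clojure
(defn add-demand-sketch
  "Hypothetical stand-in for add-demand: saturating addition of two
  non-negative longs."
  ^long [^long current ^long delta]
  ;; Compare against the remaining headroom first, so the addition itself
  ;; can never overflow.
  (if (> delta (- Long/MAX_VALUE current))
    Long/MAX_VALUE
    (+ current delta)))

(add-demand-sketch 10 5)                    ;; => 15
(add-demand-sketch Long/MAX_VALUE 1)        ;; => Long/MAX_VALUE
(add-demand-sketch (- Long/MAX_VALUE 3) 10) ;; => Long/MAX_VALUE
```

Checking the headroom before adding avoids relying on wrap-around behavior, which differs between checked and unchecked arithmetic in Clojure.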
await-future
(await-future fut)
Block the calling thread until fut completes, then return its result.
Used inside Reactive Streams callbacks where parking is not available.
Propagates CancellationException and ExecutionException unwrapped.
Args:
fut - CompletableFuture<T>.
Returns the future's result value.
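As a rough illustration, a blocking unwrap can be sketched with CompletableFuture.get. The name await-future-sketch is hypothetical, and the assumption that the bridge is a plain .get call is ours, not confirmed by the source. Under that assumption, a failed future surfaces as an ExecutionException with the original cause attached.

```clojure
(import '(java.util.concurrent CompletableFuture ExecutionException))

(defn await-future-sketch
  "Hypothetical stand-in for await-future: block until fut completes."
  [^CompletableFuture fut]
  ;; .get blocks the calling thread; it throws ExecutionException on failure
  ;; and CancellationException if the future was cancelled.
  (.get fut))

(await-future-sketch (CompletableFuture/completedFuture 42)) ;; => 42

;; A future that completed exceptionally: the cause is reachable via ex-cause.
(def failed
  (doto (CompletableFuture.)
    (.completeExceptionally (ex-info "boom" {}))))

(try
  (await-future-sketch failed)
  (catch ExecutionException e
    (ex-message (ex-cause e)))) ;; => "boom"
```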
await-void-pub!
(await-void-pub! pub)
Subscribe to a Publisher<Void> and block until it completes.
Used for publishers that signal only completion, such as the Publisher<Void>
returned by R2DBC Connection.close().
Args:
pub - Publisher<Void> to drain.
first-pub-blocking
(first-pub-blocking pub sub-ref)
Subscribe to pub, request one item, and return it synchronously.
Blocks the calling thread. Intended for driver-level one-shot publishers
(e.g., connection metadata) where Missionary scheduling is unavailable.
Args:
pub - Publisher<T> to subscribe to.
sub-ref - volatile that receives the Subscription for cancellation.
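The subscribe, request-one, block pattern can be sketched as follows. To stay runnable without extra dependencies, the sketch uses the JDK's java.util.concurrent.Flow interfaces (JDK 9+) as a stand-in for org.reactivestreams; the names first-pub-blocking-sketch and one-shot are hypothetical, and the real function may differ in how it handles errors, empty publishers, and cancellation.

```clojure
(import '(java.util.concurrent Flow$Publisher Flow$Subscriber Flow$Subscription))

(defn first-pub-blocking-sketch
  "Hypothetical stand-in: subscribe to pub, request one item, block for it."
  [^Flow$Publisher pub sub-ref]
  (let [result (promise)]
    (.subscribe pub
                (reify Flow$Subscriber
                  (onSubscribe [_ sub]
                    ;; Expose the Subscription so a caller can cancel it.
                    (vreset! sub-ref sub)
                    (.request sub 1))
                  (onNext [_ item] (deliver result {:value item}))
                  (onError [_ t] (deliver result {:error t}))
                  ;; Completing without an item yields nil (an assumption).
                  (onComplete [_] (deliver result {:value nil}))))
    ;; Deref of the promise blocks the calling thread until a signal arrives.
    (let [{:keys [value error]} @result]
      (if error (throw error) value))))

;; A toy one-shot publisher that emits a single value on first request.
(def one-shot
  (reify Flow$Publisher
    (subscribe [_ subscriber]
      (.onSubscribe subscriber
                    (reify Flow$Subscription
                      (request [_ n]
                        (.onNext subscriber "meta")
                        (.onComplete subscriber))
                      (cancel [_]))))))

(def sub-ref (volatile! nil))
(first-pub-blocking-sketch one-shot sub-ref) ;; => "meta"
```

A promise accepts only its first delivery, so the later onComplete signal does not overwrite the value captured in onNext.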
request-async!
(request-async! sub n)
Call Subscription.request(n) on sub asynchronously.
Schedules the request on a ForkJoinPool background thread to avoid
re-entrant driver callbacks.
Args:
sub - Subscription to request from.
n - number of items to request.
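One way to picture the hand-off is the sketch below, which submits the request to the common ForkJoinPool. It is an illustration under assumptions: request-async-sketch! is a hypothetical name, the JDK's Flow$Subscription stands in for org.reactivestreams.Subscription, and the source does not say which ForkJoinPool instance the library uses.

```clojure
(import '(java.util.concurrent ForkJoinPool Flow$Subscription
                               CountDownLatch TimeUnit))

(defn request-async-sketch!
  "Hypothetical stand-in for request-async!: call request(n) off-thread."
  [^Flow$Subscription sub ^long n]
  ;; The caller returns immediately; the request runs on a pool thread, so a
  ;; driver that emits synchronously cannot re-enter the caller's stack.
  (.execute (ForkJoinPool/commonPool)
            ^Runnable (fn [] (.request sub n))))

;; Toy subscription that records the requested amount.
(def requested (atom 0))
(def ^CountDownLatch done (CountDownLatch. 1))

(def demo-sub
  (reify Flow$Subscription
    (request [_ n]
      (swap! requested + n)
      (.countDown done))
    (cancel [_])))

(request-async-sketch! demo-sub 8)
(.await done 1 TimeUnit/SECONDS) ; wait for the background task to run
@requested ;; => 8
```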
resolve-fetch-size
(resolve-fetch-size opts)
Resolve the effective fetch size from opts.
Returns the :fetch-size value when it is present and valid (a positive
number); otherwise returns the default of 128. The result is coerced to a
primitive long.
Args:
opts - map, may contain :fetch-size.
Returns a positive long.
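The fallback behavior can be sketched as below. The names are hypothetical, and treating "valid" as "a positive integer" is an assumption; the library's actual validation rules may be stricter or looser.

```clojure
(def ^:const default-fetch-size 128)

(defn resolve-fetch-size-sketch
  "Hypothetical stand-in for resolve-fetch-size."
  ^long [opts]
  (let [n (:fetch-size opts)]
    ;; Assumption: only positive integers count as valid.
    (if (and (integer? n) (pos? n))
      (long n)
      default-fetch-size)))

(resolve-fetch-size-sketch {:fetch-size 500}) ;; => 500
(resolve-fetch-size-sketch {})                ;; => 128
(resolve-fetch-size-sketch {:fetch-size 0})   ;; => 128
(resolve-fetch-size-sketch {:fetch-size "x"}) ;; => 128
```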
result-row-pub
(result-row-pub result row-xf)
Return a Publisher<T> for all rows in result, each transformed by row-xf.
row-xf is applied inside BiFunction.apply() - before Result.map's finally
block calls message.release() on the underlying ByteBuf. This guarantees the
Row's backing ByteBuf is live when row-xf (e.g., builder-fn) reads column
values via Row.get(), regardless of whether the downstream Flux buffers the
row internally (demand == 0) before delivering it.
Applying row-xf here rather than in the downstream subscriber's onNext is the
only safe point. If demand is zero when sink.next() is called, Reactor queues
the row in its internal buffer; when the enclosing handler then returns,
message.release() fires and the ByteBuf is returned to the Netty pool. Any
subsequent Row.get() call on the buffered row then touches a released buffer
and throws IllegalReferenceCountException. That exception escapes the
subscriber's onNext without calling the notifier, leaving the Missionary
reduce waiting forever.
Args:
result - R2DBC Result to map over.
row-xf - 1-arity fn [Row] -> T applied to each row inside BiFunction.apply().
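The ordering argument above can be simulated without R2DBC or Netty. Everything in this sketch is a hypothetical stand-in: make-row models a Row backed by a pooled buffer, map-then-release models a mapping function followed by a release in a finally block, and IllegalStateException stands in for IllegalReferenceCountException.

```clojure
(defn make-row
  "Hypothetical Row whose data is readable only until it is released."
  [data]
  {:data data :released? (atom false)})

(defn row-get
  "Read a column, failing if the backing buffer was already released."
  [row k]
  (if @(:released? row)
    (throw (IllegalStateException. "buffer already released"))
    (get (:data row) k)))

(defn map-then-release
  "Apply f to row, then release the buffer in a finally block."
  [row f]
  (try
    (f row)
    (finally
      (reset! (:released? row) true))))

;; Safe: the transformation reads the column while the buffer is still live.
(map-then-release (make-row {:id 1}) #(row-get % :id)) ;; => 1

;; Unsafe: the raw row escapes the mapping function and is read afterwards.
(def escaped (map-then-release (make-row {:id 1}) identity))

(try
  (row-get escaped :id)
  (catch IllegalStateException _ :released)) ;; => :released
```

The safe call corresponds to running row-xf inside BiFunction.apply(); the unsafe call corresponds to deferring the read to a downstream onNext.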
result-rows-pub
(result-rows-pub result-pub result-sub-ref row-xf)
Return a Publisher<T> that streams transformed rows from result-pub sequentially.
At most one Result is requested at a time. Downstream demand is forwarded to
the active Result.map publisher, so row demand remains bounded by the row-flow
fetch-size rather than collapsing to Long/MAX_VALUE at the Result boundary.
row-xf is forwarded to result-row-pub and applied inside BiFunction.apply()
while the Row's ByteBuf is guaranteed to be live. See result-row-pub for the
detailed safety argument.
Args:
result-pub - Publisher<Result> to consume.
result-sub-ref - volatile holding the outer Subscription for cancellation.
row-xf - 1-arity fn [Row] -> T applied to each row.