clj-r2dbc
Facilities for Reactive Relational Database Connectivity (R2DBC) communication.
A functional wrapper around R2DBC, designed for use with the Missionary reactive
programming library. It manages Resource acquisition is initialization (RAII)
for database connections and translates result sets into Clojure data structures.
The basic building blocks are:
connect - Returns a ConnectionFactory from an R2DBC URL.
with-options - Attaches default options to a connectable.
info - Returns database connection metadata.
execute - Executes a SQL statement and returns a labeled result map.
first-row - Executes a SQL statement and returns the first row or nil.
stream - Executes a SQL statement and returns a discrete flow of rows.
execute-each - Executes a prepared statement with multiple binding sets.
batch - Executes a sequence of SQL statements in a single round trip.
with-connection - Acquires a connection and executes a task.
with-transaction - Acquires a connection, begins a transaction, and executes a task.
with-conn - Acquires a connection and binds it for a task (macro).
with-tx - Acquires a transaction connection and binds it for a task (macro).
batch
added in 0.1
(batch db statements & {:as opts})
Execute a sequence of SQL statements via Connection.createBatch() and return a Missionary task.
Each string in statements is added to a single Batch object, which is
executed in one round trip. Row segments in batch results are silently
discarded - only update counts are returned. Suitable for DDL sequences
and unconditional DML scripts.
Doesn't support per-statement parameters. For parameterized bulk operations
use execute-each.
Args:
db - ConnectionFactory, Connection, or wrapped connectable.
statements - Sequential of non-blank SQL strings. Each string is added to
the batch in order.
opts - (optional) Reserved options map; defaults to {}. Accepts a map,
keyword arguments, or both.
Returns a task resolving to:
{:clj-r2dbc/update-counts [N1 N2 ...]} - one long per statement in order.
Use the update-counts accessor fn to extract with a safe default of [].
Throws (synchronously):
ex-info :clj-r2dbc/missing-key when db is nil.
ex-info :clj-r2dbc/invalid-type when statements is not sequential.
ex-info :clj-r2dbc/unsupported when both :middleware and :interceptors are
present in opts.
Throws (inside task, on driver error):
ex-info wrapping R2dbcException - carries :sql-state, :error-code,
:clj-r2dbc/error-category.
Example:
(m/? (batch db ["CREATE TABLE IF NOT EXISTS t (id INT PRIMARY KEY)"
"INSERT INTO t (id) VALUES (1)"
"INSERT INTO t (id) VALUES (2)"]))
;=> {:clj-r2dbc/update-counts [0 1 1]}connect
added in 0.1
(connect url & {:as opts})
Return a ConnectionFactory from an R2DBC URL string - synchronous, not a task.
Args:
url - Non-blank R2DBC URL string. Forwarded verbatim to
io.r2dbc.spi.ConnectionFactories/get. All composite URL schemes
are accepted without modification:
"r2dbc:h2:mem:///name"
"r2dbc:postgresql://user:pass@host/db"
"r2dbc:pool:postgresql://user:pass@host/db"
"r2dbc:proxy:postgresql://user:pass@host/db".
opts - (optional) Reserved options map; defaults to {}. Present for
forward-compatibility; no keys are currently consumed.
Accepts a map, keyword arguments, or both.
Returns a ConnectionFactory instance synchronously.
Throws (synchronously):
ex-info :clj-r2dbc/missing-key when url is nil.
ex-info :clj-r2dbc/invalid-type when url is not a string.
ex-info :clj-r2dbc/invalid-value when url is blank.
Example:
(def db (connect "r2dbc:h2:mem:///r2dbc-test;DB_CLOSE_DELAY=-1"))
(def pooled (connect "r2dbc:pool:postgresql://user:pass@localhost/mydb"))execute
added in 0.1
(execute db sql & {:as opts})
Execute one SQL statement and return a Missionary task resolving to a labeled result map.
Acquires a connection from db if needed (RAII), executes sql with the given
params, and releases the connection. When db is an existing Connection, the
connection is used directly and its lifecycle is owned by the caller.
Args:
db - ConnectionFactory, Connection, or wrapped connectable (from with-options).
sql - Non-blank SQL string.
opts - (optional) Execution options; defaults to {}. Accepts a map, keyword
arguments, or both.
Options:
:params - sequential, default []. Positional bind parameters. nil,
byte arrays, and io.r2dbc.spi.Parameter are dispatched via
clj-r2dbc.row/Parameter.
:fetch-size - integer in [1, 32768], default driver-determined. Rows per
demand batch.
:builder - (fn [Row RowMetadata] -> value), default
clj-r2dbc.row/kebab-maps. Called once per row. Use
row/raw-maps, row/vectors, or a custom 2-arity fn.
:returning - boolean or vector of column-name strings. Calls
Statement.returnGeneratedValues(). Use true to request all
generated values; use a vector to name specific columns.
:middleware - sequential of middleware fns (clj-r2dbc.middleware). Mutually
exclusive with :interceptors. Applied right-to-left.
:interceptors - sequential of interceptor maps (clj-r2dbc.interceptor).
Mutually exclusive with :middleware. Bounded to 64 stages;
override with :max-interceptor-depth (positive integer).
Returns a task resolving to a labeled result map. Keys present depend on SQL type:
SELECT, WITH, VALUES, SHOW, DESCRIBE, or EXPLAIN:
{:clj-r2dbc/rows [{:col val ...} ...]} - vector, may be empty.
DML (INSERT, UPDATE, or DELETE):
{:clj-r2dbc/update-count N} - N is a long.
RETURNING or mixed result:
{:clj-r2dbc/rows [...] :clj-r2dbc/update-count N}.
Use the rows and update-count accessor fns to extract values with safe defaults.
Throws (synchronously, before task is created):
ex-info :clj-r2dbc/missing-key when db or sql is nil.
ex-info :clj-r2dbc/invalid-type when sql is not a string, or :builder is not
a function.
ex-info :clj-r2dbc/invalid-value when sql is blank, or :fetch-size is out of
[1, 32768].
ex-info :clj-r2dbc/unsupported when both :middleware and :interceptors are
present, or when :builder-fn is passed directly.
Throws (inside task, on driver error):
ex-info wrapping R2dbcException - carries :sql-state, :error-code,
:clj-r2dbc/error-category, :sql, :params.
Example:
(m/? (execute db "SELECT id, name FROM users WHERE id = $1" {:params [42]}))
;=> {:clj-r2dbc/rows [{:id 42 :name "Alice"}]}
;; Equivalent keyword-arg form:
(m/? (execute db "SELECT id, name FROM users WHERE id = $1" :params [42]))
(m/? (execute db "INSERT INTO users (name) VALUES ($1)" {:params ["Carol"]}))
;=> {:clj-r2dbc/update-count 1}execute-each
added in 0.1
(execute-each db sql params & {:as opts})
Execute one prepared statement with multiple binding sets and return a Missionary task.
Uses Statement.add() to bind all param sets to a single prepared statement,
then executes it once. Each inner sequential in params becomes one binding
set. Suitable for bulk inserts and parameterized batch operations where each
row uses the same SQL template.
Doesn't support :middleware or :interceptors. Use execute in a loop for
interceptor-wrapped per-row execution.
Args:
db - ConnectionFactory, Connection, or wrapped connectable.
sql - Non-blank SQL string.
params - Sequential of sequentials. Each inner sequential is one set of
positional bind parameters.
opts - (optional) Options; defaults to {}. Accepts a map, keyword arguments,
or both.
Options:
:fetch-size - integer in [1, 32768], default driver-determined.
:builder - (fn [Row RowMetadata] -> value), default
clj-r2dbc.row/kebab-maps.
:max-batch-size - positive integer, default 10000. Maximum number of
binding sets allowed. Throws :clj-r2dbc/limit-exceeded
when count(params) exceeds this limit. Override with
:max-batch-size (positive integer).
Returns a task resolving to:
{:clj-r2dbc/results [...]} - vector of per-binding-set result maps.
Each element has the same shape as an execute result map:
{:clj-r2dbc/rows [...]} for SELECT-like SQL.
{:clj-r2dbc/update-count N} for DML.
{:clj-r2dbc/rows [...] :clj-r2dbc/update-count N} for RETURNING.
Use the results accessor fn to extract with a safe default.
Throws (synchronously):
ex-info :clj-r2dbc/missing-key when db or sql is nil.
ex-info :clj-r2dbc/invalid-type when sql is not a string, or params is not
sequential.
ex-info :clj-r2dbc/invalid-value when sql is blank, or :fetch-size is out of
[1, 32768].
ex-info :clj-r2dbc/limit-exceeded when count(params) exceeds :max-batch-size;
carries :key :param-sets, :limit N,
:value <count>.
Throws (inside task, on driver error):
ex-info wrapping R2dbcException - carries :sql-state, :error-code,
:clj-r2dbc/error-category.
Example:
(m/? (execute-each db
"INSERT INTO t (id, name) VALUES ($1, $2)"
[[1 "Alice"] [2 "Bob"] [3 "Carol"]]))
;=> {:clj-r2dbc/results [{:clj-r2dbc/update-count 1}
;; {:clj-r2dbc/update-count 1}
;; {:clj-r2dbc/update-count 1}]}
;; Keyword-arg form:
(m/? (execute-each db "INSERT INTO t (id) VALUES ($1)" [[1] [2]] :max-batch-size 5000))first-row
added in 0.1
(first-row db sql & {:as opts})
Execute one SQL statement and return a Missionary task resolving to the first row or nil.
Executes sql exactly as execute does, then returns the first element of
:clj-r2dbc/rows, or nil when the result set is empty. Returns nil (not an
empty map, not an exception) when zero rows are returned. Suitable for
existence checks and single-row lookups.
Args:
db - ConnectionFactory, Connection, or wrapped connectable (from with-options).
sql - Non-blank SQL string.
opts - (optional) Execution options; defaults to {}. Accepts a map, keyword
arguments, or both. Same options as execute: :params,
:fetch-size, :builder, :returning, :middleware, :interceptors,
:max-interceptor-depth.
Returns a task resolving to the first row map, or nil when no rows were returned.
Throws (synchronously, before task is created):
ex-info :clj-r2dbc/missing-key when db or sql is nil.
ex-info :clj-r2dbc/invalid-type when sql is not a string, or :builder is not
a function.
ex-info :clj-r2dbc/invalid-value when sql is blank, or :fetch-size is out of
[1, 32768].
ex-info :clj-r2dbc/unsupported when both :middleware and :interceptors are
present.
Throws (inside task, on driver error):
ex-info wrapping R2dbcException - carries :sql-state, :error-code,
:clj-r2dbc/error-category, :sql, :params.
Example:
(m/? (first-row db "SELECT id, name FROM users WHERE id = $1" {:params [42]}))
;=> {:id 42 :name "Alice"}
;; Equivalent keyword-arg form:
(m/? (first-row db "SELECT id, name FROM users WHERE id = $1" :params [42]))
(m/? (first-row db "SELECT 1 WHERE false"))
;=> nilinfo
added in 0.1
(info db)
Return database connection metadata as a Missionary task.
When db is a ConnectionFactory or pool: acquires a connection, reads
metadata, and releases the connection via RAII. When db is an existing Connection:
uses it directly; the caller owns the connection lifecycle.
Args:
db - ConnectionFactory, Connection, or wrapped connectable. Must not be nil.
Returns a task resolving to:
{:db/product-name String - from ConnectionMetadata.getDatabaseProductName().
:db/version String} - from ConnectionMetadata.getDatabaseVersion().
Throws (synchronously):
ex-info :clj-r2dbc/missing-key when db is nil.
Throws (inside task, on driver error):
ex-info wrapping R2dbcException - carries :sql-state, :error-code,
:clj-r2dbc/error-category.
Example:
(m/? (info db))
;=> {:db/product-name "H2" :db/version "2.4.240"}stream
added in 0.1
(stream db sql & {:as opts})
Execute one SQL statement and return a Missionary discrete flow emitting one value per row.
Unlike execute, which collects all rows into a vector, stream is
demand-driven: rows are pulled from the database in fetch-size batches and
emitted one at a time to the consumer. Connection lifecycle follows RAII:
acquired at flow invocation, released on completion, error, or cancellation.
When db is an existing Connection, the caller owns the lifecycle.
Flyweight warning: When :stream-mode is :flyweight, the flow emits a single
shared RowCursor instance that is mutated in-place for every row. Retaining a
reference to the cursor past the current m/?> boundary silently returns data
from a later row - no exception is thrown. Always materialize cursor data
within the same reduce step. The default :stream-mode :immutable avoids this
hazard entirely.
Args:
db - ConnectionFactory, Connection, or wrapped connectable (from with-options).
sql - Non-blank SQL string.
opts - (optional) Stream options; defaults to {}. Accepts a map, keyword
arguments, or both.
Options:
:params - sequential, default []. Positional bind parameters.
:fetch-size - integer in [1, 32768], default driver-determined. Controls
Subscription.request(N) batch size.
:builder - (fn [Row RowMetadata] -> value), default
clj-r2dbc.row/kebab-maps. Applied inside the fetch loop
while Row is valid. Values are safe to retain. Required
when using :chunk-size.
:stream-mode - :immutable (default) or :flyweight.
:immutable - applies :builder per row; emits immutable values.
:flyweight - emits a shared mutable RowCursor; see warning above.
:chunk-size - integer in [1, 32768]. Changes emission unit from individual
rows to java.util.ArrayList chunks of up to chunk-size
elements. Requires :builder. Reduces Missionary scheduler
overhead from O(rows) to O(batches).
:returning - boolean or vector of column names. Calls
Statement.returnGeneratedValues().
:middleware - sequential of middleware fns. Mutually exclusive with
:interceptors.
:interceptors - sequential of interceptor maps. Mutually exclusive with
:middleware. Bounded to 64 stages (override:
:max-interceptor-depth).
Returns a Missionary discrete flow. Consume with m/reduce or any flow
combinator. This is not a task - it can't be awaited with m/? directly.
Throws (synchronously):
ex-info :clj-r2dbc/missing-key when :chunk-size is present but :builder is
absent.
ex-info :clj-r2dbc/invalid-type when sql is not a string, or :builder is not
a function.
ex-info :clj-r2dbc/invalid-value when sql is blank, :chunk-size or :fetch-size
is out of [1, 32768], or :stream-mode is not
:immutable or :flyweight.
ex-info :clj-r2dbc/unsupported when both :middleware and :interceptors are
present.
Example:
;; Collect all rows (default :immutable mode)
(m/? (m/reduce conj [] (stream db "SELECT id, name FROM users")))
;=> [{:id 1 :name "Alice"} {:id 2 :name "Bob"}]
;; Keyword-arg form:
(m/? (m/reduce conj [] (stream db "SELECT id FROM users" :fetch-size 128)))
;; Chunk mode - emit ArrayList chunks; :builder required
(require '[clj-r2dbc.row :as row])
(m/? (m/reduce #(+ %1 (.size ^java.util.ArrayList %2))
0
(stream db "SELECT 1"
{:builder row/kebab-maps :chunk-size 64})))with-conn
macro
added in 0.1
(with-conn [conn db & [opts]] & body)
Macro. Acquire a connection, bind it to conn, and run body as a Missionary task.
Expands to with-connection.
Internal symbols are generated via gensym (db__, opts__, ctx__) - no variable capture
regardless of the name chosen for conn or any surrounding locals.
Binding vector:
[conn db] - conn bound to Connection; default options
[conn db opts] - conn bound to Connection; opts merged via with-options
before connection acquisition
conn is bound to the raw Connection object, not the context map. All RAII
semantics from with-connection apply: the connection is closed on success,
error, or Missionary cancellation when db is a ConnectionFactory or pool.
When db is an existing Connection, the caller owns its lifecycle.
Args (binding vector):
conn - Symbol. Bound to the raw Connection object within body.
db - Expression evaluating to ConnectionFactory, Connection, or wrapped
connectable.
opts - (optional) Options map: any key accepted by execute, stream, etc.
Returns a Missionary task resolving to the value of the last body form.
Example:
(m/? (with-conn [conn db]
(m/sp
(m/? (execute conn "SELECT 1 AS n"))
(m/? (execute conn "SELECT 2 AS n")))))
;=> {:clj-r2dbc/rows [{:n 2}]}with-connection
added in 0.1
(with-connection db f & {:as opts})
Acquire a connection, call f with {:connection conn}, and return a Missionary task.
When db is a ConnectionFactory or pool: acquires a connection, passes it to
f inside {:connection conn}, and closes the connection on success, error, or
Missionary cancellation (RAII). When db is an existing Connection: passes it
directly; the library doesn't close the connection - the caller owns the
lifecycle.
f must accept one map argument and return a Missionary Task. This is the
function-based equivalent of the with-conn macro; prefer with-conn at the
call site and with-connection for higher-order usage.
Args:
db - ConnectionFactory, Connection, or wrapped connectable.
f - (fn [{:keys [connection]}] -> Task). Receives a map containing the
live Connection under :connection.
opts - (optional) Options merged via with-options before acquisition; defaults
to {}. Accepts a map, keyword arguments, or both.
Returns a task resolving to the return value of (f {:connection conn}).
Throws (synchronously):
ex-info :clj-r2dbc/missing-key when db or f is nil.
ex-info :clj-r2dbc/invalid-type when f is not a function.
Throws (inside task, on driver error):
ex-info wrapping R2dbcException - carries :sql-state, :error-code,
:clj-r2dbc/error-category.
Example:
(m/? (with-connection db
(fn [{:keys [connection]}]
(m/sp
(m/? (execute connection "SELECT 1 AS n"))))))
;=> {:clj-r2dbc/rows [{:n 1}]}with-options
added in 0.1
(with-options db & {:as opts})
Attach default options to a connectable and return a new wrapper - synchronous, not a task.
Accepts a ConnectionFactory, Connection, or an already-wrapped connectable.
Nesting is supported: wrapping a wrapped connectable merges options (outer
defaults first, inner defaults win on conflict).
Merge semantics at call time: per-call opts passed to execute, stream, etc.
win over defaults set here. A nil value in per-call opts doesn't clobber a
default - nil values are stripped before merging.
Doesn't modify the underlying object; always returns a new wrapper record.
Args:
db - ConnectionFactory, Connection, or wrapped connectable. Must not be nil.
opts - Default options to attach. Accepts a map, keyword arguments, or both
. Defaults to {} when omitted.
Returns a wrapped connectable that carries the merged default options.
Throws (synchronously):
ex-info :clj-r2dbc/missing-key when db is nil.
Example:
(def db-256 (-> (connect "r2dbc:h2:mem:///db")
(with-options {:fetch-size 256})))
;; All calls through db-256 default to :fetch-size 256 unless overridden.
;; Equivalent keyword-arg form:
(def db-256 (with-options (connect "r2dbc:h2:mem:///db") :fetch-size 256))with-transaction
added in 0.1
(with-transaction db f & {:as opts})
Acquire a connection, begin a transaction, call f with {:connection conn}, and return a
Missionary task resolving to the result of f.
On success: commits the transaction, then closes the connection.
On any Throwable thrown from f: rolls back (suppressing secondary rollback
exceptions onto the original throwable via .addSuppressed), closes the
connection, and re-throws the original.
missionary.Cancelled: re-thrown immediately without suppression.
Connection ownership: when db is a ConnectionFactory or pool, the library
acquires and closes the connection (RAII). When db is an existing Connection,
it is used directly and its lifecycle is owned by the caller.
Join semantics: if the context already carries :clj-r2dbc/in-transaction? true
(e.g. a nested call via with-tx on the same Connection), the transaction
phase is skipped and f is called directly. No subtransactions are created.
Args:
db - ConnectionFactory, Connection, or wrapped connectable (from with-options).
f - (fn [{:keys [connection]}] -> Task). Receives the live Connection.
opts - (optional) Transaction options; defaults to {}. Accepts a map, keyword
arguments, or both.
Transaction Options:
:isolation - keyword, default driver-default.
:read-uncommitted | :read-committed
:repeatable-read | :serializable
:read-only? - boolean, default false.
:name - string, default nil. Driver-specific transaction or
savepoint name.
:lock-wait-timeout-ms - positive integer, default nil. Converted to
java.time.Duration.
When opts is empty, uses the no-arg Connection.beginTransaction() overload.
Returns a task resolving to the return value of (f {:connection conn}).
Throws (synchronously):
ex-info :clj-r2dbc/missing-key when db or f is nil.
ex-info :clj-r2dbc/invalid-type when f is not a function.
Throws (inside task, on driver error):
ex-info wrapping R2dbcException - carries :sql-state, :error-code,
:clj-r2dbc/error-category.
Example:
(m/? (with-transaction db
(fn [{:keys [connection]}]
(m/sp
(m/? (execute connection "INSERT INTO t (v) VALUES ($1)" {:params [1]}))
(m/? (execute connection "UPDATE s SET n = n + 1"))))
{:isolation :read-committed}))
;; Equivalent keyword-arg form:
(m/? (with-transaction db
(fn [{:keys [connection]}]
(execute connection "INSERT INTO t (v) VALUES ($1)" :params [1]))
:isolation :read-committed))with-tx
macro
added in 0.1
(with-tx [conn db & [opts]] & body)
Macro. Acquire a connection in a transaction, bind it to conn, and run body as a
Missionary task. Expands to with-transaction.
Internal symbols are generated via gensym (db__, opts__, ctx__) - no variable capture
regardless of the name chosen for conn or any surrounding locals.
Binding vector:
[conn db] - conn bound to Connection; default transaction options
[conn db opts] - conn bound to Connection; opts map passed to
with-transaction for transaction configuration
Transaction semantics, connection ownership, join semantics, commit/rollback
behavior, and error handling are identical to with-transaction. See
with-transaction.
Args (binding vector):
conn - Symbol. Bound to the raw Connection object within body.
db - Expression evaluating to ConnectionFactory, Connection, or wrapped
connectable.
opts - (optional) Transaction options map: :isolation, :read-only?, :name,
:lock-wait-timeout-ms.
Returns a Missionary task resolving to the value of the last body form.
Example:
(m/? (with-tx [conn db {:isolation :serializable}]
(m/sp
(m/? (execute conn "SELECT val FROM t WHERE id = $1 FOR UPDATE"
{:params [1]}))
(m/? (execute conn "UPDATE t SET val = val + 1 WHERE id = $1"
{:params [1]})))))
;; Nested with-tx joins the outer transaction (no subtransaction created)
(m/? (with-tx [outer db]
(with-tx [inner outer]
(execute inner "INSERT INTO audit (msg) VALUES ($1)"
{:params ["done"]}))))