Corrosion

Corrosion is a distributed system for propagating SQLite state across a cluster of nodes.

Features:

  • Just SQLite
  • Multi-writer via CRDTs (uses the cr-sqlite extension)
  • Eventually consistent
  • RESTful HTTP API
  • Subscribe to SQL queries
  • QUIC peer-to-peer
  • SWIM cluster formation
  • Fast, gossip-based update dissemination

Quick start

This is a simple example of starting a 2-node Corrosion cluster running on the same host and replicating data.

Node A

1. Create a schema file

-- /etc/corrosion/schema/todo.sql

CREATE TABLE todos (
    id BLOB NOT NULL PRIMARY KEY,
    title TEXT NOT NULL DEFAULT '',
    completed_at INTEGER
);

2. Configure Corrosion

# /etc/corrosion/config.toml

[db]
path = "/var/lib/corrosion/state.db"
schema_paths = ["/etc/corrosion/schema"]

[gossip]
addr = "[::]:8787" # the address we bind to
external_addr = "[::1]:8787" # the address we advertise as
plaintext = true

[api]
addr = "127.0.0.1:8080"

[admin]
path = "/var/run/corrosion/admin.sock"

3. Start a Corrosion agent

$ corrosion agent
2023-09-18T13:13:26.526907Z  INFO corrosion::command::agent: Starting Corrosion Agent v0.0.1
2023-09-18T13:13:26.623782Z  INFO corro_agent::agent: Actor ID: a3f72d6d38a24d0daee8258e10071f13
2023-09-18T13:13:26.655779Z  INFO corro_admin: Starting Corrosion admin socket at /var/run/corrosion/admin.sock
2023-09-18T13:13:26.658476Z  INFO corro_agent::agent: Starting peer API on udp/[::]:8787 (QUIC)
2023-09-18T13:13:26.661713Z  INFO corro_agent::agent: Starting public API server on tcp/127.0.0.1:8080
2023-09-18T13:13:27.022947Z  INFO corro_types::schema: creating table 'todos'
2023-09-18T13:13:27.023884Z  INFO corro_agent::api::public: updated 1 rows in __corro_schema for table todos
2023-09-18T13:13:27.025223Z  INFO corrosion::command::agent: Applied schema in 0.35491575s

4. Insert some data

$ corrosion exec --param 'some-id' --param 'Write some Corrosion docs!' 'INSERT INTO todos (id, title) VALUES (?, ?)'
INFO corrosion: Rows affected: 1

5. Query some data

Either via SQLite, if you have direct access to the database file:

$ sqlite3 /var/lib/corrosion/state.db 'SELECT * FROM todos;'
some-id|Write some Corrosion docs!|

or via the API if you don't:

$ corrosion query 'SELECT * FROM todos;'
some-id|Write some Corrosion docs!|

Node B

1. Copy the schema file

$ mkdir -p /etc/corrosion-b/schema
$ cp /etc/corrosion/schema/todo.sql /etc/corrosion-b/schema/todo.sql

2. Configure Corrosion

# /etc/corrosion-b/config.toml

[db]
path = "/var/lib/corrosion-b/state.db"
schema_paths = ["/etc/corrosion-b/schema"]

[gossip]
addr = "[::]:8788" # the address we bind to
external_addr = "[::1]:8788" # the address we advertise as
bootstrap = ["[::1]:8787"] # bootstrap the node's cluster discovery w/ node A
plaintext = true

[api]
addr = "127.0.0.1:8081"

[admin]
path = "/var/run/corrosion-b/admin.sock"

3. Start a Corrosion agent

$ corrosion -c /etc/corrosion-b/config.toml agent
2023-09-18T13:37:00.696728Z  INFO corrosion::command::agent: Starting Corrosion Agent v0.0.1
2023-09-18T13:37:00.768080Z  INFO corro_agent::agent: Actor ID: 4e3e57d1faee47449c1f238559284bc2
2023-09-18T13:37:00.772773Z  INFO corro_admin: Starting Corrosion admin socket at /var/run/corrosion-b/admin.sock
2023-09-18T13:37:00.773162Z  INFO corro_agent::agent: Starting peer API on udp/[::]:8788 (QUIC)
2023-09-18T13:37:00.773559Z  INFO corro_agent::agent: Starting public API server on tcp/127.0.0.1:8081
2023-09-18T13:37:00.775504Z  INFO corro_types::schema: creating table 'todos'
2023-09-18T13:37:00.775964Z  INFO corro_agent::api::public: updated 1 rows in __corro_schema for table todos
2023-09-18T13:37:00.776398Z  INFO corro_agent::agent: Current node is considered ACTIVE
2023-09-18T13:37:00.776731Z  INFO corrosion::command::agent: Applied schema in 0.001515042s
2023-09-18T13:37:01.954585Z  INFO corro_agent::agent: synced 2 changes w/ b4fcbb65501f44f0802aba631508be9d in 0.012817167s @ 156.04072257153237 changes/s

This last log shows that node B synchronized changes w/ node A.

Why were 2 changes synchronized? There's only 1 row!

cr-sqlite creates 1 change per column "changed" in a row. It's possible to inspect this directly for troubleshooting:

$ sqlite3 /var/lib/corrosion-b/state.db
sqlite> .mode column
sqlite> select * from todos__crsql_clock;
key  col_name      col_version  db_version  site_id  seq
---  ------------  -----------  ----------  -------  ---
1    title         1            1           1        0
2    completed_at  1            1           1        1
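
The counting rule above can be sketched in a few lines of Python. This only models the idea of one change per non-primary-key column; `changes_for_insert` is a hypothetical helper for illustration and is not part of cr-sqlite or Corrosion.

```python
def changes_for_insert(primary_key, row, db_version):
    """Model one change per non-primary-key column of an inserted row.

    Hypothetical helper, for illustration only.
    """
    changes = []
    for seq, (col_name, value) in enumerate(row.items()):
        changes.append({
            "pk": primary_key,
            "col_name": col_name,
            "val": value,
            "col_version": 1,        # first write to this column
            "db_version": db_version,
            "seq": seq,              # position within the transaction
        })
    return changes


# The todo row from node A: `id` is the primary key; the other two
# columns each produce a change, even though completed_at is NULL.
changes = changes_for_insert(
    "some-id",
    {"title": "Write some Corrosion docs!", "completed_at": None},
    db_version=1,
)
print(len(changes))  # 2
```

One row with two non-key columns therefore accounts for the "2 changes" in the sync log line.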

4. Query for the just-synchronized data

Via SQLite directly, if you have access to the database file:

$ sqlite3 /var/lib/corrosion-b/state.db 'SELECT * FROM todos;'
some-id|Write some Corrosion docs!|

5. Insert some data of our own

$ corrosion -c /etc/corrosion-b/config.toml exec --param 'some-id-2' --param 'Show how broadcasts work' 'INSERT INTO todos (id, title) VALUES (?, ?)'
INFO corrosion: Rows affected: 1

6. Query data from Node A

The second row has been propagated.

# here we're pointing at node A's config explicitly for clarity, the default is /etc/corrosion/config.toml
$ corrosion -c /etc/corrosion/config.toml query 'SELECT * FROM todos;'
some-id|Write some Corrosion docs!|
some-id-2|Show how broadcasts work|

Appendix: Templates

1. Create a Corrosion template

Corrosion templates are powered by Rhai w/ custom functions. On change, templates are re-rendered.

<% for todo in sql("SELECT title, completed_at FROM todos") { %>
[<% if todo.completed_at.is_null() { %> <% } else { %>X<% } %>] <%= todo.title %>
<% } %>
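
For readers who don't know Rhai, the template's logic corresponds roughly to the Python sketch below. It is only an illustration of the loop and the NULL check, not how Corrosion renders templates; `render_todos` is a hypothetical name and the rows are hard-coded rather than read from the database.

```python
def render_todos(rows):
    """Render (title, completed_at) rows as a checkbox list."""
    lines = []
    for title, completed_at in rows:
        # An empty box when completed_at is NULL, an X otherwise.
        mark = " " if completed_at is None else "X"
        lines.append(f"[{mark}] {title}")
    return "\n".join(lines)


rows = [
    ("Write some Corrosion docs!", None),
    ("Show how broadcasts work", 1234567890),
]
print(render_todos(rows))
```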

2. Process the template

$ corrosion template "./todos.rhai:todos.txt"
INFO corrosion::command::tpl: Watching and rendering ./todos.rhai to todos.txt

3. Watch the todos.txt file for change

Use watch in a different terminal to see updates when your todos change:

$ watch -n 0.5 cat todos.txt
Every 0.5s: cat todos.txt

[ ] Write some Corrosion docs!
[ ] Show how broadcasts work

Add another todo:

$ corrosion -c /etc/corrosion-b/config.toml exec --param 'some-id-3' --param 'Hello from a template!' 'INSERT INTO todos (id, title) VALUES (?, ?)'
INFO corrosion: Rows affected: 1

Your watch should have updated and should now display:

$ watch -n 0.5 cat todos.txt
Every 0.5s: cat todos.txt

[ ] Write some Corrosion docs!
[ ] Show how broadcasts work
[ ] Hello from a template!

We did all those, so let's tick them off:

$ corrosion -c /etc/corrosion-b/config.toml exec 'UPDATE todos SET completed_at = 1234567890'
INFO corrosion: Rows affected: 3
$ watch -n 0.5 cat todos.txt
Every 0.5s: cat todos.txt

[X] Write some Corrosion docs!
[X] Show how broadcasts work
[X] Hello from a template!

Setup development environment

This section contains some information that might be relevant for you when setting up a corrosion development environment. Depending on your computer setup your experience will vary.

We try to document any quirks in development here. If you find yourself having to adjust something in order to be able to work on corrosion, please let us know or open a PR! Thank you!

Dependencies with Nix

The corrosion repository comes with a flake.nix file, which can build the main corrosion binary, load development dependencies, and serve a local mdbook server for the book (that you're reading right now!).

If you're not familiar with Nix you can learn more about it here: https://nixos.org.

You will have to enable the "experimental-features" flag for Nix. You can do so by creating a configuration at ~/.config/nix/nix.conf:

extra-experimental-features = nix-command flakes

Afterwards you can:

  • run nix build to build the corrosion command and all dependent crates. The final binary can then be found under ./result/
  • run nix develop to get a development shell for corrosion. Note: this will use a shell hook to update the limit of open file descriptors via ulimit. Make sure that your system is configured to allow this without requiring privileges!
  • run nix develop .#mdbook-shell to start the book development server

Conflict-free Replicated Data Types

What's a CRDT?

About CRDTs

Conflict-free Replicated Data Types (CRDTs) are used in systems with optimistic replication, where they take care of conflict resolution. CRDTs ensure that, no matter what data modifications are made on different replicas, the data can always be merged into a consistent state. This merge is performed automatically by the CRDT, without requiring any special conflict resolution code or user intervention.

cr-sqlite

Corrosion uses the cr-sqlite SQLite extension to accomplish its multi-writer and eventual consistency promise. Here's a short intro about how it generally works. The rest of this section assumes some knowledge of cr-sqlite.

cr-sqlite provides functions to mark tables, in a SQLite database, as backed by CRDTs. These include Causal-Length (pdf paper) and Last-Write-Wins (LWW).

As of cr-sqlite 0.15, the CRDT rule for an existing row being updated is this:

  1. Biggest col_version wins
  2. In case of a tie, the "biggest" value is used.
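
As a rough sketch, the two rules can be written as a small Python function. This is an illustration of the rule as stated here, not cr-sqlite's code; `merge_column` is a hypothetical name, and Python's `>` stands in for SQLite's value ordering, which is only a fair stand-in when both values have the same type.

```python
def merge_column(local, incoming):
    """Pick the winning (col_version, value) pair for a single column."""
    local_version, local_value = local
    incoming_version, incoming_value = incoming

    # Rule 1: the biggest col_version wins.
    if incoming_version != local_version:
        return incoming if incoming_version > local_version else local

    # Rule 2: on a tie, the "biggest" value wins.
    return incoming if incoming_value > local_value else local


print(merge_column((1, "created"), (2, "started")))    # (2, 'started')
print(merge_column((2, "destroyed"), (2, "started")))  # (2, 'started')
```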

Basics

With the extension loaded, writes to CRDT-backed tables will trigger insertions in internal tables for each column in a row.

An aggregate view table is available as crsql_changes. By SELECT-ing from this table and INSERT-ing into it, it's possible to distribute changes to a cluster. Each set of changes (a transaction produces a single set of changes) gets a new db_version, unique to the database.

Corrosion and cr-sqlite

Corrosion executes transactions by processing requests made to its client HTTP API. Each transaction triggers 1+ broadcast (big changesets are chunked). Each change is serialized in an efficient format and sent to ~random members of the cluster.

The main caveat of this approach is: writes to the database all have to go through Corrosion. If a sqlite client were to issue writes w/ or w/o the proper extension loaded, then data would become inconsistent for CRDT-backed tables.
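
The broadcast step described above (chunk a big changeset, send each chunk to a few random members) can be pictured with the sketch below. Everything specific in it is an assumption made for illustration: the chunk size, the fanout, the member names and the helper functions are hypothetical, and Corrosion's real selection and serialization logic is not shown.

```python
import random


def chunk_changes(changes, max_per_chunk):
    """Split one transaction's changes into bounded chunks."""
    return [
        changes[i:i + max_per_chunk]
        for i in range(0, len(changes), max_per_chunk)
    ]


def pick_targets(members, fanout, rng):
    """Choose up to `fanout` distinct members at random."""
    return rng.sample(members, min(fanout, len(members)))


changes = [f"change-{n}" for n in range(10)]        # placeholder payloads
members = ["node-a", "node-b", "node-c", "node-d"]  # hypothetical peers
rng = random.Random(0)

chunks = chunk_changes(changes, max_per_chunk=4)    # assumed chunk size
plan = [(chunk, pick_targets(members, 2, rng)) for chunk in chunks]

for chunk, targets in plan:
    print(len(chunk), "changes ->", targets)
```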

CRsqlite tables

Crsqlite adds several virtual tables, but the main one I wanna look at is crsql_changes, since this is what we mainly interact with from corrosion. Each "real" data table also gets its own <table name>__crsql_clock table, which largely keeps track of the same information, but specific to that one table. These refer to (and keep track of) the "logical clock" of certain changes. A logical clock is a mechanism to establish causality of changes, without needing an actual, synchronous global clock between different participants in a system. Crsqlite specifically uses a "Lamport timestamp" which, if you squint at it from a distance, could be most concisely boiled down to a monotonically increasing counter.
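
To make the "monotonically increasing counter" idea concrete, here is the textbook Lamport clock in Python. It is a generic sketch of the concept, not cr-sqlite's implementation; the class and method names are made up for this example.

```python
class LamportClock:
    """Textbook Lamport clock: a counter that only moves forward."""

    def __init__(self):
        self.time = 0

    def tick(self):
        # A local event (or a send) advances the counter by one.
        self.time += 1
        return self.time

    def observe(self, remote_time):
        # On receipt, jump past whichever clock is further ahead.
        self.time = max(self.time, remote_time) + 1
        return self.time


a, b = LamportClock(), LamportClock()
a.tick()
a.tick()
stamp = a.tick()   # a sends a message stamped 3
b.tick()           # b has only seen one local event so far
b.observe(stamp)   # b moves to max(1, 3) + 1 = 4
print(a.time, b.time)  # 3 4
```

No wall clock is involved: the receiver only learns that its next event must be ordered after the message it just saw.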

Fun fact, crsqlite has another special table, the crsql_site_id, which we use to get a unique actor ID in our corrosion cluster (and which keeps track of other known actor IDs from across the network).

sqlite> select hex(site_id) from crsql_site_id;
D5F143E7BA65421C938C850CE78FC9F2

(This also means that by deleting the corrosion.db when re-instantiating a node, that node's actor ID will change automatically).

Let's start by setting up a new database with crsqlite enabled, and create a new table my_machines which simply keeps track of a machine ID, its name, and its status (if you wanna follow along, you'll have to download the library from the github releases). Afterwards we call the special crsql_as_crr ("as conflict-free replicated relation") function to instantiate crsql for this particular table. This means that we can opt-into crsql on a table-by-table basis!

 ❤ (tempest) ~/P/_/flyio> sqlite3 test1.db
SQLite version 3.45.1 2024-01-30 16:01:20
Enter ".help" for usage hints.
sqlite> .load extra-lib/crsqlite.so
sqlite> create table my_machines (id primary key not null, name text not null default '', status text not null default 'broken');
sqlite> select crsql_as_crr('my_machines');
OK

Importantly: to apply crsql_as_crr any fields that are not null must have a default value to allow for forwards (and backwards) compatible schema changes. In a distributed system this is much more important than for a traditional database server.

A simple example: generating some changes

Ok let's actually insert some data. Let's say we create two new machines meow and woof:

sqlite> .mode qbox
sqlite> insert into my_machines (id, name, status) values (1, 'meow', 'created');
sqlite> insert into my_machines (id, name, status) values (2, 'woof', 'created');
sqlite> select "table", "pk", "cid", "val", "col_version", "db_version", "cl", "seq" from crsql_changes;
┌───────────────┬───────────┬──────────┬───────────┬─────────────┬────────────┬────┬─────┐
│     table     │    pk     │   cid    │    val    │ col_version │ db_version │ cl │ seq │
├───────────────┼───────────┼──────────┼───────────┼─────────────┼────────────┼────┼─────┤
│ 'my_machines' │ x'010901' │ 'name'   │ 'meow'    │ 1           │ 1          │ 1  │ 0   │
│ 'my_machines' │ x'010901' │ 'status' │ 'created' │ 1           │ 1          │ 1  │ 1   │
│ 'my_machines' │ x'010902' │ 'name'   │ 'woof'    │ 1           │ 2          │ 1  │ 0   │
│ 'my_machines' │ x'010902' │ 'status' │ 'created' │ 1           │ 2          │ 1  │ 1   │
└───────────────┴───────────┴──────────┴───────────┴─────────────┴────────────┴────┴─────┘

So what happened here? Well, let's see what these values mean:

  • table :: which table was the change made on.
  • pk :: the respective primary key for each of the changes, encoded in a funky way to send and validate between different writers/ nodes.
  • cid and val :: the column id of the change, along with its new value
  • col_version :: an incremented counter for each column. Whenever a change is applied to a column, this counter is incremented.
  • db_version :: an overall database version. With each transaction this counter is incremented. So you can see that the two column changes for meow are both on db_version = 1, and it gets incremented to db_version = 2 when we insert woof.
  • cl :: this is the "causal length", which indicates whether the row is still present or was deleted. I had to do some digging to learn about this, but essentially it indicates the number of operations that causally (i.e. in relationship to) preceded a given change. De-facto I don't think this value is currently used (at least not in our use-cases, but please correct me on that).
  • seq :: the order of changes in a larger transaction, making sure to apply a big insertion in the same order on each node. When merging multiple changes we make sure to remove redundant sequences.
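
The bookkeeping behind col_version, db_version and seq can be mimicked with a toy Python model. It is a simplified sketch of the column meanings listed above, not cr-sqlite's storage format; `ChangeLog` is a hypothetical class that keeps only the latest change per (primary key, column).

```python
class ChangeLog:
    """Toy model of col_version, db_version and seq."""

    def __init__(self):
        self.db_version = 0
        # (pk, column) -> (value, col_version, db_version, seq)
        self.latest = {}

    def transaction(self, writes):
        """Apply a list of (pk, column, value) writes as one transaction."""
        self.db_version += 1  # one new db_version per transaction
        for seq, (pk, column, value) in enumerate(writes):
            previous = self.latest.get((pk, column))
            col_version = previous[1] + 1 if previous else 1
            self.latest[(pk, column)] = (value, col_version, self.db_version, seq)

    def changes(self):
        rows = [(pk, col, *rest) for (pk, col), rest in self.latest.items()]
        # Order by db_version, then seq.
        return sorted(rows, key=lambda r: (r[4], r[5]))


log = ChangeLog()
log.transaction([(1, "name", "meow"), (1, "status", "created")])
log.transaction([(2, "name", "woof"), (2, "status", "created")])
log.transaction([(1, "status", "started")])  # bumps col_version to 2

for row in log.changes():
    print(row)
```

Both inserts get their own db_version, and the later update to meow's status replaces the earlier change for that column with col_version = 2.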

Ok so far so good. But let's set up a second node/ database and insert the crsql changes there:

 ❤ (tempest) ~/P/_/flyio> sqlite3 test2.db
SQLite version 3.45.1 2024-01-30 16:01:20
Enter ".help" for usage hints.
sqlite> .mode qbox
sqlite> .load extra-lib/crsqlite.so
sqlite> create table my_machines (id primary key not null default '', name text not null default '', status text not null default 'broken');
sqlite> select crsql_as_crr('my_machines');
┌─────────────────────────────┐
│ crsql_as_crr('my_machines') │
├─────────────────────────────┤
│ 'OK'                        │
└─────────────────────────────┘

Now, something I ignored in the previous insert is the site_id (in corrosion parlance the "actor ID"). Each change has a source after all, which is represented by the actor ID. In our case it is D5F143E7BA65421C938C850CE78FC9F2 for node 1 and 75D983BA38A644E987735592FB89CA70 for node 2. And so, when inserting into test2.db's crsql_changes we need to consider it.

sqlite> insert into crsql_changes values ('my_machines', X'010901', 'name', 'meow', 1, 1, X'D5F143E7BA65421C938C850CE78FC9F2', 1, 0);
sqlite> insert into crsql_changes values ('my_machines', X'010901', 'status', 'created', 1, 1, X'D5F143E7BA65421C938C850CE78FC9F2', 1, 1);
sqlite> insert into crsql_changes values ('my_machines', X'010902', 'name', 'woof', 1, 2, X'D5F143E7BA65421C938C850CE78FC9F2', 1, 0);
sqlite> insert into crsql_changes values ('my_machines', X'010902', 'status', 'created', 1, 2, X'D5F143E7BA65421C938C850CE78FC9F2', 1, 1);

So what does test2.db look like now?

sqlite> select "name", "status" from my_machines;
┌────────┬───────────┐
│  name  │  status   │
├────────┼───────────┤
│ 'meow' │ 'created' │
│ 'woof' │ 'created' │
└────────┴───────────┘
sqlite> select "table", "pk", "cid", "val", "col_version", "db_version", "cl", "seq" from crsql_changes;
┌───────────────┬───────────┬──────────┬───────────┬─────────────┬────────────┬────┬─────┐
│     table     │    pk     │   cid    │    val    │ col_version │ db_version │ cl │ seq │
├───────────────┼───────────┼──────────┼───────────┼─────────────┼────────────┼────┼─────┤
│ 'my_machines' │ x'010901' │ 'name'   │ 'meow'    │ 1           │ 1          │ 1  │ 0   │
│ 'my_machines' │ x'010901' │ 'status' │ 'created' │ 1           │ 2          │ 1  │ 1   │
│ 'my_machines' │ x'010902' │ 'name'   │ 'woof'    │ 1           │ 3          │ 1  │ 0   │
│ 'my_machines' │ x'010902' │ 'status' │ 'created' │ 1           │ 4          │ 1  │ 1   │
└───────────────┴───────────┴──────────┴───────────┴─────────────┴────────────┴────┴─────┘

Neat! So you see, by sending each of the changes from crsql_changes to other nodes we can apply things as they were applied in the original database. Note how the db_version is incremented with every individual change, while on the original node each impacted row shares a version. This is not what happens in corrosion, because we batch multiple changes into the same transaction, which isn't the case for me in the sqlite3 repl:

Corrosion node A

 ❤  (tempest) ~/P/_/f/corrosion> sqlite3 devel-state/A/corrosion.db
SQLite version 3.45.1 2024-01-30 16:01:20
Enter ".help" for usage hints.
sqlite> .mode qbox
sqlite> .load ../extra-lib/crsqlite.so
sqlite> select "table", "pk", "cid", "val", "col_version", "db_version", "cl", "seq" from crsql_changes;
┌───────────────┬───────────┬──────────┬───────────┬─────────────┬────────────┬────┬─────┐
│     table     │    pk     │   cid    │    val    │ col_version │ db_version │ cl │ seq │
├───────────────┼───────────┼──────────┼───────────┼─────────────┼────────────┼────┼─────┤
│ 'my_machines' │ x'010901' │ 'name'   │ 'meow'    │ 1           │ 1          │ 1  │ 0   │
│ 'my_machines' │ x'010901' │ 'status' │ 'created' │ 1           │ 1          │ 1  │ 1   │
└───────────────┴───────────┴──────────┴───────────┴─────────────┴────────────┴────┴─────┘

Corrosion node B

 ❤  (tempest) ~/P/_/f/corrosion> sqlite3 devel-state/B/corrosion.db
SQLite version 3.45.1 2024-01-30 16:01:20
Enter ".help" for usage hints.
sqlite> .mode qbox
sqlite> .load ../extra-lib/crsqlite.so
sqlite> select "table", "pk", "cid", "val", "col_version", "db_version", "cl", "seq" from crsql_changes;
┌───────────────┬───────────┬──────────┬───────────┬─────────────┬────────────┬────┬─────┐
│     table     │    pk     │   cid    │    val    │ col_version │ db_version │ cl │ seq │
├───────────────┼───────────┼──────────┼───────────┼─────────────┼────────────┼────┼─────┤
│ 'my_machines' │ x'010901' │ 'name'   │ 'meow'    │ 1           │ 1          │ 1  │ 0   │
│ 'my_machines' │ x'010901' │ 'status' │ 'created' │ 1           │ 1          │ 1  │ 1   │
└───────────────┴───────────┴──────────┴───────────┴─────────────┴────────────┴────┴─────┘

Handling conflicting changes

Now, what if, god forbid, we create some kind of conflict? Let's say we set the machine meow to started in test1.db and to destroyed in test2.db.

On node 1:

sqlite> update my_machines set status = 'started' where name = 'meow';
sqlite> select "table", "pk", "cid", "val", "col_version", "db_version", "cl", "seq" from crsql_changes;
┌───────────────┬───────────┬──────────┬───────────┬─────────────┬────────────┬────┬─────┐
│     table     │    pk     │   cid    │    val    │ col_version │ db_version │ cl │ seq │
├───────────────┼───────────┼──────────┼───────────┼─────────────┼────────────┼────┼─────┤
│ 'my_machines' │ x'010901' │ 'name'   │ 'meow'    │ 1           │ 1          │ 1  │ 0   │
│ 'my_machines' │ x'010902' │ 'name'   │ 'woof'    │ 1           │ 2          │ 1  │ 0   │
│ 'my_machines' │ x'010902' │ 'status' │ 'created' │ 1           │ 2          │ 1  │ 1   │
│ 'my_machines' │ x'010901' │ 'status' │ 'started' │ 2           │ 3          │ 1  │ 0   │
└───────────────┴───────────┴──────────┴───────────┴─────────────┴────────────┴────┴─────┘

On node 2:

sqlite> update my_machines set status = 'destroyed' where name = 'meow';
sqlite> select "table", "pk", "cid", "val", "col_version", "db_version", "cl", "seq" from crsql_changes;
┌───────────────┬───────────┬──────────┬─────────────┬─────────────┬────────────┬────┬─────┐
│     table     │    pk     │   cid    │     val     │ col_version │ db_version │ cl │ seq │
├───────────────┼───────────┼──────────┼─────────────┼─────────────┼────────────┼────┼─────┤
│ 'my_machines' │ x'010901' │ 'name'   │ 'meow'      │ 1           │ 1          │ 1  │ 0   │
│ 'my_machines' │ x'010902' │ 'name'   │ 'woof'      │ 1           │ 3          │ 1  │ 0   │
│ 'my_machines' │ x'010902' │ 'status' │ 'created'   │ 1           │ 4          │ 1  │ 1   │
│ 'my_machines' │ x'010901' │ 'status' │ 'destroyed' │ 2           │ 5          │ 1  │ 0   │
└───────────────┴───────────┴──────────┴─────────────┴─────────────┴────────────┴────┴─────┘

Let's apply the change from 1 on 2:

sqlite> insert into crsql_changes values ('my_machines', X'010901', 'status', 'started', 2, 3, X'D5F143E7BA65421C938C850CE78FC9F2', 1, 0);
sqlite> select "table", "pk", "cid", "val", "col_version", "db_version", "cl", "seq" from crsql_changes;
┌───────────────┬───────────┬──────────┬───────────┬─────────────┬────────────┬────┬─────┐
│     table     │    pk     │   cid    │    val    │ col_version │ db_version │ cl │ seq │
├───────────────┼───────────┼──────────┼───────────┼─────────────┼────────────┼────┼─────┤
│ 'my_machines' │ x'010901' │ 'name'   │ 'meow'    │ 1           │ 1          │ 1  │ 0   │
│ 'my_machines' │ x'010902' │ 'name'   │ 'woof'    │ 1           │ 3          │ 1  │ 0   │
│ 'my_machines' │ x'010902' │ 'status' │ 'created' │ 1           │ 4          │ 1  │ 1   │
│ 'my_machines' │ x'010901' │ 'status' │ 'started' │ 2           │ 6          │ 1  │ 0   │
└───────────────┴───────────┴──────────┴───────────┴─────────────┴────────────┴────┴─────┘

Huh, so our status = destroyed was overwritten by the status = started change. Let's apply the change in the reverse direction (i.e. let test1.db know that we destroyed the machine).

sqlite> insert into crsql_changes values ('my_machines', X'010901', 'status', 'destroyed', 2, 5, X'75D983BA38A644E987735592FB89CA70', 1, 0);
sqlite> select "table", "pk", "cid", "val", "col_version", "db_version", "cl", "seq" from crsql_changes;
┌───────────────┬───────────┬──────────┬───────────┬─────────────┬────────────┬────┬─────┐
│     table     │    pk     │   cid    │    val    │ col_version │ db_version │ cl │ seq │
├───────────────┼───────────┼──────────┼───────────┼─────────────┼────────────┼────┼─────┤
│ 'my_machines' │ x'010901' │ 'name'   │ 'meow'    │ 1           │ 1          │ 1  │ 0   │
│ 'my_machines' │ x'010902' │ 'name'   │ 'woof'    │ 1           │ 2          │ 1  │ 0   │
│ 'my_machines' │ x'010902' │ 'status' │ 'created' │ 1           │ 2          │ 1  │ 1   │
│ 'my_machines' │ x'010901' │ 'status' │ 'started' │ 2           │ 3          │ 1  │ 0   │
└───────────────┴───────────┴──────────┴───────────┴─────────────┴────────────┴────┴─────┘

The machine remains started! Which is good. We don't want nodes disagreeing with each other about what the state of a machine (or any data, really) is. This is what "eventually consistent" means: eventually all the nodes are going to agree on what the state should be, even if there's some other funky writes in the system that may temporarily disagree. Why did it pick started? Well, because crsql uses a "largest write wins" strategy.

The order in which crsql checks for which value is "larger" is: col_version, followed by the value of the change, and finally the site_id (so essentially randomly picking one, because the site_id is random data).

In our example you can see that both the status = started and status = destroyed changes had a col_version = 2, so that comparison is out. What about the value? Sqlite provides a max function which uses lexicographic ordering to determine which string is "bigger". destroyed comes before started and so started is "bigger":

sqlite> select max('started', 'destroyed');
┌─────────────────────────────┐
│ max('started', 'destroyed') │
├─────────────────────────────┤
│ 'started'                   │
└─────────────────────────────┘
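
The reason both nodes end up agreeing is that the comparison gives the same answer no matter which change arrives first. A small Python sketch of that property, using the two conflicting changes from this example: `winner` is a hypothetical helper, and Python's tuple ordering is used as a stand-in for cr-sqlite's comparison of col_version, then value, then site_id.

```python
def winner(current, incoming):
    """Each change is (col_version, value, site_id); the largest wins.

    Python compares tuples element by element, which mirrors the
    col_version -> value -> site_id order described above.
    """
    return max(current, incoming)


started = (2, "started", bytes.fromhex("D5F143E7BA65421C938C850CE78FC9F2"))
destroyed = (2, "destroyed", bytes.fromhex("75D983BA38A644E987735592FB89CA70"))

node1 = winner(started, destroyed)   # node 1 held 'started', receives 'destroyed'
node2 = winner(destroyed, started)   # node 2 held 'destroyed', receives 'started'

print(node1[1], node2[1])  # started started
```

The site_id is only consulted when both col_version and value are equal, in which case the two changes already carry the same data.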

Schema

Corrosion's schema definition happens via files each representing one or more tables, written in SQL (SQLite-flavored). This is done through CREATE TABLE and CREATE INDEX exclusively!

Manual migrations are not supported (yet). When schema files change, Corrosion can be reloaded (or restarted) and it will compute a diff between the old and new schema and make the changes.

Any destructive actions on the table schemas are ignored / prohibited. This includes removing a table definition entirely or removing a column from a table. Indexes can be removed or added.

Constraints

  • Only CREATE TABLE and CREATE INDEX are allowed
  • No unique indexes allowed (except for the default primary key unique index that does not need to be created)
  • The primary key must be non nullable
  • Non-nullable columns require a default value
    • This is a cr-sqlite constraint, but in practice w/ Corrosion: it does not matter. Entire changes will be applied all at once and no fields will be missing.
    • If table schemas are modified, then a default value is definitely required.
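
Two of these constraints can be checked ahead of time with plain SQLite. The sketch below is not Corrosion's own validation; `check_table` is a hypothetical helper that loads a CREATE TABLE statement into an in-memory database and inspects it with PRAGMA table_info. It assumes a trusted table name, since the name is interpolated into the PRAGMA.

```python
import sqlite3


def check_table(create_sql, table):
    """Report nullable primary keys and NOT NULL columns without a default."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(create_sql)
        problems = []
        # table_info rows: cid, name, type, notnull, dflt_value, pk
        rows = conn.execute(f"PRAGMA table_info({table})")
        for _cid, name, _type, notnull, default, pk in rows:
            if pk and not notnull:
                problems.append(f"{name}: primary key must be NOT NULL")
            elif not pk and notnull and default is None:
                problems.append(f"{name}: NOT NULL column needs a DEFAULT")
        return problems
    finally:
        conn.close()


good = """CREATE TABLE apps (
    id INT NOT NULL PRIMARY KEY,
    name TEXT NOT NULL DEFAULT '',
    user_id INT NOT NULL DEFAULT 0
)"""
bad = "CREATE TABLE bad (id INT PRIMARY KEY, name TEXT NOT NULL)"

print(check_table(good, "apps"))  # []
print(check_table(bad, "bad"))
```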

Example

-- /etc/corrosion/schema/apps.sql

CREATE TABLE apps (
    id INT NOT NULL PRIMARY KEY,
    name TEXT NOT NULL DEFAULT '',
    user_id INT NOT NULL DEFAULT 0
);

CREATE INDEX apps_user_id ON apps (user_id);

Reseeding Corrosion

You may need to restart every Corrosion node from an earlier snapshot. Common reasons include:

  • Major corrosion updates: a version upgrade might have changed the database structure of internal tables in an incompatible way.
  • Breaking schema change: you need to make a breaking change to your database schema that cr-sqlite forbids, such as removing a column.
  • Bad data spreading: unwanted data is propagating through the cluster, and restoring a known-good snapshot is faster than deleting or repairing it.

We call this restore-from-snapshot workflow a reseed. The rest of this document explains how to do it safely.

Warning — data loss is expected. Reseeding from a snapshot means any writes accepted after the snapshot was taken will be lost. Either stop all writes before taking the snapshot, or be prepared to replay the missing writes afterward (see Re-insert any missing data).

Overview

The migration is performed in three phases:

  1. On a single node, create a clean snapshot (skip if you already have one).
  2. Distribute the snapshot, stop corrosion and restore it on every node.
  3. Start corrosion and re-apply any writes that happened after the snapshot was taken.

1. Prepare a clean snapshot

Pick one node in the old cluster and use it as the source of truth for the snapshot. All other nodes will be restored from the backup created from this node.

1.1 Create a backup

While the corrosion agent is still running on the source node:

corrosion backup /path/to/snapshot.db

corrosion backup runs VACUUM INTO and strips per-node state (crsql_site_id, __corro_members, __corro_subs, consul hash tables), so the backup is often smaller than the running database.

1.2 Bump the cluster id

A new cluster_id ensures nodes that come up with the new snapshot don't accidentally talk to any nodes still on the old snapshot. Run this against the backup you just created:

sqlite3 /path/to/snapshot.db <<'SQL'
INSERT INTO __corro_state(key, value)
VALUES ('cluster_id', 1)
ON CONFLICT(key)
DO UPDATE SET value = value + 1;
SQL

If no cluster_id is set (it defaults to 0 when unset) this inserts 1; otherwise it increments the existing value by one.
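
The insert-or-increment behaviour is easy to try against a scratch database. In this sketch the `__corro_state` table definition is an assumed stand-in (the real definition may differ); only the upsert statement itself is taken from the step above.

```python
import sqlite3

BUMP = """
INSERT INTO __corro_state(key, value)
VALUES ('cluster_id', 1)
ON CONFLICT(key)
DO UPDATE SET value = value + 1
"""

conn = sqlite3.connect(":memory:")
# Assumed stand-in schema, just enough for ON CONFLICT(key) to apply.
conn.execute("CREATE TABLE __corro_state (key TEXT NOT NULL PRIMARY KEY, value)")


def cluster_id():
    row = conn.execute(
        "SELECT value FROM __corro_state WHERE key = 'cluster_id'"
    ).fetchone()
    return row[0]


conn.execute(BUMP)
first = cluster_id()    # no row existed, so 1 was inserted
conn.execute(BUMP)
second = cluster_id()   # the row existed, so it was incremented

print(first, second)  # 1 2
```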

1.3 Drop cr-sqlite internal tables (optional)

Do this if either of the following applies:

  • You're upgrading versions and the cr-sqlite internal table schema has changed.
  • Your database has grown large and you want a clean reset.

Dropping these tables lets them be recreated under the new schema (if schema changed) on startup. Run everything below against the snapshot file, not the live database.

Drop the clock and primary-key tables:

# Drop every internal cr-sqlite table.
sqlite3 /path/to/snapshot.db ".mode list" "SELECT 'DROP TABLE ' || name || ';' \
FROM sqlite_schema \
WHERE type = 'table' \
  AND (name LIKE '%__crsql_clock' \
    OR name LIKE '%__crsql_pks');" | sqlite3 /path/to/snapshot.db

This generates DROP TABLE statements from the schema, then pipes them back into sqlite3 to execute.
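
The same select-then-drop idea, sketched in Python against a throwaway in-memory database so it can be tried safely before touching a snapshot. The table names created here are placeholders; `sqlite_master` is used because it is the older alias of `sqlite_schema` and works on older SQLite builds.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Placeholder tables standing in for a data table and its internal tables.
for name in ("todos", "todos__crsql_clock", "todos__crsql_pks"):
    conn.execute(f'CREATE TABLE "{name}" (k)')

# Note: in LIKE, `_` matches any single character, so these patterns are
# slightly broader than the literal suffixes.
names = [
    row[0]
    for row in conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type = 'table' "
        "AND (name LIKE '%__crsql_clock' OR name LIKE '%__crsql_pks')"
    )
]

for name in names:
    conn.execute(f'DROP TABLE "{name}"')

remaining = [
    row[0]
    for row in conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    )
]
print(remaining)  # ['todos']
```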

Drop bookkeeping and reclaim space:

sqlite3 /path/to/snapshot.db <<'SQL'
DROP TABLE IF EXISTS __corro_bookkeeping;

VACUUM;
SQL

The VACUUM reclaims space freed by the dropped tables and keeps the snapshot small for transfer.

1.4 Publish the snapshot

Compress the snapshot and upload it to a location every node can reach (S3, an internal artifact store, etc.):

pigz /path/to/snapshot.db
# upload /path/to/snapshot.db.gz somewhere

2. Restore every node from the snapshot

Repeat these steps on every node in the cluster:

  1. (Version upgrade only) Install the new corrosion binary.

  2. Download and decompress the snapshot to a local path.

  3. Stop the Corrosion agent. corrosion restore refuses to run while the agent is up.

  4. Restore:

    corrosion restore /path/to/snapshot.db
    
  5. Start the Corrosion agent.

Once a node starts, it will recreate the <table>__crsql_clock and <table>__crsql_pks tables with the new schema if needed and start gossiping changes.

3. Re-insert any missing data

Any rows written between the snapshot being taken (step 1.1) and the cluster coming back up on the restored snapshot are not present in the restored database. Re-apply them normally (via POST /v1/transactions or the PostgreSQL endpoint) so they get re-inserted and gossiped to the rest of the cluster.
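
Replaying a missed write over HTTP might look like the sketch below. Treat the details as assumptions to verify against the API reference: the body shape (a JSON array of [sql, params] pairs), the todos table and the API address are illustrative, and `build_transaction_request` is a hypothetical helper. The request is only built, not sent, so the sketch runs without an agent.

```python
import json
import urllib.request


def build_transaction_request(api_addr, statements):
    """Build (but do not send) a POST to /v1/transactions."""
    body = json.dumps(statements).encode("utf-8")
    return urllib.request.Request(
        f"http://{api_addr}/v1/transactions",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Assumed body shape: one [sql, params] pair per statement.
missing_writes = [
    [
        "INSERT INTO todos (id, title) VALUES (?, ?)",
        ["some-id-4", "Written after the snapshot"],
    ],
]

req = build_transaction_request("127.0.0.1:8080", missing_writes)
print(req.get_method(), req.full_url)

# To actually send it against a running agent:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read())
```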

If you took the snapshot during a write freeze there is nothing to do; the cluster is fully migrated.

Telemetry

Prometheus metrics

# TYPE corro_broadcast_buffer_capacity gauge
# TYPE corro_broadcast_pending_count gauge
# TYPE corro_broadcast_recv_count counter
# TYPE corro_broadcast_serialization_buffer_capacity gauge
# TYPE corro_build_info gauge
# TYPE corro_changes_committed counter
# TYPE corro_db_buffered_changes_rows_total gauge
# TYPE corro_db_table_checksum gauge
# TYPE corro_db_table_rows_total gauge
# TYPE corro_db_wal_truncate_seconds histogram
# TYPE corro_gossip_broadcast_channel_capacity gauge
# TYPE corro_gossip_cluster_size gauge
# TYPE corro_gossip_config_max_transmissions gauge
# TYPE corro_gossip_config_num_indirect_probes gauge
# TYPE corro_gossip_member_added counter
# TYPE corro_gossip_member_removed counter
# TYPE corro_gossip_members gauge
# TYPE corro_gossip_updates_backlog gauge
# TYPE corro_peer_connection_accept_total counter
# TYPE corro_peer_datagram_bytes_recv_total counter
# TYPE corro_peer_datagram_bytes_sent_total counter
# TYPE corro_peer_datagram_recv_total counter
# TYPE corro_peer_datagram_sent_total counter
# TYPE corro_peer_stream_accept_total counter
# TYPE corro_peer_stream_bytes_recv_total counter
# TYPE corro_peer_stream_bytes_sent_total counter
# TYPE corro_peer_streams_accept_total counter
# TYPE corro_sqlite_pool_execution_seconds histogram
# TYPE corro_sqlite_pool_queue_seconds histogram
# TYPE corro_sqlite_pool_read_connections gauge
# TYPE corro_sqlite_pool_read_connections_idle gauge
# TYPE corro_sqlite_pool_write_connections gauge
# TYPE corro_sqlite_pool_write_connections_idle gauge
# TYPE corro_sync_attempts_count counter
# TYPE corro_sync_changes_recv counter
# TYPE corro_sync_changes_sent counter
# TYPE corro_sync_chunk_sent_bytes counter
# TYPE corro_sync_client_head gauge
# TYPE corro_sync_client_member counter
# TYPE corro_sync_client_needed gauge
# TYPE corro_sync_client_request_operations_need_count histogram

Deploy on Fly.io

The Corrosion repository on GitHub includes example files to deploy Fly Machines running Corrosion in a cluster, communicating via Fly private networking.

Corrosion is designed to run on the same node as any program that uses it. On Fly.io, that means deploying from a Docker image that runs both your code and Corrosion.

It's also possible for your other Machines on the same Fly private network to read from and write to their nearest Corrosion node via API. This can be handy for occasional or development use.

Launch a Corrosion cluster

This example deploys a 2-node Corrosion cluster on Fly Machines VMs, using the example files in corrosion/examples/fly/ within the Corrosion git repository.

Each node is a separate Fly Machine, and nodes communicate with each other over Fly.io private networking. The cluster is initialized with an empty database.

You'll be provisioning two shared-cpu-1x Machines and two 1GB Fly volumes for persistent storage. See the Fly.io resource pricing page for cost information.

Speedrun

In a nutshell, deploying Corrosion from source looks like this:

$ cp examples/fly/fly.toml .                        # copy the Fly Launch config file
$ fly launch --dockerfile examples/fly/Dockerfile   # launch a new app on Fly.io using the example files
$ fly scale count 1 --region <second-fly-region>    # add a second node to the cluster

Here it is again, in slightly more detail:

Preparation

Clone the Corrosion repository, and enter its root directory.

$ git clone https://github.com/superfly/corrosion.git && cd corrosion

Fly Launch uses a TOML file for app configuration. Copy the example fly.toml to the working directory.

$ cp examples/fly/fly.toml .

Edit fly.toml, changing the app value from corrosion2 to a unique app name.

Launch a new app

Launch a new app on Fly.io, using the example Dockerfile.

Follow the prompts. Say Yes to copying the configuration from fly.toml, and No to adding any other databases. You can say Yes to deploy now, as well.

$ fly launch --dockerfile examples/fly/Dockerfile
Creating app in /Users/chris/Corrosion/corrosion
An existing fly.toml file was found for app corrosion
? Would you like to copy its configuration to the new app? Yes
Using dockerfile examples/fly/Dockerfile
? Choose an app name (leaving blank will default to 'corrosion') zaphod-test-app
? Select Organization: Zaphod Beeblebrox (personal)
Some regions require a paid plan (bom, fra, maa).
See https://fly.io/plans to set up a plan.

? Choose a region for deployment: Toronto, Canada (yyz)
App will use 'yyz' region as primary

Created app 'zaphod-test-app' in organization 'personal'
Admin URL: https://fly.io/apps/zaphod-test-app
Hostname: zaphod-test-app.fly.dev
? Would you like to set up a Postgresql database now? No
? Would you like to set up an Upstash Redis database now? No
Wrote config file fly.toml
? Would you like to deploy now? Yes

If you happen to have responded No to the Would you like to deploy now? line, you can execute the deployment step separately with the fly deploy command.

Fly Launch will build the Docker image, create a storage volume, and deploy your new Corrosion app on a single Fly Machine.

When deployment is complete, check that a Machine has been created and is in the started state with the fly status command:

$ fly status
App
  Name     = zaphod-test-app                                        
  Owner    = personal                                   
  Hostname = zaphod-test-app.fly.dev                                
  Image    = zaphod-test-app:deployment-01HD1QXKKJZX9RD1WP52YCRD9Q  
  Platform = machines                                   

Machines
PROCESS ID              VERSION REGION  STATE   ROLE    CHECKS  LAST UPDATED         
app     9185741db15398  1       yyz     started                 2023-10-18T16:04:03Z

You can also see the latest internal activity with the fly logs command.

Check on the database

To get a shell session on a Fly Machine use fly ssh console:

$ fly ssh console --pty --select

A Corrosion node's local database is located by default at /var/lib/corrosion/state.db. At this point it contains no data, but the todos table has been created according to the schema file /etc/corrosion/schemas/todo.sql.

You can read from this database using sqlite3 from the command line on the Corrosion node.

# sqlite3 /var/lib/corrosion/state.db '.mode column' 'PRAGMA table_info(todos);'
cid  name          type     notnull  dflt_value  pk
---  ------------  -------  -------  ----------  --
0    id            BLOB     1                    1 
1    title         TEXT     1        ''          0 
2    completed_at  INTEGER  0                    0 

Add a second node

Scale up to two Machines. Put the second one in another part of the world if you like:

$ fly scale count 1 --region <second-fly-region>

The fly scale count command provisions a new Machine with an empty volume attached, because the original Machine has a volume. Once the new node joins the cluster, Corrosion populates its local database on this volume with the latest data from the cluster.

Once the second Machine is running, you should be able to see log messages from Corrosion on both instances.

You can use the example database to test out your Corrosion cluster: Work with cluster data on Fly.io.

Appendix: Example files for deployment on Fly.io

Fly Launch configuration

The Fly platform uses a TOML file to configure an app for deployment.

$ cp examples/fly/fly.toml .
# Example fly.toml
app = "corrosion"

[env]
RUST_BACKTRACE="1"
# RUST_LOG="info,foca=debug"

[mounts]
source = "corro_data"
destination = "/var/lib/corrosion"

[metrics]
port = 9090
path = "/"

The app entry is updated with your chosen app name on launch.

The mounts section tells Fly Launch that this app needs a storage volume named "corro_data" and that it should be mounted at /var/lib/corrosion in the Machine's file system. A Fly Volume of this name will be created for the first Machine on the first deployment.

Corrosion exports Prometheus metrics; the metrics section tells the Fly Platform where to look for them. This port setting corresponds to the setting for prometheus.addr under telemetry in the Corrosion configuration.

No public services are configured for the Corrosion cluster, because nodes communicate over private networking.

Dockerfile

The example Dockerfile corrosion/examples/fly/Dockerfile builds Corrosion from a local copy of the source repository in a separate stage and creates a final Debian-based Docker image with the built Corrosion binary included. It copies the example files from the local corrosion/examples/fly/corrosion-files directory and uses them to configure and run Corrosion with an empty example database.

SQLite3 and not-perf are installed for convenience.

# build image
FROM rust:bookworm as builder

RUN apt update && apt install -y build-essential gcc-x86-64-linux-gnu clang llvm

# Install mold
ENV MOLD_VERSION=1.11.0
RUN set -eux; \
    curl --fail --location "https://github.com/rui314/mold/releases/download/v${MOLD_VERSION}/mold-${MOLD_VERSION}-x86_64-linux.tar.gz" --output /tmp/mold.tar.gz; \
    tar --directory "/usr/local" -xzvf "/tmp/mold.tar.gz" --strip-components 1; \
    rm /tmp/mold.tar.gz; \
    mold --version;

RUN set -eux; \
    curl --fail --location "https://github.com/koute/not-perf/releases/download/0.1.1/not-perf-x86_64-unknown-linux-gnu.tgz" --output /tmp/nperf.tar.gz; \
    tar --directory "/usr/local/bin" -xzvf "/tmp/nperf.tar.gz"; \
    rm /tmp/nperf.tar.gz; \
    nperf --version;

WORKDIR /usr/src/app
COPY . .
# Will build and cache the binary and dependent crates in release mode
RUN --mount=type=cache,target=/usr/local/cargo,from=rust:bookworm,source=/usr/local/cargo \
    --mount=type=cache,target=target \
    cargo build --release && mv target/release/corrosion ./

# Runtime image
FROM debian:bookworm-slim

RUN apt update && apt install -y sqlite3 watch && rm -rf /var/lib/apt/lists/*
COPY --from=builder /usr/local/bin/nperf /usr/src/app/corrosion /usr/local/bin/

# Create "corrosion" user
RUN useradd -ms /bin/bash corrosion

COPY examples/fly/entrypoint.sh /entrypoint.sh
COPY examples/fly/corrosion-files/ /etc/corrosion/

ENTRYPOINT ["/entrypoint.sh"]
# Run the app
CMD ["corrosion", "agent"]

Corrosion configuration

The supplied example Corrosion config file, config.toml, omits the gossip.addr and gossip.bootstrap entries. On startup, the entrypoint.sh script fills these in using the FLY_PRIVATE_IP and FLY_APP_NAME environment variables that exist within the runtime environment.

The complete configuration file looks something like this on the running Machine:

# /etc/corrosion/config.toml
[db]
path = "/var/lib/corrosion/state.db"
schema_paths = ["/etc/corrosion/schemas"]
    
[gossip]
addr = "[fdaa:0:and:so:on:and:so:forth]:8787"
bootstrap = ["<your-app-name>.internal:8787"]
# addr and bootstrap for Fly.io deployment example are written 
# on startup by entrypoint script
plaintext = true   # Cryptography and authz are handled by Fly.io private networking
max_mtu = 1372     # For Fly.io private network
disable_gso = true # For Fly.io private network

[api]
addr = "[::]:8080" # Must be available on IPv6 for Fly.io private network

[admin]
path = "/app/admin.sock"

[telemetry]
prometheus.addr = "0.0.0.0:9090"

[log]
colors = false

The network settings in this example config are tailored for communication over your Fly private IPv6 WireGuard network. Cluster members communicate over port 8787, and the Corrosion API is reachable on port 8080. Corrosion exports Prometheus metrics at port 9090, which is hooked up to the Prometheus service on Fly.io via the metrics section in fly.toml.
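As a rough illustration of the substitution described above, the two gossip values can be derived from the Fly.io environment variables like this. This is a Python sketch, not the actual entrypoint.sh; the function name gossip_settings and the sample values are made up for the example, and the port is taken from the example config.

```python
GOSSIP_PORT = 8787  # gossip port used in the example config


def gossip_settings(env):
    """Derive gossip.addr and gossip.bootstrap from Fly.io environment variables.

    `env` is any mapping that contains FLY_PRIVATE_IP and FLY_APP_NAME,
    for example os.environ on a running Machine.
    """
    private_ip = env["FLY_PRIVATE_IP"]
    app_name = env["FLY_APP_NAME"]
    return {
        # IPv6 literals are wrapped in brackets before the port
        "addr": f"[{private_ip}]:{GOSSIP_PORT}",
        # <app-name>.internal is the name used in the example config above
        "bootstrap": [f"{app_name}.internal:{GOSSIP_PORT}"],
    }


# Hypothetical sample values, for illustration only
sample = gossip_settings(
    {"FLY_PRIVATE_IP": "fdaa:0:1:2::3", "FLY_APP_NAME": "zaphod-test-app"}
)
print(sample["addr"])
print(sample["bootstrap"])
```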

Work with cluster data on Fly.io

With your Corrosion cluster deployed, you can work with the example database.

To get started, shell into each Corrosion node, in separate terminals.

fly ssh console --pty --app <your-app-name> --select

If you run the command from the directory containing your Corrosion app's fly.toml configuration file, you can omit the --app flag.

We'll call one Machine "Node A" and the other "Node B". Every node is read-write, so it doesn't matter which is which.

The example schema, todo.sql, specifies a single table called todos, with id, title, and completed_at columns.

-- /etc/corrosion/schemas/todo.sql

CREATE TABLE todos (
    id BLOB NOT NULL PRIMARY KEY,
    title TEXT NOT NULL DEFAULT '',
    completed_at INTEGER
);

Inserting and querying data

Insert some data on Node A

From Node A's terminal session:

# corrosion exec --param 'some-id' --param 'Write some Corrosion docs!' 'INSERT INTO todos (id, title) VALUES (?, ?)'
INFO corrosion: Rows affected: 1

Query data on Node A

Via SQLite directly:

# sqlite3 /var/lib/corrosion/state.db 'SELECT * FROM todos;'
some-id|Write some Corrosion docs!|

Using the API, via the CLI:

# corrosion query 'SELECT * FROM todos;' --columns
id|title|completed_at
some-id|Write some Corrosion docs!|

Query data on Node B

From Node B's terminal:

# corrosion query 'SELECT * FROM todos;' --columns
id|title|completed_at
some-id|Write some Corrosion docs!|

Node A's contribution is already present in Node B's database.

Insert data on Node B

# corrosion exec --param 'some-id-2' --param 'Show how broadcasts work' 'INSERT INTO todos (id, title) VALUES (?, ?)'
INFO corrosion: Rows affected: 1

Check the data in Node A's database

# corrosion query 'SELECT * FROM todos;' --columns
id|title|completed_at
some-id|Write some Corrosion docs!|
some-id-2|Show how broadcasts work|

The second update has propagated back to Node A.

Updating a file using a Corrosion template

The example template todos.rhai makes a checklist out of the rows in our todos table.

/* /etc/corrosion/templates/todos.rhai */

<% for todo in sql("SELECT title, completed_at FROM todos") { %>
[<% if todo.completed_at.is_null() { %> <% } else { %>X<% } %>] <%= todo.title %>
<% } %>

Start corrosion template and watch the output file

On Node A, start processing the template.

# corrosion template "/etc/corrosion/templates/todos.rhai:todos.txt" &
[1] 354
root@4d8964eb9d9487:/#  INFO corrosion::command::tpl: Watching and rendering /etc/corrosion/templates/todos.rhai to todos.txt

Whenever there's an update to the results of the template's query (or queries), corrosion template re-renders the output file.

Start watching the output file.

# watch -n 0.5 cat todos.txt
Every 0.5s: cat todos.txt

[ ] Write some Corrosion docs!
[ ] Show how broadcasts work

Add a todo item

On the other Machine (Node B), insert some data.

# corrosion exec --param 'some-id-3' --param 'Hello from a template!' 'INSERT INTO todos (id, title) VALUES (?, ?)'
INFO corrosion: Rows affected: 1

The new todo item gets propagated back to Node A, and your watch should look like this:

Every 0.5s: cat todos.txt

[ ] Write some Corrosion docs!
[ ] Show how broadcasts work
[ ] Hello from a template!

Mark all items as done

Mark all tasks as completed, on either node:

# corrosion exec 'UPDATE todos SET completed_at = 1234567890'
INFO corrosion: Rows affected: 3
$ watch -n 0.5 cat todos.txt
Every 0.5s: cat todos.txt

[X] Write some Corrosion docs!
[X] Show how broadcasts work
[X] Hello from a template!

Now you have a distributed to-do list app! A front end is left as an exercise for the reader ;)

Run Corrosion commands on a remote node

Corrosion's CLI works through the API. You can install Corrosion locally and run Corrosion commands on a remote node.

Connecting

A convenient way to access a remote node's API port on Fly.io is to open a user-mode WireGuard tunnel using the fly proxy command.

In the example configuration, Corrosion's API is configured on port 8080, so with Corrosion installed on the local machine, in a separate terminal, run:

$ fly proxy 8080 --app <your-app-name>

to proxy your local port 8080 to port 8080 on a node belonging to your Corrosion app on Fly.io.

As with the fly ssh console command, if you run this command from the directory containing your Corrosion app's fly.toml configuration file, you can omit the --app flag.

Running commands

You don't need a local Corrosion configuration file if you're only using Corrosion to interface with a remote node. You do need to pass an API address to CLI commands. Here's an example using the query subcommand.

$ ./corrosion  query --timer --columns "SELECT * FROM todos" --api-addr "127.0.0.1:8080"
id|title|completed_at
some-id|Write some Corrosion docs!|1234567890
some-id-2|Show how broadcasts work|1234567890
some-id-3|Hello from a template!|1234567890
time: 0.000000249s

API

Each running Corrosion agent hosts a RESTful HTTP API for interacting with the cluster's synchronized database. Endpoints accept SQL statements in a JSON body, for versatility.

Each request is run in a transaction (as if the transaction query param had been passed).

Endpoints:

POST /v1/transactions

Write changes to the Corrosion database for propagation through the cluster. The /v1/transactions endpoint accepts a JSON list of SQL statements.

Sample request

curl http://localhost:8080/v1/transactions \
 -H "content-type: application/json" \
 -d "[\"INSERT OR IGNORE INTO sandwiches (pk, sandwich) VALUES (3, 'brie and cranberry')\"]"

Sample response

{"results":[{"rows_affected":1,"time":0.000027208}],"time":0.000300708}
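The same request can be made from a program. Below is a minimal Python sketch using only the standard library; the helper names build_transaction_request and execute are hypothetical, and the address is assumed to be the one from the example configuration.

```python
import json
import urllib.request


def build_transaction_request(api_addr, statements):
    """Build (but do not send) a POST /v1/transactions request.

    `statements` is a list of SQL strings, sent as a JSON list.
    """
    body = json.dumps(statements).encode("utf-8")
    return urllib.request.Request(
        f"http://{api_addr}/v1/transactions",
        data=body,
        headers={"content-type": "application/json"},
        method="POST",
    )


def execute(api_addr, statements, timeout=5.0):
    """Send the statements to a running agent and return the decoded JSON response."""
    request = build_transaction_request(api_addr, statements)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read())
```

With an agent listening on localhost:8080, execute("localhost:8080", ["INSERT OR IGNORE INTO sandwiches (pk, sandwich) VALUES (3, 'brie and cranberry')"]) should return a dictionary shaped like the sample response above.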

POST /v1/queries

Read from the Corrosion database. The /v1/queries endpoint accepts a single SQL statement in JSON format.

Sample request

curl http://localhost:8080/v1/queries \
 -H "content-type: application/json" \
 -d "\"SELECT sandwich FROM sandwiches\""

Sample response

{"columns":["sandwich"]}
{"row":[1,["burger"]]}
{"row":[2,["ham"]]}
{"row":[3,["grilled cheese"]]}
{"row":[4,["brie and cranberry"]]}
{"eoq":{"time":5e-8}}
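The response is one JSON document per line, so a client can process it as it arrives. Here is a minimal Python sketch that folds such a stream into a list of rows. The function name parse_query_stream is hypothetical, and the handling of an error event is an assumption modelled on the other streaming endpoints.

```python
import json


def parse_query_stream(lines):
    """Collect columns, rows, and query time from a /v1/queries NDJSON body.

    `lines` is any iterable of text lines, for example an HTTP response
    body decoded line by line.
    """
    columns, rows, elapsed = [], [], None
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        if "columns" in event:
            columns = event["columns"]
        elif "row" in event:
            _rowid, values = event["row"]
            rows.append(dict(zip(columns, values)))
        elif "eoq" in event:
            elapsed = event["eoq"]["time"]
            break  # end of query, nothing more to read
        elif "error" in event:  # assumed event shape
            raise RuntimeError(event["error"])
    return columns, rows, elapsed


sample = """\
{"columns":["sandwich"]}
{"row":[1,["burger"]]}
{"row":[2,["ham"]]}
{"eoq":{"time":5e-8}}
"""
columns, rows, elapsed = parse_query_stream(sample.splitlines())
print(columns, rows, elapsed)
```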

POST /v1/subscriptions

Start receiving updates for a desired SQL query. The /v1/subscriptions endpoint accepts a single SQL statement in JSON format. The Corrosion agent responds with a Newline Delimited JSON (NDJSON) stream that notifies of any changes to the response to this query.

Request

URL query params

from={change_id} (optional)

If you are re-subscribing, this will start returning events from that point on.

Body

Query statement to subscribe to as a JSON string.

"SELECT sandwich FROM sandwiches"

Accepts SQL params by using an array:

["SELECT sandwich FROM sandwiches WHERE name = ?", ["my-sandwich-name"]]

Example

curl http://localhost:8080/v1/subscriptions \
 -H "content-type: application/json" \
 -d "\"SELECT sandwich FROM sandwiches\""
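For programmatic use, the body and the optional from parameter can be assembled as in the following Python sketch. The helper name build_subscription_request is hypothetical; the two body shapes (a bare JSON string, or an array of statement and params) are the ones shown above.

```python
import json
import urllib.request


def build_subscription_request(api_addr, sql, params=None, from_change_id=None):
    """Build (but do not send) a POST /v1/subscriptions request."""
    # A bare JSON string without params, or [sql, [params...]] with params
    statement = sql if params is None else [sql, list(params)]
    url = f"http://{api_addr}/v1/subscriptions"
    if from_change_id is not None:
        url += f"?from={from_change_id}"
    return urllib.request.Request(
        url,
        data=json.dumps(statement).encode("utf-8"),
        headers={"content-type": "application/json"},
        method="POST",
    )


request = build_subscription_request(
    "localhost:8080",
    "SELECT sandwich FROM sandwiches WHERE name = ?",
    params=["my-sandwich-name"],
)
print(request.full_url)
print(request.data.decode("utf-8"))
```

Sending the request with urllib.request.urlopen and iterating over the response object yields the NDJSON lines one at a time.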

Response

Headers

Returns a Query ID (UUID) that can be referenced later to re-subscribe.

Example:

corro-query-id: ba247cbc-2a7f-486b-873c-8a9620e72182

Body

Response bodies contain a Newline Delimited JSON (NDJSON) stream of events.

Example:

{ "columns": ["sandwich"] }
{ "row":     [1, ["shiitake"]] }
{ "row":     [2, ["ham"]] }
{ "row":     [3, ["grilled cheese"]] }
{ "row":     [4, ["brie and cranberry"]] }
{ "eoq":     { "time": 8e-8, "change_id": 0 } }
{ "change":  ["update", 2, ["smoked meat"], 1] }
{ "change":  ["update", 1, ["smoked meat"], 2] }
{ "change":  ["update", 2, ["ham"], 3] }
{ "change":  ["update", 1, ["burger"], 4] }
{ "change":  ["update", 2, ["smoked meat"], 5] }
// ...

Event type: columns

Names of all columns returned by the query.

{ "columns": ["col_1", "col_2"] }

Event type: row

A tuple as an array of 2 elements containing the query result rowid and all column values as an array.

{ "row": [1, ["cell_1", "cell_2"]] }

Event type: eoq

End Of Query (EOQ) marks the end of the initial query results. Useful for determining when to perform an initial render of a template, for example.

It also includes:

  • Query execution time (not counting iterating all rows, just the actual query)
  • Last change ID recorded for the rows it sent

The latter is useful to resume a subscription stream when you received all rows but never got a change and you don't want to start from 0.

{ "eoq": { "time": 8e-8, "change_id": 0 } }

Event type: change

A wild, new, result for your query appears!

Represented by a tuple, as an array of 4 elements:

  1. Type of change (insert, update, delete)
  2. Row ID for the modified record (unique per query)
  3. All values of the columns, even on deletion
  4. Change ID (unique and contiguously increasing per query)

It has been designed this way to make it easy to change single records in a map of rowid -> record, which lets users build memory-efficient reactive interfaces.

With the Change ID, it is possible to pick a subscription back up from an existing point. This is useful after disconnections or restarts of either Corrosion or a client.

{ "change": ["update", 1, ["cell_1", "cell_2"], 1] }
{ "change": ["insert", 2, ["cell_a", "cell_b"], 2] }
{ "change": ["delete", 2, ["cell_a", "cell_b"], 3] }
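To make the rowid -> record idea concrete, here is a minimal Python sketch that folds subscription events into an in-memory map and remembers the last change ID seen. The names new_state and apply_event are hypothetical, and the shape of an error event is an assumption.

```python
import json


def new_state():
    """Empty client-side view of a subscription."""
    return {"columns": [], "rows": {}, "last_change_id": None}


def apply_event(state, line):
    """Apply one NDJSON subscription event to `state` and return it."""
    event = json.loads(line)
    if "columns" in event:
        state["columns"] = event["columns"]
    elif "row" in event:
        rowid, values = event["row"]
        state["rows"][rowid] = values
    elif "eoq" in event:
        # The eoq event carries the change ID the initial rows correspond to
        state["last_change_id"] = event["eoq"].get("change_id", state["last_change_id"])
    elif "change" in event:
        kind, rowid, values, change_id = event["change"]
        if kind == "delete":
            state["rows"].pop(rowid, None)
        else:  # "insert" and "update" both store the latest values
            state["rows"][rowid] = values
        state["last_change_id"] = change_id
    elif "error" in event:  # assumed event shape
        raise RuntimeError(event["error"])
    return state


state = new_state()
for line in [
    '{ "columns": ["col_1", "col_2"] }',
    '{ "row": [1, ["cell_1", "cell_2"]] }',
    '{ "eoq": { "time": 8e-8, "change_id": 0 } }',
    '{ "change": ["insert", 2, ["cell_a", "cell_b"], 1] }',
    '{ "change": ["delete", 2, ["cell_a", "cell_b"], 2] }',
]:
    apply_event(state, line)
print(state["rows"], state["last_change_id"])
```

Only the rows currently in the result set are kept in memory, and last_change_id is what a client would pass as the from parameter when resuming.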

GET /v1/subscriptions/:id

Subscribe to an already existing query, without prior knowledge of the SQL, knowing the Query ID (UUID).

Request

URL query params

Passing no query parameters will return all previous rows for the query and all future changes.

from={change_id} (optional)

If you are re-subscribing, this will start returning events from that point on.

Examples

curl http://localhost:8080/v1/subscriptions/ba247cbc-2a7f-486b-873c-8a9620e72182
{ "columns": ["sandwich"] }
{ "row":     [1, ["shiitake"]] }
{ "row":     [2, ["ham"]] }
{ "eoq":     { "time": 8e-8, "change_id": 2 } }
curl http://localhost:8080/v1/subscriptions/ba247cbc-2a7f-486b-873c-8a9620e72182?from=1
{ "change": ["insert", 2, ["shiitake"], 2] }
{ "change": ["insert", 3, ["grilled cheese"], 3] }

Response

Exactly the same as POST /v1/subscriptions.

Client implementation guide

If you can digest Rust, the corro-client crate in Corrosion's repository provides a decent implementation.

Handling errors

Any error-type message received should be considered "fatal" for the client. Some errors cannot be recovered from server-side, in which case it won't be possible to re-subscribe to a subscription.

Buffering data

If your client cannot process rows / changes fast enough, it should buffer them to avoid receiving an error. If any client lags too much, Corrosion will send an error and terminate the request. Sometimes that only leaves the clients a few milliseconds to process a row / change. There's only so much buffering Corrosion will do server-side.

Reconnections and retries

Clients are encouraged to provide a seamless experience in the event of network errors. By storing the subscription ID and the last observed change ID, it should be possible to resume subscriptions.

Retrying in a loop with a backoff is encouraged, as long as the client gives up after a while and returns an error actionable by programs or users.
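The following Python sketch shows one way to combine these recommendations: remember the last change ID, resume with from=, back off exponentially, and give up after a bounded number of attempts. Everything here is illustrative. The connect callable is a hypothetical transport that yields decoded events for a URL, the retry limits are arbitrary, and the error event shape is an assumption.

```python
import time


def resubscribe_url(api_addr, query_id, last_change_id=None):
    """URL for GET /v1/subscriptions/:id, optionally resuming from a change ID."""
    url = f"http://{api_addr}/v1/subscriptions/{query_id}"
    if last_change_id is not None:
        url += f"?from={last_change_id}"
    return url


def follow(connect, api_addr, query_id, handle,
           max_attempts=5, base_delay=0.5, sleep=time.sleep):
    """Consume a subscription, resuming after network errors.

    `connect(url)` must return an iterable of decoded events (dicts) and
    raise OSError on network failure. `handle(event)` receives each event.
    """
    last_change_id = None
    attempt = 0
    while True:
        try:
            for event in connect(resubscribe_url(api_addr, query_id, last_change_id)):
                attempt = 0  # the stream is making progress again
                if "error" in event:
                    # Error events are fatal: do not retry
                    raise RuntimeError(event["error"])
                if "change" in event:
                    last_change_id = event["change"][3]
                elif "eoq" in event:
                    last_change_id = event["eoq"].get("change_id", last_change_id)
                handle(event)
            return last_change_id  # stream ended without an error
        except OSError as exc:
            attempt += 1
            if attempt >= max_attempts:
                raise ConnectionError(
                    f"subscription {query_id}: gave up after {attempt} attempts"
                ) from exc
            sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff
```

Because a resumed stream may replay an event the client has already seen, it is safest to keep handle idempotent, for example by writing into a rowid -> record map.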

Usage guide

Reactivity

Mapping data by row ID (often referred to as rowid) is ideal. When receiving changes, they refer to the affected rowid so a consumer can proceed with modifying data with minimal memory usage.

In many cases, it is not necessary to store each row's cells. A reference to their position in a document, or a cheap-to-clone type, is often enough.

Caveats

Row ordering is not preserved

Root-level ORDER BY won't be honored for changes, meaning new rows will be out of order relative to previously returned rows. Ordering is only kept for a full set of changes (equivalent to creating a transaction).

"Inner" ordering should work just fine, as each query result is re-computed when there are changes. That means if you have a subquery in your query, its ordering will be honored.

POST /v1/updates/:table

Stream primary key update notifications for a single table. Unlike /v1/subscriptions, this endpoint does not evaluate a SQL query — it only tells you which primary keys changed in the requested table. This makes it cheap to run and a good fit when you only need to invalidate caches or trigger downstream work, and you'll fetch fresh rows yourself.

The agent responds with a Newline Delimited JSON (NDJSON) stream that stays open for as long as the connection is alive.

Request

URL parameters

table (required, path)

Name of an existing CR-SQLite-enabled table to watch.

The table must already be defined in the cluster's schema. Requests for unknown tables, or tables without primary keys, return an error.

Body

The endpoint takes no body. Send an empty POST.

Example

curl -X POST http://localhost:8080/v1/updates/sandwiches

Response

Body

The response body is an NDJSON stream of events. There is no initial snapshot — events only describe changes that happen after the request is received.

Example:

{ "notify": ["update", ["sandwich-1"]] }
{ "notify": ["update", ["sandwich-2"]] }
{ "notify": ["delete", ["sandwich-1"]] }

Event type: notify

A change occurred for a row in the table. The value is a tuple of two elements:

  1. Type of change: "update" or "delete".
  2. The primary key of the affected row, as an array of column values (in the order the columns appear in the table's primary key).

{ "notify": ["update", ["pk_col_1_value", "pk_col_2_value"]] }

New inserts are reported as "update". The endpoint deliberately surfaces only "the row at this primary key changed" or "the row at this primary key was deleted" — figuring out whether it is a new row is left to the client (typically by checking whether you already had a copy).

Event type: error

A fatal error occurred. The connection will be closed shortly after.

{ "error": "some error message" }
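A typical consumer uses these events to invalidate a local cache and re-fetch rows on demand. A minimal Python sketch, assuming a cache keyed by primary-key tuple (the function name apply_notification and the cache layout are hypothetical):

```python
import json


def apply_notification(cache, line):
    """Invalidate the cached row named by one /v1/updates event.

    `cache` maps primary-key tuples to rows. Returns (kind, key) so the
    caller can decide whether to re-fetch the row.
    """
    event = json.loads(line)
    if "error" in event:
        # Fatal: the server closes the connection shortly after
        raise RuntimeError(event["error"])
    kind, primary_key = event["notify"]
    key = tuple(primary_key)
    # Both "update" and "delete" make any cached copy stale
    cache.pop(key, None)
    return kind, key


cache = {("sandwich-1",): {"sandwich": "ham"}, ("sandwich-2",): {"sandwich": "burger"}}
print(apply_notification(cache, '{ "notify": ["update", ["sandwich-1"]] }'))
print(sorted(cache))
```

Since new inserts also arrive as "update", a key that was not in the cache before the event is the usual hint that the row is new.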

Behavior and caveats

No SQL, no filtering

/v1/updates/{table} only filters by table and doesn't accept SQL queries. If you need to receive updates for a particular query, use /v1/subscriptions instead.

Client usage

The corro-client Rust crate exposes this endpoint via CorrosionClient::updates. The same guidance from the subscriptions client guide applies: treat error events as fatal, buffer events client-side if you can't keep up, and retry with backoff on disconnect.

PostgreSQL Wire Protocol v3 API (experimental)

It's possible to configure a PostgreSQL wire protocol compatible API listener via the api.pg.addr setting.

This is currently experimental, but it does work for most queries that are SQLite-flavored SQL.

What works

  • Read and write queries, parsable as SQLite-flavored SQL
  • Most parameter bindings, but not all (work in progress)

Does not work

  • Any PostgreSQL-only SQL syntax
  • Some placement of variable parameters (when binding)

Command-line Interface

Corrosion has a CLI for managing the local Corrosion agent and database. It also provides commands to read from and write to the cluster's database.

The base command is corrosion. Run corrosion --help for a list of subcommands.

See the pages for each subcommand:

The corrosion agent command

Starts Corrosion on the host. The --api-addr, --db-path, and --admin-path options override corresponding settings in the configuration file.

$ corrosion agent --help
Launches the agent

Usage: corrosion agent [OPTIONS]

Options:
  -c, --config <CONFIG_PATH>     Set the config file path [default: /etc/corrosion/config.toml]
      --api-addr <API_ADDR>      
      --db-path <DB_PATH>        
      --admin-path <ADMIN_PATH>  
  -h, --help                     Print help

The corrosion backup command

Creates a backup of the current database by running VACUUM INTO and cleaning up node-specific data. This includes removing crsql_site_id as well as rewriting __crsql_clock tables to make the backup generic, ready for a corrosion restore.

$ corrosion backup --help
Backup the Corrosion DB

Usage: corrosion backup [OPTIONS] <PATH>

Arguments:
  <PATH>

Options:
  -c, --config <CONFIG_PATH>     Set the config file path [default: corrosion.toml]
      --api-addr <API_ADDR>
      --db-path <DB_PATH>
      --admin-path <ADMIN_PATH>
  -h, --help                     Print help

The corrosion cluster command

Inspect and manage cluster membership (SWIM / Foca) for the local Corrosion agent.

Subcommands

corrosion cluster rejoin

Forces the node to leave and rejoin the gossip cluster with a new identity (renewed Foca identity).

corrosion cluster rejoin

corrosion cluster members

Output current members (id, state, and rtt information).

corrosion cluster members

corrosion cluster membership-states

Output SWIM cluster membership states

corrosion cluster membership-states

corrosion cluster set-id <CLUSTER_ID>

Sets a new cluster_id for this node. The command persists the new cluster_id in the database (__corro_state table with key cluster_id). Remember that Corrosion will reject any changes from a node with a different cluster_id.

corrosion cluster set-id 42

The corrosion consul command

Utilities for interacting with Consul.

Subcommands

corrosion consul sync

Starts a long-lived process that synchronizes local Consul data to Corrosion. It connects to Consul using the [consul] block in your Corrosion config file (config.toml), and inserts services and checks into the consul_services and consul_checks tables in Corrosion. The process expects these tables to already exist in the database with the right schema.

You should add the following SQL to your schema:

CREATE TABLE consul_services (
    node TEXT NOT NULL,
    id TEXT NOT NULL,
    name TEXT NOT NULL DEFAULT '',
    tags TEXT NOT NULL DEFAULT '[]',
    meta TEXT NOT NULL DEFAULT '{}',
    port INTEGER NOT NULL DEFAULT 0,
    address TEXT NOT NULL DEFAULT '',
    updated_at INTEGER NOT NULL DEFAULT 0,
    source TEXT, -- corro-consul ignores any row with non-NULL source
    app_id INTEGER AS (CAST(JSON_EXTRACT(meta, '$.app_id') AS INTEGER)),
    network_id INTEGER AS (
        CAST(JSON_EXTRACT(meta, '$.network_id') AS INTEGER)
    ),
    app_name TEXT AS (JSON_EXTRACT(meta, '$.app_name')),
    PRIMARY KEY (node, id)
) WITHOUT ROWID;

CREATE TABLE consul_checks (
    node TEXT NOT NULL,
    id TEXT NOT NULL,
    service_id TEXT NOT NULL DEFAULT '',
    service_name TEXT NOT NULL DEFAULT '',
    name TEXT NOT NULL DEFAULT '',
    status TEXT NOT NULL DEFAULT '',
    output TEXT NOT NULL DEFAULT '',
    updated_at INTEGER NOT NULL DEFAULT 0,
    source TEXT, -- corro-consul ignores any row with non-NULL source
    PRIMARY KEY (node, id)
) WITHOUT ROWID;
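The app_id, network_id, and app_name columns in consul_services are generated columns computed from the JSON stored in meta. The Python sketch below demonstrates that behaviour with plain SQLite, outside Corrosion, on a trimmed-down copy of the table. It assumes the bundled SQLite is version 3.31 or newer with the JSON functions available; the function name derived_columns and the sample values are made up.

```python
import json
import sqlite3

# Trimmed-down copy of consul_services, keeping two of the generated columns
SCHEMA = """
CREATE TABLE consul_services (
    node TEXT NOT NULL,
    id TEXT NOT NULL,
    meta TEXT NOT NULL DEFAULT '{}',
    app_id INTEGER AS (CAST(JSON_EXTRACT(meta, '$.app_id') AS INTEGER)),
    app_name TEXT AS (JSON_EXTRACT(meta, '$.app_name')),
    PRIMARY KEY (node, id)
) WITHOUT ROWID;
"""


def derived_columns(meta):
    """Insert one service with the given meta dict and return (app_id, app_name)."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.executescript(SCHEMA)
        # Generated columns are never written directly, only `meta` is
        conn.execute(
            "INSERT INTO consul_services (node, id, meta) VALUES (?, ?, ?)",
            ("node-1", "svc-1", json.dumps(meta)),
        )
        return conn.execute(
            "SELECT app_id, app_name FROM consul_services"
        ).fetchone()
    finally:
        conn.close()


print(derived_columns({"app_id": "42", "app_name": "web"}))
print(derived_columns({}))
```

When a key is missing from meta, the corresponding generated column is NULL.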

Command output

Synchronizes the local consul agent with Corrosion

Usage: corrosion consul sync [OPTIONS]

Options:
  -c, --config <CONFIG_PATH>     Set the config file path [default: /etc/corrosion/config.toml]
      --api-addr <API_ADDR>      
      --db-path <DB_PATH>        
      --admin-path <ADMIN_PATH>  
  -h, --help         

The corrosion exec command

Writes to Corrosion's database, via the /v1/transactions/ endpoint hosted by the local Corrosion agent.

Use corrosion exec to mutate data within existing CR-SQLite-enabled Corrosion database tables, for propagation throughout the cluster.

Corrosion does not sync schema changes made using this command. Use Corrosion's schema files to create and update the cluster's database schema.

$ corrosion exec --help
Execute a SQL statement that mutates the state of Corrosion

Usage: corrosion exec [OPTIONS] <QUERY>

Arguments:
  <QUERY>  

Options:
  -c, --config <CONFIG_PATH>     Set the config file path [default: /etc/corrosion/config.toml]
      --timer                    
      --api-addr <API_ADDR>      
      --db-path <DB_PATH>        
      --admin-path <ADMIN_PATH>  
  -h, --help                     Print help

The corrosion query command

Reads from Corrosion's database, via the /v1/queries/ endpoint hosted by the local Corrosion agent.

Use the --columns option to see column headings in the output.

$ corrosion query --help
Query data from Corrosion w/ a SQL statement

Usage: corrosion query [OPTIONS] <QUERY>

Arguments:
  <QUERY>  

Options:
  -c, --config <CONFIG_PATH>     Set the config file path [default: /etc/corrosion/config.toml]
      --columns                  
      --api-addr <API_ADDR>      
      --timer                    
      --db-path <DB_PATH>        
      --param <PARAM>            
      --admin-path <ADMIN_PATH>  
  -h, --help                     Print help

The corrosion reload command

Reloads Corrosion configuration from a file.

$ corrosion reload --help                             
Reload the config

Usage: corrosion reload [OPTIONS]

Options:
  -c, --config <CONFIG_PATH>     Set the config file path [default: /etc/corrosion/config.toml]
      --api-addr <API_ADDR>      
      --db-path <DB_PATH>        
      --admin-path <ADMIN_PATH>  
  -h, --help   

The corrosion restore command

Restores a database from a backup produced by corrosion backup. This is an "online restore": it acquires all the appropriate locks on the SQLite database so as not to disrupt database readers, then replaces the database in place and releases the locks.

$ corrosion restore --help
Restore the Corrosion DB from a backup

Usage: corrosion restore [OPTIONS] <PATH>

Arguments:
  <PATH>

Options:
  -c, --config <CONFIG_PATH>     Set the config file path [default: corrosion.toml]
      --api-addr <API_ADDR>
      --db-path <DB_PATH>
      --admin-path <ADMIN_PATH>
  -h, --help                     Print help

The corrosion subs command

Command to view subscriptions.

Subcommands

corrosion subs list

Lists all subscriptions registered on this node.

corrosion subs list 

corrosion subs info

View detailed information on the subscription specified by --hash <SUBSCRIPTION_HASH> or --id <UUID>. If --hash is present, it wins over --id. You must pass at least one of them; otherwise the command errors.

corrosion subs info --hash abc123... 
corrosion subs info --id 550e8400-e29b-41d4-a716-446655440000 

The corrosion sync command

Debugging commands used to inspect or fix Corrosion's internal bookkeeping state.

Subcommands

corrosion sync generate

Outputs the in-memory bookkeeping state as JSON. This is mostly useful when debugging replication, to inspect what the node sends during sync with other nodes.

corrosion sync generate

corrosion sync reconcile-gaps

This command collapses overlapping gaps (missing versions) in the database and reconciles the in-memory bookie with the data in the database.

corrosion sync reconcile-gaps

corrosion sync check-bookie-consistency

Compares in-memory bookie state with database bookie state for all actors and prints a JSON report to stdout. If the command finds mismatches (ok: false), the command fails with an error summarizing counts of value mismatches and keys only in memory vs only in DB.

corrosion sync check-bookie-consistency

The corrosion template command

Uses Corrosion's template engine to generate and update a local file based on a Rhai script and the latest data from Corrosion.

Specify the name of the .rhai file and the desired name for the output file.

$ corrosion template --help
Usage: corrosion template [OPTIONS] [TEMPLATE]...

Arguments:
  [TEMPLATE]...  

Options:
  -c, --config <CONFIG_PATH>     Set the config file path [default: /etc/corrosion/config.toml]
  -o, --once                     
      --api-addr <API_ADDR>      
      --db-path <DB_PATH>        
      --admin-path <ADMIN_PATH>  
  -h, --help                     Print help

The corrosion tls command

In non-development environments, you'll want to configure [gossip.tls] to secure the transport of information within the cluster.

$ corrosion tls --help
Tls-related commands

Usage: corrosion tls [OPTIONS] <COMMAND>

Commands:
  ca      TLS certificate authority commands
  server  TLS server certificate commands
  client  TLS client certificate commands (for mutual TLS)
  help    Print this message or the help of the given subcommand(s)

corrosion tls ca generate

A CA (Certificate Authority) is necessary to sign server certificates. It's expected for a Corrosion cluster to have a single CA key pair for signing all the nodes' server certificates.

Store the key somewhere secure!

$ corrosion tls ca generate --help
Generate a TLS certificate authority

Usage: corrosion tls ca generate [OPTIONS]

By default, certificates will be output as ca_cert.pem and ca_key.pem in the current directory.

corrosion tls server generate

Generates a server certificate key pair for encrypting peer-to-peer packets. To be used in the gossip.tls configuration block.

The command accepts an <IP> positional argument: the IP address your cluster's nodes will use to connect to the server you're generating the certificates for.

You'll need to have previously generated a CA key pair as it's required to pass --ca-key and --ca-cert flags w/ paths to each PEM file respectively.

$ corrosion tls server generate --help
Generate a TLS server certificate from a CA

Usage: corrosion tls server generate [OPTIONS] --ca-key <CA_KEY> --ca-cert <CA_CERT> <IP>

Arguments:
  <IP>

corrosion tls client generate

Generates a client certificate key pair for authorizing peer-to-peer clients.

You'll need to have previously generated a CA key pair as it's required to pass --ca-key and --ca-cert flags w/ paths to each PEM file respectively.

$ corrosion tls client generate --help
Generate a TLS certificate from a CA

Usage: corrosion tls client generate [OPTIONS] --ca-key <CA_KEY> --ca-cert <CA_CERT>

Configuration

Corrosion configuration lives in a TOML file. The default location is /etc/corrosion/config.toml.

Configuration sections:

  • [db]
  • [gossip]
  • [api]
  • [admin]
  • [telemetry]
  • [consul]
  • [reaper]
  • [perf]

The [db] configuration

The [db] block configures Corrosion's SQLite database.

Required fields

db.path

Path of the sqlite3 database.

[db]
path = "/var/lib/corrosion/db.sqlite"

Optional fields

db.schema_paths

Array of paths where schema .sql files are present. If a directory is specified, all .sql files within it are loaded.

[db]
schema_paths = ["/etc/corrosion/schema", "/path/to/table_name.sql"]

db.subscriptions_path

Directory where the per-subscription SQLite databases used by /v1/subscriptions live. Each active subscription gets its own database file under this directory.

If unset, defaults to a subscriptions/ directory next to db.path (so /var/lib/corrosion/db.sqlite produces /var/lib/corrosion/subscriptions/).

[db]
subscriptions_path = "/var/lib/corrosion/subscriptions"

db.cache_size_kib

SQLite page cache size for write connections, expressed using SQLite's PRAGMA cache_size convention: a negative value is a number of kibibytes, a positive value is a number of pages.

Defaults to -1048576 (1 GiB).

Larger values can improve write performance noticeably under heavy ingest, at the cost of resident memory. Setting this too low (under ~100 MiB) can severely degrade performance, so prefer leaving it at the default unless you have a measured reason to change it.

[db]
cache_size_kib = -1048576  # 1 GiB
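
To get a feel for what a given value costs in memory, the sign convention above can be turned into a byte count. This is a rough sketch, not Corrosion code: cache_size_bytes is a hypothetical helper, and the 4096-byte page size is an assumption (it is SQLite's default, but a database can be created with a different one).

```python
def cache_size_bytes(cache_size: int, page_size: int = 4096) -> int:
    """Approximate page-cache memory implied by a PRAGMA cache_size-style value.

    page_size is an assumption: 4096 bytes is SQLite's default, but a given
    database may use a different page size.
    """
    if cache_size < 0:
        # Negative values are a size in kibibytes.
        return -cache_size * 1024
    # Positive values are a number of pages.
    return cache_size * page_size


print(cache_size_bytes(-1048576))  # the default value
print(cache_size_bytes(-102400))   # roughly the 100 MiB floor mentioned above
```

The positive form depends on the page size, which is one reason the negative (KiB) form is easier to reason about in a config file.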

The [gossip] configuration

The [gossip] block configures the peer-to-peer API. Corrosion uses QUIC (UDP) to exchange information between nodes of a cluster.

Required fields

gossip.addr

Socket address reachable from other nodes in the cluster. Listens on UDP for QUIC packets.

Optional fields

gossip.bootstrap

List of node addresses from the cluster for the initial join. Defaults to an empty array.

It's recommended to give each node a partial list of nodes that overlaps with the lists used by other nodes. The cluster discovers nodes it doesn't know about automatically via SWIM.

Simple example:

bootstrap = ["127.0.0.1:3333", "127.0.0.1:3334"]

It can resolve names (using the system's DNS resolver):

bootstrap = ["my-fly-app.internal:3333"]

It can resolve names w/ a custom DNS server:

bootstrap = ["my-fly-app.internal:3333@[fdaa::3]:53"]
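
The three forms above differ only in whether a custom resolver follows an @ separator. As an illustration of how such an entry breaks down (a sketch only; split_bootstrap_entry and BootstrapEntry are hypothetical names, and Corrosion's own parsing may be stricter):

```python
from typing import NamedTuple, Optional


class BootstrapEntry(NamedTuple):
    target: str                # host:port (or ip:port) of a cluster node
    dns_server: Optional[str]  # custom resolver address, if one was given


def split_bootstrap_entry(entry: str) -> BootstrapEntry:
    """Split 'target@dns_server' into its two parts (hypothetical helper)."""
    target, sep, dns_server = entry.partition("@")
    return BootstrapEntry(target, dns_server if sep else None)


for entry in ["127.0.0.1:3333", "my-fly-app.internal:3333@[fdaa::3]:53"]:
    print(split_bootstrap_entry(entry))
```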

gossip.plaintext

Allows using QUIC without encryption. The only reason to set this to true is if you're running a toy cluster or if the underlying transport is already handling cryptography (such as WireGuard) AND authorization is bound by the network (as is the case for a Fly.io app's private network).

Warning

It's highly recommended to use the gossip.tls configuration block to set up encryption and gossip.tls.client to set up authorization.

gossip.idle_timeout_secs

The max idle timeout, in seconds, for QUIC connections.

Defaults to 30 seconds.

gossip.member_id

Specifies a member_id that identifies nodes belonging to the same Corrosion cluster. Nodes with different member_id values are unable to share changes with each other.

gossip.max_mtu

Define the max MTU for QUIC. Instead of attempting to discover the best MTU value automatically, you can define this upper bound.

This should be your "effective" MTU: network interface's MTU - IP header size - UDP header size. For example, if the MTU on your network interface is 1500 and you're listening on IPv6, you'll need to subtract 40 bytes for the IP header and 8 bytes for the UDP header (you'd set max_mtu = 1452).
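
The subtraction is easy to get wrong when switching between IPv4 and IPv6, so here is the same arithmetic as a small sketch. effective_max_mtu is a hypothetical helper, not part of Corrosion; the 20-byte IPv4 figure assumes a header without options, and the 1200 lower bound is the one noted in the example config.

```python
IPV4_HEADER_BYTES = 20  # minimum IPv4 header, assuming no options
IPV6_HEADER_BYTES = 40  # fixed IPv6 header
UDP_HEADER_BYTES = 8
MIN_MAX_MTU = 1200      # lower bound noted in the example config


def effective_max_mtu(interface_mtu: int, ipv6: bool = True) -> int:
    """Return a max_mtu candidate: interface MTU minus IP and UDP headers."""
    ip_header = IPV6_HEADER_BYTES if ipv6 else IPV4_HEADER_BYTES
    value = interface_mtu - ip_header - UDP_HEADER_BYTES
    if value < MIN_MAX_MTU:
        raise ValueError(f"effective MTU {value} is below {MIN_MAX_MTU}")
    return value


print(effective_max_mtu(1500, ipv6=True))   # 1452, matching the example above
print(effective_max_mtu(1500, ipv6=False))  # 1472
```

If the path between nodes goes through a tunnel, the tunnel's own overhead has to be subtracted from the interface MTU first.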

gossip.disable_gso

Certain environments don't support GSO (Generic Segmentation Offload). The QUIC implementation detects this, but you can pre-emptively disable GSO to avoid retrying the initial packets without it once it is detected as unavailable.

gossip.tls

Strong encryption is highly recommended for any non-development usage of Corrosion.

You can easily generate the necessary certificates using corrosion tls.

Using gossip.tls.insecure = true means the certificate's signing authority won't be checked.

[gossip.tls] # optional
cert_file = "/path/to/server_cert.pem"
key_file = "/path/to/server_key.pem"
ca_file = "/path/to/ca_cert.pem" # optional
insecure = false # optional

It's also possible to specify client certificate authorization (mutual TLS or mTLS):

[gossip.tls.client] # optional
cert_file = "/path/to/client_cert.pem"
key_file = "/path/to/client_key.pem"

Example config (w/ default values)

[gossip]
addr = "" # required, no default value

bootstrap = []

plaintext = false           # optional
idle_timeout_secs = 30      # optional
disable_gso = false         # optional

# max_mtu = 1452            # optional; unset = autodetect, must be >= 1200
# external_addr = ""        # optional, defaults to gossip.addr
# client_addr   = "[::]:0"  # optional

member_id = 1 # optional

[gossip.tls] # optional
cert_file = "/path/to/server_cert.pem"
key_file = "/path/to/server_key.pem"
ca_file = "/path/to/ca_cert.pem" # optional
insecure = false # optional

[gossip.tls.client] # optional
cert_file = "/path/to/client_cert.pem"
key_file = "/path/to/client_key.pem"

The [api] block

The [api] block configures the local Corrosion HTTP API and, optionally, a PostgreSQL wire-protocol listener.

Required fields

api.addr

Address for the Corrosion HTTP API to listen on. Accepts either a single socket address or an array of addresses if you want to listen on multiple interfaces.

addr is an alias for bind_addr; either name works in the config file.

[api]
addr = "0.0.0.0:9000"

Optional fields

api.endpoint_name

A label identifying this node within the cluster, used mostly to ensure requests aren't processed by the wrong node. An incoming request whose x-corrosion-requested-endpoint-name header carries a different label is rejected.

[api]
endpoint_name = "corrosion-iad-1"

api.authz.bearer-token

Bearer token used to authenticate HTTP requests. Clients must set this token in the Authorization header (Authorization: Bearer <token>).

[api]
authz.bearer-token = "<token>"
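
On the client side, the token (and, if you use it, the endpoint name) travels as plain HTTP headers. The sketch below only builds the request so the headers can be inspected. build_query_request is a hypothetical helper, and the /v1/queries path and JSON-string body are assumptions about the HTTP API rather than something defined in this section.

```python
import json
import urllib.request
from typing import Optional


def build_query_request(
    base_url: str,
    token: str,
    sql: str,
    endpoint_name: Optional[str] = None,
) -> urllib.request.Request:
    """Build (but do not send) an authenticated request to the HTTP API."""
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }
    if endpoint_name is not None:
        # Must match api.endpoint_name on the receiving node, if that is set.
        headers["x-corrosion-requested-endpoint-name"] = endpoint_name
    return urllib.request.Request(
        url=f"{base_url}/v1/queries",          # assumed path
        data=json.dumps(sql).encode("utf-8"),  # assumed body shape
        headers=headers,
        method="POST",
    )


req = build_query_request(
    "http://127.0.0.1:8080", "<token>", "SELECT 1", endpoint_name="corrosion-iad-1"
)
print(req.full_url)
print(req.header_items())
```

Passing the request to urllib.request.urlopen would actually send it; that step is left out here so the example runs without a live agent.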

PostgreSQL wire protocol

Corrosion can additionally expose its database over the PostgreSQL wire protocol for ad-hoc SQL access. The pg field accepts either a single listener config or an array of listener configs.

api.pg.addr

Address to listen on for PostgreSQL connections.

[api]
pg.addr = "127.0.0.1:5470"

Multiple listeners (e.g. one read-write, one read-only):

[[api.pg]]
addr = "127.0.0.1:5470"

[[api.pg]]
addr = "127.0.0.1:5471"
readonly = true

api.pg.readonly

When true, the listener rejects statements that would mutate the database. Defaults to false.

[api.pg]
addr     = "127.0.0.1:5471"
readonly = true

api.pg.tls

Enable TLS for incoming PostgreSQL connections.

[api.pg]
addr = "0.0.0.0:5470"

[api.pg.tls]
cert_file     = "/path/to/server_cert.pem"
key_file      = "/path/to/server_key.pem"
ca_file       = "/path/to/ca_cert.pem"   # optional
verify_client = false                    # optional, set true to require client certs

When verify_client = true, only clients presenting a certificate signed by ca_file will be accepted (mutual TLS).

The [admin] block

The [admin] block configures the Unix domain socket used by the corrosion CLI to send administrative commands to a running agent.

Optional fields

admin.path

Path of the Unix socket used to send commands for admin operations. Defaults to /var/run/corrosion/admin.sock.

[admin]
path = "/admin.sock"

The [telemetry] configuration

The [telemetry] block is optional. It configures OpenTelemetry and Prometheus.

Optional fields

telemetry.prometheus.addr

Address the Prometheus exporter binds to. GET requests to this address return Prometheus metrics.

[telemetry]
prometheus.addr = "0.0.0.0:9090"

You can read more about the Prometheus metrics that corrosion exposes here.

telemetry.open-telemetry

This block configures the OpenTelemetry exporter.

To configure the OpenTelemetry exporter through environment variables, set open-telemetry to "from_env". The environment variables and their defaults are listed here.

[telemetry]
open-telemetry = "from_env"

telemetry.open-telemetry.exporter

Address that the OpenTelemetry exporter should send traces to.

[telemetry]
open-telemetry.exporter = { endpoint = "10.0.0.0:9999" }

The [consul] block

For the corrosion consul sync helper that uses this configuration, see Consul CLI.

consul.client.addr

Local address of the Consul server.

consul.client.tls

TLS configuration to use when communicating with Consul.

[consul.client.tls]
ca_file = ""
cert_file = ""
key_file = ""

The [reaper] configuration

Corrosion keeps data around in its CR-SQLite bookkeeping tables even when associated rows have been deleted. For tables with many deleted keys that are never re-inserted, this can take up unnecessary space.

The reaper config lets you specify tables for which Corrosion will periodically delete cr-sqlite bookkeeping rows (the <table>__crsql_clock and <table>__crsql_pks tables) for primary keys that were deleted long enough ago that you no longer expect them to come back.

When unset, no reaping is performed and bookkeeping rows for deleted primary keys are kept forever.

Warning

Reaping a table is an irreversible cluster-wide operation. If a deleted primary key reappears (e.g. a row is recreated using the same id) after its retention has elapsed, you may end up with inconsistent state across the cluster if some nodes have not yet reaped that primary key. Only configure the reaper for tables whose primary keys are guaranteed not to be reused, and prefer generous retention windows.

Optional fields

reaper.check_interval

How often (in seconds) the reaper wakes up and scans configured tables. Defaults to 3600 (one hour).

[reaper]
check_interval = 3600

reaper.tables

reaper.tables.<table>.retention (required)

How long after a primary key has been deleted Corrosion should wait before garbage-collecting its cr-sqlite bookkeeping rows. Expressed as a number followed by a unit:

| Unit | Meaning |
| --- | --- |
| s | seconds |
| m | minutes |
| h | hours |
| d | days |
| w | weeks |
| y | years (365 days) |

Examples: "30s", "15m", "24h", "14d", "4w", "1y".
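
To sanity-check a retention value before deploying it, the unit table can be applied by hand or with a few lines of code. This is a minimal sketch that accepts exactly one integer followed by one unit; retention_seconds is a hypothetical helper, and whether Corrosion itself accepts other spellings (whitespace, combined units) is not covered here.

```python
import re

# Seconds per unit, following the table above (a year is 365 days).
_UNIT_SECONDS = {
    "s": 1,
    "m": 60,
    "h": 60 * 60,
    "d": 24 * 60 * 60,
    "w": 7 * 24 * 60 * 60,
    "y": 365 * 24 * 60 * 60,
}
_RETENTION_RE = re.compile(r"(\d+)([smhdwy])")


def retention_seconds(value: str) -> int:
    """Convert a retention string such as '7d' into a number of seconds."""
    match = _RETENTION_RE.fullmatch(value)
    if match is None:
        raise ValueError(f"unsupported retention value: {value!r}")
    amount, unit = match.groups()
    return int(amount) * _UNIT_SECONDS[unit]


for example in ["30s", "15m", "24h", "14d", "4w", "1y"]:
    print(example, "=", retention_seconds(example), "seconds")
```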

reaper.tables.<table>.match_filter (optional)

SQL WHERE-clause fragment, without the leading WHERE, restricting which deleted primary keys this rule applies to. The fragment is referenced as AND (<match_filter>) in the generated query, so it must be valid in that position and is constrained to columns of the bookkeeping table.

Use this to scope reaping to keys you've explicitly marked as throwaway:

[reaper.tables.sessions]
retention    = "7d"
match_filter = "id LIKE 'tmp-%'"

Example config

[reaper]
check_interval = 3600  # optional, seconds, defaults to 3600

# Reap primary keys of `sessions` 7 days after deletion, but only those whose id was prefixed with `tmp-`.
[reaper.tables.sessions]
retention    = "7d"
match_filter = "id LIKE 'tmp-%'"

# Reap primary keys of `audit_log` 90 days after deletion.
[reaper.tables.audit_log]
retention = "90d"

The [perf] configuration

The [perf] block exposes internal tuning knobs for the agent: in-process channel sizes, the WAL truncation threshold, sync backoff bounds, and the change-apply batching strategy.

This is advanced configuration. Most operators don't need to touch any of it — the defaults are tuned for typical Corrosion clusters. Only override these values if you have a specific symptom (e.g. dropped changes under heavy load, runaway WAL growth, slow sync convergence) and a clear hypothesis about which knob will help.

The whole block is optional; omitting it (or any field within it) keeps the default values shown below.

Channel buffer sizes

Corrosion uses bounded in-process channels between its background tasks. These knobs set those channel capacities. Larger channels can absorb bigger bursts before back-pressure kicks in, at the cost of memory; smaller channels are stricter about back-pressure.

If a channel fills up, the affected sender will either block (slowing the producer) or, in a few hot paths, drop the message and log an error. The Prometheus gauge corro_runtime_channel_capacity{channel_name="<name>"} reports the remaining headroom for each channel.

| Field | Default | Channel name (in metrics) | What it carries |
| --- | --- | --- | --- |
| apply_channel_len | 2048 | apply | Fully-buffered versions that should be processed. |
| changes_channel_len | 1024 | changes | Inbound changes received from peers (broadcasts and syncs) waiting to be processed. |
| bcast_channel_len | 512 | bcast | Local writes waiting to be broadcast out to peers. |
| clearbuf_channel_len | 512 | clear_buf | Fully-processed buffered version ranges that should be cleaned up. |
| to_send_channel_len | 512 | to_send | Outgoing broadcast packets queued for the transport layer. |
| notifications_channel_len | 512 | notifications | SWIM membership notifications (member up / down). |
| schedule_channel_len | 512 | to_schedule | Foca timer schedule events. |
| foca_channel_len | 256 | foca | Inbound Foca/SWIM protocol events. |

[perf]
apply_channel_len         = 2048
changes_channel_len       = 1024
bcast_channel_len         = 512
clearbuf_channel_len      = 512
to_send_channel_len       = 512
notifications_channel_len = 512
schedule_channel_len      = 512
foca_channel_len          = 256

Apply queue (change batching)

Inbound changes are not written to the local database one at a time — they are accumulated and applied in batches inside a single SQLite transaction. These knobs control that batching.

Roughly, the agent waits until either apply_queue_timeout has elapsed or at least apply_queue_min_batch_size changes have accumulated, then applies the batch. Under sustained load the batch size grows geometrically (controlled by apply_queue_step_base) up to apply_queue_max_batch_size. When the buffered queue passes apply_queue_batch_threshold_ratio of the current target batch size, a batch is spawned immediately rather than waiting for the timer.

processing_queue_len

Maximum number of unapplied changesets the agent will buffer before it starts dropping old entries. Defaults to 20000. A small number of dropped changes isn't worrisome: Corrosion will request them again through sync. Before tuning this, first check that changes are being processed quickly enough.

[perf]
processing_queue_len = 20000

apply_queue_timeout

Maximum time, in milliseconds, the agent will wait for more changes to accumulate before applying whatever it has. Defaults to 10 ms.

Lower values reduce write latency at the cost of more, smaller transactions. Higher values improve write throughput by amortizing transaction overhead.

apply_queue_min_batch_size

Minimum number of changes the agent will try to apply per transaction. Defaults to 100.

apply_queue_max_batch_size

Maximum number of changes the agent will apply in a single transaction. Defaults to 16000.

apply_queue_step_base

Base used by the geometric batch-size selector. Effective batch size is approximately:

batch_size = clamp(
    apply_queue_min_batch_size,
    apply_queue_step_base * 2 ** floor(log2(queue_len / apply_queue_step_base)),
    apply_queue_max_batch_size,
)

Defaults to 500. Larger values make the batch size grow faster as the queue fills.
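
To see how the selector behaves for a given queue length, the approximate formula above can be evaluated directly. This is a literal transcription of that formula into a sketch, not the agent's actual code; target_batch_size is a hypothetical name, and returning the minimum for an empty queue is an assumption made to keep log2 defined.

```python
import math


def target_batch_size(
    queue_len: int,
    min_batch: int = 100,     # apply_queue_min_batch_size
    max_batch: int = 16000,   # apply_queue_max_batch_size
    step_base: int = 500,     # apply_queue_step_base
) -> int:
    """Evaluate the approximate geometric batch-size formula."""
    if queue_len <= 0:
        # log2 is undefined here; assume the minimum batch size applies.
        return min_batch
    exponent = math.floor(math.log2(queue_len / step_base))
    raw = step_base * 2.0 ** exponent
    # Clamp between the configured minimum and maximum.
    return int(min(max(raw, min_batch), max_batch))


for queue_len in [100, 300, 500, 1999, 2000, 1_000_000]:
    print(queue_len, "->", target_batch_size(queue_len))
```

With the defaults, the result stays at 500 until the queue reaches 1000 entries, then doubles each time the queue length doubles, until it hits the 16000 cap.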

apply_queue_batch_threshold_ratio

Fraction (0.0 to 1.0) of the current target batch size at which a batch is spawned immediately, without waiting for apply_queue_timeout. Defaults to 0.9.

Raise it (closer to 1.0) to favor larger batches; lower it to favor lower latency.

[perf]
apply_queue_timeout                 = 10      # milliseconds
apply_queue_min_batch_size          = 100
apply_queue_max_batch_size          = 16000
apply_queue_step_base               = 500
apply_queue_batch_threshold_ratio   = 0.9
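
The threshold ratio amounts to a single comparison against the current target batch size. A minimal sketch of that check follows; should_apply_now is a hypothetical name, and whether the agent treats the exact boundary as "passed" is an assumption (>= is used here).

```python
def should_apply_now(buffered: int, target: int, threshold_ratio: float = 0.9) -> bool:
    """True when the buffered queue has reached the early-spawn threshold."""
    return buffered >= threshold_ratio * target


# With a target batch of 500 changes and the default ratio of 0.9,
# the early spawn kicks in at around 450 buffered changes.
print(should_apply_now(449, 500))
print(should_apply_now(451, 500))
```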

Synchronization

min_sync_backoff / max_sync_backoff

Lower and upper bound, in seconds, of the randomized backoff between rounds of sync with other peers. Defaults: min_sync_backoff = 1, max_sync_backoff = 15.

A larger range reduces sync chatter on healthy clusters; a smaller range converges faster but produces more cross-node traffic.

[perf]
min_sync_backoff = 1
max_sync_backoff = 15

Database

wal_threshold_mb

Size in megabytes above which the agent will attempt to checkpoint and truncate the SQLite WAL. Defaults to 5120 (5 GiB).

[perf]
wal_threshold_mb = 5120

sql_tx_timeout

Maximum duration, in seconds, that internal SQL transactions (sync ingestion, change apply, etc.) are allowed to run. Defaults to 60. Raising this is useful when applying very large batches on slow storage.

[perf]
sql_tx_timeout = 60