Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| cc26c96a82 | |||
| 86a3a8a47c | |||
| 45aea6b702 | |||
| c67b588188 | |||
| 87b016a56c | |||
| 01c42837d1 | |||
| 152fc30116 | |||
| 4c6f77b999 | |||
| 0447284d43 | |||
| d3f5e73dad |
Generated
+63
@@ -345,9 +345,13 @@ dependencies = [
|
||||
name = "db"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"domain",
|
||||
"serde_json",
|
||||
"sqlx",
|
||||
"thiserror",
|
||||
"time",
|
||||
"tokio",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -361,6 +365,16 @@ dependencies = [
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "deranged"
|
||||
version = "0.5.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c"
|
||||
dependencies = [
|
||||
"powerfmt",
|
||||
"serde_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "digest"
|
||||
version = "0.10.7"
|
||||
@@ -389,6 +403,8 @@ name = "domain"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"time",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
@@ -1067,6 +1083,12 @@ dependencies = [
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-conv"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
|
||||
|
||||
[[package]]
|
||||
name = "num-integer"
|
||||
version = "0.1.46"
|
||||
@@ -1201,6 +1223,12 @@ dependencies = [
|
||||
"zerovec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "powerfmt"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
|
||||
|
||||
[[package]]
|
||||
name = "ppv-lite86"
|
||||
version = "0.2.21"
|
||||
@@ -1763,6 +1791,7 @@ dependencies = [
|
||||
"sha2",
|
||||
"smallvec",
|
||||
"thiserror",
|
||||
"time",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tracing",
|
||||
@@ -1847,6 +1876,7 @@ dependencies = [
|
||||
"sqlx-core",
|
||||
"stringprep",
|
||||
"thiserror",
|
||||
"time",
|
||||
"tracing",
|
||||
"uuid",
|
||||
"whoami",
|
||||
@@ -1885,6 +1915,7 @@ dependencies = [
|
||||
"sqlx-core",
|
||||
"stringprep",
|
||||
"thiserror",
|
||||
"time",
|
||||
"tracing",
|
||||
"uuid",
|
||||
"whoami",
|
||||
@@ -1910,6 +1941,7 @@ dependencies = [
|
||||
"serde_urlencoded",
|
||||
"sqlx-core",
|
||||
"thiserror",
|
||||
"time",
|
||||
"tracing",
|
||||
"url",
|
||||
"uuid",
|
||||
@@ -2013,6 +2045,37 @@ dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "time"
|
||||
version = "0.3.45"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f9e442fc33d7fdb45aa9bfeb312c095964abdf596f7567261062b2a7107aaabd"
|
||||
dependencies = [
|
||||
"deranged",
|
||||
"itoa",
|
||||
"num-conv",
|
||||
"powerfmt",
|
||||
"serde_core",
|
||||
"time-core",
|
||||
"time-macros",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "time-core"
|
||||
version = "0.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b36ee98fd31ec7426d599183e8fe26932a8dc1fb76ddb6214d05493377d34ca"
|
||||
|
||||
[[package]]
|
||||
name = "time-macros"
|
||||
version = "0.2.25"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "71e552d1249bf61ac2a52db88179fd0673def1e1ad8243a00d9ec9ed71fee3dd"
|
||||
dependencies = [
|
||||
"num-conv",
|
||||
"time-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tinystr"
|
||||
version = "0.8.3"
|
||||
|
||||
+2
-1
@@ -9,10 +9,11 @@ rust-version = "1.85"
|
||||
[workspace.dependencies]
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
axum = "0.8"
|
||||
sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "postgres", "uuid", "macros"] }
|
||||
sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "postgres", "uuid", "macros", "time", "json"] }
|
||||
uuid = { version = "1", features = ["v4", "serde"] }
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
time = { version = "0.3", features = ["serde"] }
|
||||
clap = { version = "4", features = ["derive", "env"] }
|
||||
utoipa = { version = "5", features = ["uuid"] }
|
||||
anyhow = "1"
|
||||
|
||||
@@ -7,6 +7,10 @@ rust-version.workspace = true
|
||||
[dependencies]
|
||||
sqlx.workspace = true
|
||||
thiserror.workspace = true
|
||||
domain = { path = "../domain" }
|
||||
uuid.workspace = true
|
||||
time.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tokio.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
-- Append-only audit log. One database == one organization, so there is no org_id.
|
||||
CREATE TABLE audit_log (
|
||||
seq BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
||||
at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
actor_kind TEXT NOT NULL CHECK (actor_kind IN ('user', 'system')),
|
||||
actor_id UUID,
|
||||
action TEXT NOT NULL CHECK (action IN ('created', 'updated', 'deleted')),
|
||||
entity_type TEXT NOT NULL,
|
||||
entity_id UUID NOT NULL,
|
||||
changes JSONB NOT NULL DEFAULT '[]'::jsonb,
|
||||
CONSTRAINT actor_id_matches_kind CHECK (
|
||||
(actor_kind = 'user' AND actor_id IS NOT NULL) OR
|
||||
(actor_kind = 'system' AND actor_id IS NULL)
|
||||
)
|
||||
);
|
||||
|
||||
CREATE INDEX audit_log_entity_idx ON audit_log (entity_type, entity_id, seq);
|
||||
|
||||
-- Enforce append-only at the database level: reject any UPDATE or DELETE.
|
||||
CREATE OR REPLACE FUNCTION audit_log_reject_mutation() RETURNS trigger AS $$
|
||||
BEGIN
|
||||
RAISE EXCEPTION 'audit_log is append-only; % is not permitted', TG_OP;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER audit_log_immutable
|
||||
BEFORE UPDATE OR DELETE ON audit_log
|
||||
FOR EACH ROW EXECUTE FUNCTION audit_log_reject_mutation();
|
||||
|
||||
CREATE TRIGGER audit_log_no_truncate
|
||||
BEFORE TRUNCATE ON audit_log
|
||||
FOR EACH STATEMENT EXECUTE FUNCTION audit_log_reject_mutation();
|
||||
@@ -0,0 +1,93 @@
|
||||
//! Append-only audit log access.
|
||||
|
||||
use domain::{AuditActor, AuditEntry, FieldChange, NewAuditEvent};
|
||||
use sqlx::Row;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Append an audit event. Accepts any executor, so callers can record the event
|
||||
/// inside the same transaction as the change it describes.
|
||||
pub async fn record<'e, E>(executor: E, event: &NewAuditEvent) -> Result<(), sqlx::Error>
|
||||
where
|
||||
E: sqlx::PgExecutor<'e>,
|
||||
{
|
||||
let (actor_kind, actor_id) = match event.actor {
|
||||
AuditActor::User(id) => ("user", Some(id)),
|
||||
AuditActor::System => ("system", None),
|
||||
};
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO audit_log \
|
||||
(actor_kind, actor_id, action, entity_type, entity_id, changes) \
|
||||
VALUES ($1, $2, $3, $4, $5, $6)",
|
||||
)
|
||||
.bind(actor_kind)
|
||||
.bind(actor_id)
|
||||
.bind(event.action.as_str())
|
||||
.bind(&event.entity_type)
|
||||
.bind(event.entity_id)
|
||||
.bind(sqlx::types::Json(&event.changes))
|
||||
.execute(executor)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Read the full history for one entity, oldest first.
|
||||
pub async fn history_for<'e, E>(
|
||||
executor: E,
|
||||
entity_type: &str,
|
||||
entity_id: Uuid,
|
||||
) -> Result<Vec<AuditEntry>, sqlx::Error>
|
||||
where
|
||||
E: sqlx::PgExecutor<'e>,
|
||||
{
|
||||
// TODO: add LIMIT/keyset pagination before exposing history_for via the API.
|
||||
let rows = sqlx::query(
|
||||
"SELECT seq, at, actor_kind, actor_id, action, entity_type, entity_id, changes \
|
||||
FROM audit_log \
|
||||
WHERE entity_type = $1 AND entity_id = $2 \
|
||||
ORDER BY seq",
|
||||
)
|
||||
.bind(entity_type)
|
||||
.bind(entity_id)
|
||||
.fetch_all(executor)
|
||||
.await?;
|
||||
|
||||
rows.into_iter().map(map_row).collect()
|
||||
}
|
||||
|
||||
fn map_row(row: sqlx::postgres::PgRow) -> Result<AuditEntry, sqlx::Error> {
|
||||
let seq: i64 = row.try_get("seq")?;
|
||||
let at: time::OffsetDateTime = row.try_get("at")?;
|
||||
let actor_kind: String = row.try_get("actor_kind")?;
|
||||
let actor_id: Option<Uuid> = row.try_get("actor_id")?;
|
||||
let action_str: String = row.try_get("action")?;
|
||||
let entity_type: String = row.try_get("entity_type")?;
|
||||
let entity_id: Uuid = row.try_get("entity_id")?;
|
||||
let changes: sqlx::types::Json<Vec<FieldChange>> = row.try_get("changes")?;
|
||||
|
||||
let actor = match actor_kind.as_str() {
|
||||
"user" => AuditActor::User(
|
||||
actor_id.ok_or_else(|| sqlx::Error::Decode("user actor missing actor_id".into()))?,
|
||||
),
|
||||
"system" => AuditActor::System,
|
||||
other => {
|
||||
return Err(sqlx::Error::Decode(
|
||||
format!("unknown actor_kind: {other}").into(),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let action = domain::AuditAction::from_db(&action_str)
|
||||
.ok_or_else(|| sqlx::Error::Decode(format!("unknown action: {action_str}").into()))?;
|
||||
|
||||
Ok(AuditEntry {
|
||||
seq,
|
||||
at,
|
||||
actor,
|
||||
action,
|
||||
entity_type,
|
||||
entity_id,
|
||||
changes: changes.0,
|
||||
})
|
||||
}
|
||||
@@ -1,5 +1,7 @@
|
||||
//! Database access. All SQL lives in this crate.
|
||||
|
||||
pub mod audit;
|
||||
|
||||
use sqlx::postgres::{PgPool, PgPoolOptions};
|
||||
|
||||
/// A handle to the organization's PostgreSQL database.
|
||||
@@ -37,4 +39,13 @@ impl Db {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Apply all pending schema migrations (embedded at compile time).
|
||||
///
|
||||
/// Pre-1.0 the migration files are rewritten freely and dev databases are
|
||||
/// recreated; this is the schema-bootstrap mechanism, not forward-migration
|
||||
/// discipline.
|
||||
pub async fn migrate(&self) -> Result<(), sqlx::migrate::MigrateError> {
|
||||
sqlx::migrate!().run(&self.pool).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,105 @@
|
||||
use db::Db;
|
||||
use db::audit;
|
||||
use domain::{AuditAction, AuditActor, FieldChange, NewAuditEvent};
|
||||
use serde_json::json;
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
fn created(entity_id: Uuid, name: &str) -> NewAuditEvent {
|
||||
NewAuditEvent {
|
||||
actor: AuditActor::System,
|
||||
action: AuditAction::Created,
|
||||
entity_type: "object".into(),
|
||||
entity_id,
|
||||
changes: vec![FieldChange {
|
||||
field: "name".into(),
|
||||
before: None,
|
||||
after: Some(json!(name)),
|
||||
}],
|
||||
}
|
||||
}
|
||||
|
||||
#[sqlx::test]
|
||||
async fn records_and_reads_back_history_in_order(pool: PgPool) {
|
||||
let db = Db::from_pool(pool);
|
||||
let id = Uuid::new_v4();
|
||||
let user = Uuid::new_v4();
|
||||
|
||||
audit::record(db.pool(), &created(id, "Vase"))
|
||||
.await
|
||||
.unwrap();
|
||||
audit::record(
|
||||
db.pool(),
|
||||
&NewAuditEvent {
|
||||
actor: AuditActor::User(user),
|
||||
action: AuditAction::Updated,
|
||||
entity_type: "object".into(),
|
||||
entity_id: id,
|
||||
changes: vec![FieldChange {
|
||||
field: "name".into(),
|
||||
before: Some(json!("Vase")),
|
||||
after: Some(json!("Roman Vase")),
|
||||
}],
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let history = audit::history_for(db.pool(), "object", id).await.unwrap();
|
||||
assert_eq!(history.len(), 2);
|
||||
assert_eq!(history[0].action, AuditAction::Created);
|
||||
assert_eq!(history[0].actor, AuditActor::System);
|
||||
assert_eq!(history[1].action, AuditAction::Updated);
|
||||
assert_eq!(history[1].actor, AuditActor::User(user));
|
||||
assert!(history[0].seq < history[1].seq, "ordered by seq");
|
||||
assert_eq!(history[1].changes[0].field, "name");
|
||||
assert_eq!(history[1].changes[0].after, Some(json!("Roman Vase")));
|
||||
}
|
||||
|
||||
#[sqlx::test]
|
||||
async fn history_is_scoped_to_one_entity(pool: PgPool) {
|
||||
let db = Db::from_pool(pool);
|
||||
let a = Uuid::new_v4();
|
||||
let b = Uuid::new_v4();
|
||||
audit::record(db.pool(), &created(a, "A")).await.unwrap();
|
||||
audit::record(db.pool(), &created(b, "B")).await.unwrap();
|
||||
|
||||
let only_a = audit::history_for(db.pool(), "object", a).await.unwrap();
|
||||
assert_eq!(only_a.len(), 1);
|
||||
assert_eq!(only_a[0].entity_id, a);
|
||||
}
|
||||
|
||||
#[sqlx::test]
|
||||
async fn deleted_action_with_empty_changes_round_trips(pool: PgPool) {
|
||||
let db = Db::from_pool(pool);
|
||||
let id = Uuid::new_v4();
|
||||
|
||||
audit::record(
|
||||
db.pool(),
|
||||
&NewAuditEvent {
|
||||
actor: AuditActor::System,
|
||||
action: AuditAction::Deleted,
|
||||
entity_type: "object".into(),
|
||||
entity_id: id,
|
||||
changes: vec![],
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let history = audit::history_for(db.pool(), "object", id).await.unwrap();
|
||||
assert_eq!(history.len(), 1);
|
||||
assert_eq!(history[0].action, AuditAction::Deleted);
|
||||
assert!(history[0].changes.is_empty());
|
||||
}
|
||||
|
||||
#[sqlx::test]
|
||||
async fn history_is_empty_for_unknown_entity(pool: PgPool) {
|
||||
let db = Db::from_pool(pool);
|
||||
|
||||
let history = audit::history_for(db.pool(), "object", Uuid::new_v4())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(history.is_empty());
|
||||
}
|
||||
@@ -0,0 +1,100 @@
|
||||
use db::Db;
|
||||
use db::audit;
|
||||
use domain::{AuditAction, AuditActor, NewAuditEvent};
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
fn sample() -> NewAuditEvent {
|
||||
NewAuditEvent {
|
||||
actor: AuditActor::System,
|
||||
action: AuditAction::Created,
|
||||
entity_type: "object".into(),
|
||||
entity_id: Uuid::new_v4(),
|
||||
changes: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
async fn count(pool: &PgPool) -> i64 {
|
||||
sqlx::query_scalar("SELECT count(*) FROM audit_log")
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[sqlx::test]
|
||||
async fn update_delete_truncate_are_rejected(pool: PgPool) {
|
||||
let db = Db::from_pool(pool);
|
||||
|
||||
audit::record(db.pool(), &sample()).await.unwrap();
|
||||
|
||||
// Each failing statement poisons its connection (Postgres enters aborted-transaction
|
||||
// state). Acquire a fresh connection per statement so later assertions are independent.
|
||||
let update_err = sqlx::query("UPDATE audit_log SET action = 'deleted'")
|
||||
.execute(db.pool())
|
||||
.await
|
||||
.unwrap_err()
|
||||
.to_string();
|
||||
|
||||
assert!(
|
||||
update_err.contains("audit_log is append-only"),
|
||||
"UPDATE must be rejected by the trigger, got: {update_err}"
|
||||
);
|
||||
|
||||
let delete_err = sqlx::query("DELETE FROM audit_log")
|
||||
.execute(db.pool())
|
||||
.await
|
||||
.unwrap_err()
|
||||
.to_string();
|
||||
|
||||
assert!(
|
||||
delete_err.contains("audit_log is append-only"),
|
||||
"DELETE must be rejected by the trigger, got: {delete_err}"
|
||||
);
|
||||
|
||||
let truncate_err = sqlx::query("TRUNCATE audit_log")
|
||||
.execute(db.pool())
|
||||
.await
|
||||
.unwrap_err()
|
||||
.to_string();
|
||||
|
||||
assert!(
|
||||
truncate_err.contains("audit_log is append-only"),
|
||||
"TRUNCATE must be rejected by the trigger, got: {truncate_err}"
|
||||
);
|
||||
|
||||
assert_eq!(count(db.pool()).await, 1, "the row is still there");
|
||||
}
|
||||
|
||||
#[sqlx::test]
|
||||
async fn record_rolls_back_with_caller_transaction(pool: PgPool) {
|
||||
let db = Db::from_pool(pool);
|
||||
|
||||
let mut tx = db.pool().begin().await.unwrap();
|
||||
|
||||
audit::record(&mut *tx, &sample()).await.unwrap();
|
||||
|
||||
tx.rollback().await.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
count(db.pool()).await,
|
||||
0,
|
||||
"a rolled-back audit record must not persist"
|
||||
);
|
||||
}
|
||||
|
||||
#[sqlx::test]
|
||||
async fn record_commits_with_caller_transaction(pool: PgPool) {
|
||||
let db = Db::from_pool(pool);
|
||||
|
||||
let mut tx = db.pool().begin().await.unwrap();
|
||||
|
||||
audit::record(&mut *tx, &sample()).await.unwrap();
|
||||
|
||||
tx.commit().await.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
count(db.pool()).await,
|
||||
1,
|
||||
"a committed audit record persists"
|
||||
);
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
use db::Db;
|
||||
use sqlx::PgPool;
|
||||
|
||||
#[sqlx::test]
|
||||
async fn migrate_is_idempotent_and_creates_audit_log(pool: PgPool) {
|
||||
let db = Db::from_pool(pool);
|
||||
|
||||
// sqlx::test already applied migrations to this temp DB; re-running must be a
|
||||
// no-op success (idempotent).
|
||||
db.migrate()
|
||||
.await
|
||||
.expect("re-running migrate is idempotent");
|
||||
|
||||
let regclass: Option<String> =
|
||||
sqlx::query_scalar("SELECT to_regclass('public.audit_log')::text")
|
||||
.fetch_one(db.pool())
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(regclass.as_deref(), Some("audit_log"));
|
||||
}
|
||||
@@ -7,3 +7,5 @@ rust-version.workspace = true
|
||||
[dependencies]
|
||||
uuid.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
time.workspace = true
|
||||
|
||||
@@ -0,0 +1,147 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// What kind of change an audit entry records.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum AuditAction {
|
||||
Created,
|
||||
Updated,
|
||||
Deleted,
|
||||
}
|
||||
|
||||
impl AuditAction {
|
||||
/// The database/text representation.
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
AuditAction::Created => "created",
|
||||
AuditAction::Updated => "updated",
|
||||
AuditAction::Deleted => "deleted",
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse from the database/text representation.
|
||||
pub fn from_db(s: &str) -> Option<Self> {
|
||||
match s {
|
||||
"created" => Some(AuditAction::Created),
|
||||
"updated" => Some(AuditAction::Updated),
|
||||
"deleted" => Some(AuditAction::Deleted),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Who performed the change.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case", tag = "kind", content = "id")]
|
||||
pub enum AuditActor {
|
||||
/// A specific user, referenced by id (a `UserId` newtype arrives with auth).
|
||||
User(Uuid),
|
||||
/// The system itself (migrations, automated processes).
|
||||
System,
|
||||
}
|
||||
|
||||
/// One field's before/after values within a change.
|
||||
///
|
||||
/// Note: after a JSON round-trip, `Some(Value::Null)` is indistinguishable from
|
||||
/// `None`. Use `None` to mean "no value"; do not encode an absent value as
|
||||
/// `Some(Value::Null)`.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct FieldChange {
|
||||
/// Field name (catalogue field key or column name).
|
||||
pub field: String,
|
||||
/// Value before the change (None when newly set).
|
||||
pub before: Option<Value>,
|
||||
/// Value after the change (None when cleared).
|
||||
pub after: Option<Value>,
|
||||
}
|
||||
|
||||
/// An audit event to be recorded.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct NewAuditEvent {
|
||||
pub actor: AuditActor,
|
||||
pub action: AuditAction,
|
||||
pub entity_type: String,
|
||||
pub entity_id: Uuid,
|
||||
pub changes: Vec<FieldChange>,
|
||||
}
|
||||
|
||||
/// A recorded audit entry, read back from the log.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct AuditEntry {
|
||||
/// Monotonic sequence number (insertion order).
|
||||
pub seq: i64,
|
||||
/// When it was recorded.
|
||||
pub at: OffsetDateTime,
|
||||
pub actor: AuditActor,
|
||||
pub action: AuditAction,
|
||||
pub entity_type: String,
|
||||
pub entity_id: Uuid,
|
||||
pub changes: Vec<FieldChange>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use serde_json::json;
|
||||
|
||||
#[test]
|
||||
fn action_round_trips_via_db_string() {
|
||||
for a in [
|
||||
AuditAction::Created,
|
||||
AuditAction::Updated,
|
||||
AuditAction::Deleted,
|
||||
] {
|
||||
assert_eq!(AuditAction::from_db(a.as_str()), Some(a));
|
||||
}
|
||||
assert_eq!(AuditAction::from_db("bogus"), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn field_change_serde_round_trip() {
|
||||
let fc = FieldChange {
|
||||
field: "name".into(),
|
||||
before: Some(json!("Vase")),
|
||||
after: Some(json!("Roman Vase")),
|
||||
};
|
||||
let v = serde_json::to_value(&fc).unwrap();
|
||||
assert_eq!(v["field"], "name");
|
||||
assert_eq!(v["before"], "Vase");
|
||||
assert_eq!(v["after"], "Roman Vase");
|
||||
let back: FieldChange = serde_json::from_value(v).unwrap();
|
||||
assert_eq!(back, fc);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn actor_serde_round_trips() {
|
||||
for actor in [AuditActor::User(Uuid::nil()), AuditActor::System] {
|
||||
let v = serde_json::to_value(actor).unwrap();
|
||||
let back: AuditActor = serde_json::from_value(v).unwrap();
|
||||
assert_eq!(back, actor);
|
||||
}
|
||||
assert_eq!(
|
||||
serde_json::to_value(AuditActor::User(Uuid::nil())).unwrap()["kind"],
|
||||
"user"
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::to_value(AuditActor::System).unwrap()["kind"],
|
||||
"system"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn action_serde_matches_as_str() {
|
||||
for a in [
|
||||
AuditAction::Created,
|
||||
AuditAction::Updated,
|
||||
AuditAction::Deleted,
|
||||
] {
|
||||
assert_eq!(
|
||||
serde_json::to_value(a).unwrap(),
|
||||
serde_json::Value::String(a.as_str().to_owned())
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,7 @@
|
||||
//! Core domain types and invariants. No I/O dependencies.
|
||||
|
||||
mod audit;
|
||||
mod id;
|
||||
|
||||
pub use audit::{AuditAction, AuditActor, AuditEntry, FieldChange, NewAuditEvent};
|
||||
pub use id::OrgId;
|
||||
|
||||
@@ -14,6 +14,9 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
|
||||
let db = Db::connect(&config.database_url)
|
||||
.await
|
||||
.context("connecting to the database")?;
|
||||
|
||||
db.migrate().await.context("running database migrations")?;
|
||||
|
||||
let state = AppState {
|
||||
db,
|
||||
app_name: config.app_name.clone(),
|
||||
|
||||
@@ -0,0 +1,628 @@
|
||||
# Audit Spine Implementation Plan
|
||||
|
||||
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||
|
||||
**Goal:** Build the append-only, immutable audit log — recording who/when/what with field-level before→after diffs — that every later write path will call to satisfy Spectrum "amendment history" (`docs/specs/2026-06-02-mvp-architecture.md` §13).
|
||||
|
||||
**Architecture:** Audit value types live in `domain` (pure, no I/O). The `db` crate owns the `audit_log` table (via a schema-bootstrap migration) and a transaction-capable `audit` repository (`record` / `history_for`). Immutability is enforced *in the database* by a trigger that rejects UPDATE/DELETE — infrastructure-enforced, not convention. There is **no `org_id`** column: each deployment's database *is* one organization (§3/§4). No HTTP surface yet — the spine is consumed by future write paths; an audit/history API arrives when entities do.
|
||||
|
||||
**Tech Stack:** Rust 2024, sqlx 0.8 (Postgres, +`time` +`json` features), `time` for timestamps, `serde_json` for the JSONB change payload. Tests use `#[sqlx::test]` (auto-applies the migration to a fresh temp DB).
|
||||
|
||||
---
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- A PostgreSQL reachable for tests where the role may CREATE DATABASE. Bring it up with the project compose (`docker compose up -d`) and export `DATABASE_URL`. In a host where 5432 is taken, run an isolated instance, e.g.:
|
||||
`docker run -d --name cms-test-pg -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=cms_dev -p 5433:5432 postgres:17` and use `DATABASE_URL=postgres://postgres:postgres@localhost:5433/cms_dev`.
|
||||
- Shell env does NOT persist between commands; pass `DATABASE_URL` inline on every test/clippy command.
|
||||
- Verify crate versions with the cratesio tooling before pinning new ones.
|
||||
|
||||
## Design decisions (review these)
|
||||
|
||||
1. **Schema bootstrap pre-1.0.** The schema lives as sqlx migration files under **`crates/db/migrations/`** (SQL belongs with the `db` crate). `#[sqlx::test]` auto-applies them to each temp DB; the server applies them on startup via `Db::migrate()` (`sqlx::migrate!()` embeds them at compile time). Per spec §8/D15 we are **not** maintaining forward-only migration history yet — pre-1.0 we **rewrite these files freely and recreate dev databases** (drop & re-apply) rather than writing incremental migrations. At 1.0 we freeze and switch to disciplined migrations. *(This refines the spec's "recreate, don't migrate" into a concrete mechanism — fold it back into the spec.)*
|
||||
2. **Immutability in the database.** A `BEFORE UPDATE OR DELETE` trigger on `audit_log` raises an exception, so append-only is enforced by Postgres, not by "we only wrote an insert function." Matches the infrastructure-enforced philosophy (§4).
|
||||
3. **No `org_id`.** Single-tenant database per deployment; the DB is the org boundary.
|
||||
4. **Actor model.** `AuditActor = User(Uuid) | System`. No `User` entity exists yet, so the user is referenced by raw `Uuid`; auth (Plan 9) will introduce a `UserId` newtype that maps onto this. Auth *events* (login success/failure) are deferred to Plan 9 — this spine covers entity-change events (`created`/`updated`/`deleted`).
|
||||
5. **Transaction-capable `record`.** `record` takes an `impl sqlx::PgExecutor`, so a future write path can record the audit entry **inside the same transaction** as the entity change (atomic: both commit or both roll back).
|
||||
6. **`domain` gets wired in.** `db` depends on `domain` for the audit types — this lands the "everything points inward to `domain`" relationship that was aspirational after Plan 0 (issue #4).
|
||||
|
||||
## File Structure
|
||||
|
||||
```
|
||||
Cargo.toml + time dep; sqlx +time +json features
|
||||
crates/domain/
|
||||
Cargo.toml + serde_json, time
|
||||
src/lib.rs re-export audit types
|
||||
src/audit.rs AuditAction, AuditActor, FieldChange, NewAuditEvent, AuditEntry (+ unit tests)
|
||||
crates/db/
|
||||
Cargo.toml + domain, uuid, time
|
||||
migrations/0001_audit_log.sql audit_log table + immutability trigger + index
|
||||
src/lib.rs + pub mod audit; + Db::migrate()
|
||||
src/audit.rs record() + history_for() (transaction-capable)
|
||||
tests/migrate.rs migrate idempotent + table exists
|
||||
tests/audit.rs record/read-back, ordering, entity isolation
|
||||
tests/audit_immutability.rs UPDATE/DELETE rejected; rolled-back tx leaves nothing
|
||||
crates/server/
|
||||
src/lib.rs run() applies migrations on startup
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 1: `domain` — audit value types
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/domain/Cargo.toml`
|
||||
- Create: `crates/domain/src/audit.rs`
|
||||
- Modify: `crates/domain/src/lib.rs`
|
||||
|
||||
- [ ] **Step 1: Add dependencies.** In `Cargo.toml` (workspace root) add to `[workspace.dependencies]` a `time` entry (verify latest 0.3.x):
|
||||
```toml
|
||||
time = { version = "0.3", features = ["serde"] }
|
||||
```
|
||||
Then in `crates/domain/Cargo.toml`, set `[dependencies]` to:
|
||||
```toml
|
||||
[dependencies]
|
||||
uuid.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
time.workspace = true
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Write the failing test + types.** Create `crates/domain/src/audit.rs`:
|
||||
```rust
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// What kind of change an audit entry records.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum AuditAction {
|
||||
Created,
|
||||
Updated,
|
||||
Deleted,
|
||||
}
|
||||
|
||||
impl AuditAction {
|
||||
/// The database/text representation.
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
AuditAction::Created => "created",
|
||||
AuditAction::Updated => "updated",
|
||||
AuditAction::Deleted => "deleted",
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse from the database/text representation.
|
||||
pub fn from_db(s: &str) -> Option<Self> {
|
||||
match s {
|
||||
"created" => Some(AuditAction::Created),
|
||||
"updated" => Some(AuditAction::Updated),
|
||||
"deleted" => Some(AuditAction::Deleted),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Who performed the change.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case", tag = "kind", content = "id")]
|
||||
pub enum AuditActor {
|
||||
/// A specific user, referenced by id (a `UserId` newtype arrives with auth).
|
||||
User(Uuid),
|
||||
/// The system itself (migrations, automated processes).
|
||||
System,
|
||||
}
|
||||
|
||||
/// One field's before/after values within a change.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct FieldChange {
|
||||
/// Field name (catalogue field key or column name).
|
||||
pub field: String,
|
||||
/// Value before the change (None when newly set).
|
||||
pub before: Option<Value>,
|
||||
/// Value after the change (None when cleared).
|
||||
pub after: Option<Value>,
|
||||
}
|
||||
|
||||
/// An audit event to be recorded.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct NewAuditEvent {
|
||||
pub actor: AuditActor,
|
||||
pub action: AuditAction,
|
||||
pub entity_type: String,
|
||||
pub entity_id: Uuid,
|
||||
pub changes: Vec<FieldChange>,
|
||||
}
|
||||
|
||||
/// A recorded audit entry, read back from the log.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct AuditEntry {
|
||||
/// Monotonic sequence number (insertion order).
|
||||
pub seq: i64,
|
||||
/// When it was recorded.
|
||||
pub at: OffsetDateTime,
|
||||
pub actor: AuditActor,
|
||||
pub action: AuditAction,
|
||||
pub entity_type: String,
|
||||
pub entity_id: Uuid,
|
||||
pub changes: Vec<FieldChange>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use serde_json::json;
|
||||
|
||||
#[test]
|
||||
fn action_round_trips_via_db_string() {
|
||||
for a in [AuditAction::Created, AuditAction::Updated, AuditAction::Deleted] {
|
||||
assert_eq!(AuditAction::from_db(a.as_str()), Some(a));
|
||||
}
|
||||
assert_eq!(AuditAction::from_db("bogus"), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn field_change_serde_round_trip() {
|
||||
let fc = FieldChange {
|
||||
field: "name".into(),
|
||||
before: Some(json!("Vase")),
|
||||
after: Some(json!("Roman Vase")),
|
||||
};
|
||||
let v = serde_json::to_value(&fc).unwrap();
|
||||
assert_eq!(v["field"], "name");
|
||||
assert_eq!(v["before"], "Vase");
|
||||
assert_eq!(v["after"], "Roman Vase");
|
||||
let back: FieldChange = serde_json::from_value(v).unwrap();
|
||||
assert_eq!(back, fc);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn actor_is_adjacently_tagged() {
|
||||
let v = serde_json::to_value(AuditActor::User(Uuid::nil())).unwrap();
|
||||
assert_eq!(v["kind"], "user");
|
||||
let v2 = serde_json::to_value(AuditActor::System).unwrap();
|
||||
assert_eq!(v2["kind"], "system");
|
||||
}
|
||||
}
|
||||
```
|
||||
Wire into `crates/domain/src/lib.rs` (keep the existing `mod id; pub use id::OrgId;`), adding:
|
||||
```rust
|
||||
mod audit;
|
||||
|
||||
pub use audit::{AuditAction, AuditActor, AuditEntry, FieldChange, NewAuditEvent};
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Run the tests to verify they fail, then pass.** First confirm the test file compiles and fails if you stub the types out — but since the types and tests are added together here, run:
|
||||
`cargo test -p domain`
|
||||
Expected: PASS — the three new audit tests plus the two existing `id` tests (5 total). If it fails to compile, fix the types until green. (TDD note: the assertions encode the intended behavior — `as_str`/`from_db` inverse, serde shapes — so a regression in those will fail.)
|
||||
|
||||
- [ ] **Step 4: Lint + format.** `cargo +nightly fmt` and `cargo clippy -p domain --all-targets -- -D warnings` → clean.
|
||||
|
||||
- [ ] **Step 5: Commit.**
|
||||
```bash
|
||||
git add Cargo.toml crates/domain
|
||||
git commit -m "feat(domain): add audit value types"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 2: Schema bootstrap + `audit_log` table
|
||||
|
||||
**Files:**
|
||||
- Modify: `Cargo.toml` (sqlx features)
|
||||
- Create: `crates/db/migrations/0001_audit_log.sql`
|
||||
- Modify: `crates/db/src/lib.rs`
|
||||
- Modify: `crates/server/src/lib.rs`
|
||||
- Test: `crates/db/tests/migrate.rs`
|
||||
|
||||
- [ ] **Step 1: Enable sqlx `time` + `json` features.** In root `Cargo.toml`, update the sqlx workspace dependency features to include `time` and `json`:
|
||||
```toml
|
||||
sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "postgres", "uuid", "macros", "time", "json"] }
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Write the migration (schema + immutability trigger).** Create `crates/db/migrations/0001_audit_log.sql`:
|
||||
```sql
|
||||
-- Append-only audit log. One database == one organization, so there is no org_id.
|
||||
CREATE TABLE audit_log (
|
||||
seq BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
||||
at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
actor_kind TEXT NOT NULL CHECK (actor_kind IN ('user', 'system')),
|
||||
actor_id UUID,
|
||||
action TEXT NOT NULL CHECK (action IN ('created', 'updated', 'deleted')),
|
||||
entity_type TEXT NOT NULL,
|
||||
entity_id UUID NOT NULL,
|
||||
changes JSONB NOT NULL DEFAULT '[]'::jsonb,
|
||||
CONSTRAINT actor_id_matches_kind CHECK (
|
||||
(actor_kind = 'user' AND actor_id IS NOT NULL) OR
|
||||
(actor_kind = 'system' AND actor_id IS NULL)
|
||||
)
|
||||
);
|
||||
|
||||
CREATE INDEX audit_log_entity_idx ON audit_log (entity_type, entity_id, seq);
|
||||
|
||||
-- Enforce append-only at the database level: reject any UPDATE or DELETE.
|
||||
CREATE OR REPLACE FUNCTION audit_log_reject_mutation() RETURNS trigger AS $$
|
||||
BEGIN
|
||||
RAISE EXCEPTION 'audit_log is append-only; % is not permitted', TG_OP;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER audit_log_immutable
|
||||
BEFORE UPDATE OR DELETE ON audit_log
|
||||
FOR EACH ROW EXECUTE FUNCTION audit_log_reject_mutation();
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Add `Db::migrate()`.** In `crates/db/src/lib.rs`, add this method to the `impl Db` block (after `ping`):
|
||||
```rust
|
||||
/// Apply all pending schema migrations (embedded at compile time).
|
||||
///
|
||||
/// Pre-1.0 the migration files are rewritten freely and dev databases are
|
||||
/// recreated; this is the schema-bootstrap mechanism, not forward-migration
|
||||
/// discipline.
|
||||
pub async fn migrate(&self) -> Result<(), sqlx::migrate::MigrateError> {
|
||||
sqlx::migrate!().run(&self.pool).await
|
||||
}
|
||||
```
|
||||
(`sqlx::migrate!()` defaults to `./migrations` relative to the `db` crate, i.e. `crates/db/migrations`.)
|
||||
|
||||
- [ ] **Step 4: Apply migrations on server startup.** In `crates/server/src/lib.rs`, inside `run`, immediately after the `Db::connect(...)?` line and before building `AppState`, add:
|
||||
```rust
|
||||
db.migrate().await.context("running database migrations")?;
|
||||
```
|
||||
(`anyhow::Context` is already imported; `MigrateError` implements `std::error::Error`, so `.context(...)?` works.)
|
||||
|
||||
- [ ] **Step 5: Write the migrate test.** Create `crates/db/tests/migrate.rs`:
|
||||
```rust
|
||||
use db::Db;
|
||||
use sqlx::PgPool;
|
||||
|
||||
#[sqlx::test]
|
||||
async fn migrate_is_idempotent_and_creates_audit_log(pool: PgPool) {
|
||||
let db = Db::from_pool(pool);
|
||||
|
||||
// sqlx::test already applied migrations to this temp DB; re-running must be a
|
||||
// no-op success (idempotent).
|
||||
db.migrate().await.expect("re-running migrate is idempotent");
|
||||
|
||||
let regclass: Option<String> =
|
||||
sqlx::query_scalar("SELECT to_regclass('public.audit_log')::text")
|
||||
.fetch_one(db.pool())
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(regclass.as_deref(), Some("audit_log"));
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 6: Run it.** `DATABASE_URL=<url> cargo test -p db --test migrate` → PASS (1 test).
|
||||
|
||||
- [ ] **Step 7: Lint + format.** `cargo +nightly fmt` and `DATABASE_URL=<url> cargo clippy -p db -p server --all-targets -- -D warnings` → clean.
|
||||
|
||||
- [ ] **Step 8: Commit.**
|
||||
```bash
|
||||
git add Cargo.toml crates/db crates/server
|
||||
git commit -m "feat(db): schema bootstrap with append-only audit_log table"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 3: `db::audit` repository — record & read history
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/db/Cargo.toml`
|
||||
- Create: `crates/db/src/audit.rs`
|
||||
- Modify: `crates/db/src/lib.rs`
|
||||
- Test: `crates/db/tests/audit.rs`
|
||||
|
||||
- [ ] **Step 1: Add dependencies.** In `crates/db/Cargo.toml`, set:
|
||||
```toml
|
||||
[dependencies]
|
||||
sqlx.workspace = true
|
||||
thiserror.workspace = true
|
||||
domain = { path = "../domain" }
|
||||
uuid.workspace = true
|
||||
time.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tokio.workspace = true
|
||||
serde_json.workspace = true
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Write the failing test.** Create `crates/db/tests/audit.rs`:
|
||||
```rust
|
||||
use db::Db;
|
||||
use db::audit;
|
||||
use domain::{AuditAction, AuditActor, FieldChange, NewAuditEvent};
|
||||
use serde_json::json;
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
fn created(entity_id: Uuid, name: &str) -> NewAuditEvent {
|
||||
NewAuditEvent {
|
||||
actor: AuditActor::System,
|
||||
action: AuditAction::Created,
|
||||
entity_type: "object".into(),
|
||||
entity_id,
|
||||
changes: vec![FieldChange {
|
||||
field: "name".into(),
|
||||
before: None,
|
||||
after: Some(json!(name)),
|
||||
}],
|
||||
}
|
||||
}
|
||||
|
||||
#[sqlx::test]
|
||||
async fn records_and_reads_back_history_in_order(pool: PgPool) {
|
||||
let db = Db::from_pool(pool);
|
||||
let id = Uuid::new_v4();
|
||||
let user = Uuid::new_v4();
|
||||
|
||||
audit::record(db.pool(), &created(id, "Vase")).await.unwrap();
|
||||
audit::record(
|
||||
db.pool(),
|
||||
&NewAuditEvent {
|
||||
actor: AuditActor::User(user),
|
||||
action: AuditAction::Updated,
|
||||
entity_type: "object".into(),
|
||||
entity_id: id,
|
||||
changes: vec![FieldChange {
|
||||
field: "name".into(),
|
||||
before: Some(json!("Vase")),
|
||||
after: Some(json!("Roman Vase")),
|
||||
}],
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let history = audit::history_for(db.pool(), "object", id).await.unwrap();
|
||||
assert_eq!(history.len(), 2);
|
||||
assert_eq!(history[0].action, AuditAction::Created);
|
||||
assert_eq!(history[0].actor, AuditActor::System);
|
||||
assert_eq!(history[1].action, AuditAction::Updated);
|
||||
assert_eq!(history[1].actor, AuditActor::User(user));
|
||||
assert!(history[0].seq < history[1].seq, "ordered by seq");
|
||||
assert_eq!(history[1].changes[0].field, "name");
|
||||
assert_eq!(history[1].changes[0].after, Some(json!("Roman Vase")));
|
||||
}
|
||||
|
||||
#[sqlx::test]
|
||||
async fn history_is_scoped_to_one_entity(pool: PgPool) {
|
||||
let db = Db::from_pool(pool);
|
||||
let a = Uuid::new_v4();
|
||||
let b = Uuid::new_v4();
|
||||
audit::record(db.pool(), &created(a, "A")).await.unwrap();
|
||||
audit::record(db.pool(), &created(b, "B")).await.unwrap();
|
||||
|
||||
let only_a = audit::history_for(db.pool(), "object", a).await.unwrap();
|
||||
assert_eq!(only_a.len(), 1);
|
||||
assert_eq!(only_a[0].entity_id, a);
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Run it to verify it fails.** `DATABASE_URL=<url> cargo test -p db --test audit` → FAIL (`db::audit` / `record` / `history_for` don't exist).
|
||||
|
||||
- [ ] **Step 4: Implement the repository.** Create `crates/db/src/audit.rs`:
|
||||
```rust
|
||||
//! Append-only audit log access.
|
||||
|
||||
use domain::{AuditActor, AuditEntry, FieldChange, NewAuditEvent};
|
||||
use sqlx::Row;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Append an audit event. Accepts any executor, so callers can record the event
|
||||
/// inside the same transaction as the change it describes.
|
||||
pub async fn record<'e, E>(executor: E, event: &NewAuditEvent) -> Result<(), sqlx::Error>
|
||||
where
|
||||
E: sqlx::PgExecutor<'e>,
|
||||
{
|
||||
let (actor_kind, actor_id) = match event.actor {
|
||||
AuditActor::User(id) => ("user", Some(id)),
|
||||
AuditActor::System => ("system", None),
|
||||
};
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO audit_log \
|
||||
(actor_kind, actor_id, action, entity_type, entity_id, changes) \
|
||||
VALUES ($1, $2, $3, $4, $5, $6)",
|
||||
)
|
||||
.bind(actor_kind)
|
||||
.bind(actor_id)
|
||||
.bind(event.action.as_str())
|
||||
.bind(&event.entity_type)
|
||||
.bind(event.entity_id)
|
||||
.bind(sqlx::types::Json(&event.changes))
|
||||
.execute(executor)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Read the full history for one entity, oldest first.
|
||||
pub async fn history_for<'e, E>(
|
||||
executor: E,
|
||||
entity_type: &str,
|
||||
entity_id: Uuid,
|
||||
) -> Result<Vec<AuditEntry>, sqlx::Error>
|
||||
where
|
||||
E: sqlx::PgExecutor<'e>,
|
||||
{
|
||||
let rows = sqlx::query(
|
||||
"SELECT seq, at, actor_kind, actor_id, action, entity_type, entity_id, changes \
|
||||
FROM audit_log \
|
||||
WHERE entity_type = $1 AND entity_id = $2 \
|
||||
ORDER BY seq",
|
||||
)
|
||||
.bind(entity_type)
|
||||
.bind(entity_id)
|
||||
.fetch_all(executor)
|
||||
.await?;
|
||||
|
||||
rows.into_iter().map(map_row).collect()
|
||||
}
|
||||
|
||||
fn map_row(row: sqlx::postgres::PgRow) -> Result<AuditEntry, sqlx::Error> {
|
||||
let seq: i64 = row.try_get("seq")?;
|
||||
let at: time::OffsetDateTime = row.try_get("at")?;
|
||||
let actor_kind: String = row.try_get("actor_kind")?;
|
||||
let actor_id: Option<Uuid> = row.try_get("actor_id")?;
|
||||
let action: String = row.try_get("action")?;
|
||||
let entity_type: String = row.try_get("entity_type")?;
|
||||
let entity_id: Uuid = row.try_get("entity_id")?;
|
||||
let changes: sqlx::types::Json<Vec<FieldChange>> = row.try_get("changes")?;
|
||||
|
||||
let actor = match actor_kind.as_str() {
|
||||
"user" => AuditActor::User(
|
||||
actor_id.ok_or_else(|| sqlx::Error::Decode("user actor missing actor_id".into()))?,
|
||||
),
|
||||
"system" => AuditActor::System,
|
||||
other => {
|
||||
return Err(sqlx::Error::Decode(
|
||||
format!("unknown actor_kind: {other}").into(),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let action = domain::AuditAction::from_db(&action)
|
||||
.ok_or_else(|| sqlx::Error::Decode(format!("unknown action: {action}").into()))?;
|
||||
|
||||
Ok(AuditEntry {
|
||||
seq,
|
||||
at,
|
||||
actor,
|
||||
action,
|
||||
entity_type,
|
||||
entity_id,
|
||||
changes: changes.0,
|
||||
})
|
||||
}
|
||||
```
|
||||
Add to `crates/db/src/lib.rs` (top-level, after the module doc comment):
|
||||
```rust
|
||||
pub mod audit;
|
||||
```
|
||||
|
||||
- [ ] **Step 5: Run it to verify it passes.** `DATABASE_URL=<url> cargo test -p db --test audit` → PASS (2 tests).
|
||||
|
||||
- [ ] **Step 6: Lint + format.** `cargo +nightly fmt` and `DATABASE_URL=<url> cargo clippy -p db --all-targets -- -D warnings` → clean.
|
||||
|
||||
- [ ] **Step 7: Commit.**
|
||||
```bash
|
||||
git add crates/db
|
||||
git commit -m "feat(db): add append-only audit repository (record, history_for)"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 4: Immutability & transactional guarantees
|
||||
|
||||
**Files:**
|
||||
- Test: `crates/db/tests/audit_immutability.rs`
|
||||
|
||||
- [ ] **Step 1: Write the tests.** Create `crates/db/tests/audit_immutability.rs`:
|
||||
```rust
|
||||
use db::Db;
|
||||
use db::audit;
|
||||
use domain::{AuditAction, AuditActor, NewAuditEvent};
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
fn sample() -> NewAuditEvent {
|
||||
NewAuditEvent {
|
||||
actor: AuditActor::System,
|
||||
action: AuditAction::Created,
|
||||
entity_type: "object".into(),
|
||||
entity_id: Uuid::new_v4(),
|
||||
changes: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
async fn count(pool: &PgPool) -> i64 {
|
||||
sqlx::query_scalar("SELECT count(*) FROM audit_log")
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[sqlx::test]
|
||||
async fn update_and_delete_are_rejected(pool: PgPool) {
|
||||
let db = Db::from_pool(pool);
|
||||
audit::record(db.pool(), &sample()).await.unwrap();
|
||||
|
||||
let updated = sqlx::query("UPDATE audit_log SET action = 'deleted'")
|
||||
.execute(db.pool())
|
||||
.await;
|
||||
assert!(updated.is_err(), "UPDATE must be rejected by the trigger");
|
||||
|
||||
let deleted = sqlx::query("DELETE FROM audit_log").execute(db.pool()).await;
|
||||
assert!(deleted.is_err(), "DELETE must be rejected by the trigger");
|
||||
|
||||
assert_eq!(count(db.pool()).await, 1, "the row is still there");
|
||||
}
|
||||
|
||||
#[sqlx::test]
|
||||
async fn record_rolls_back_with_caller_transaction(pool: PgPool) {
|
||||
let db = Db::from_pool(pool);
|
||||
|
||||
let mut tx = db.pool().begin().await.unwrap();
|
||||
audit::record(&mut *tx, &sample()).await.unwrap();
|
||||
tx.rollback().await.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
count(db.pool()).await,
|
||||
0,
|
||||
"a rolled-back audit record must not persist"
|
||||
);
|
||||
}
|
||||
|
||||
#[sqlx::test]
|
||||
async fn record_commits_with_caller_transaction(pool: PgPool) {
|
||||
let db = Db::from_pool(pool);
|
||||
|
||||
let mut tx = db.pool().begin().await.unwrap();
|
||||
audit::record(&mut *tx, &sample()).await.unwrap();
|
||||
tx.commit().await.unwrap();
|
||||
|
||||
assert_eq!(count(db.pool()).await, 1, "a committed audit record persists");
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Run it.** `DATABASE_URL=<url> cargo test -p db --test audit_immutability` → PASS (3 tests). These exercise the DB-level trigger and the `impl PgExecutor` transaction seam (`&mut *tx`).
|
||||
|
||||
- [ ] **Step 3: Full workspace check.** Run:
|
||||
```bash
|
||||
cargo +nightly fmt --check
|
||||
DATABASE_URL=<url> cargo clippy --workspace --all-targets -- -D warnings
|
||||
DATABASE_URL=<url> cargo test --workspace
|
||||
```
|
||||
Expected: all green — domain (5), db (migrate 1 + audit 2 + immutability 3), api (3), server (config 2 + serve 1).
|
||||
|
||||
- [ ] **Step 4: Commit.**
|
||||
```bash
|
||||
git add crates/db
|
||||
git commit -m "test(db): enforce audit_log immutability and transactional atomicity"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Self-Review (completed)
|
||||
|
||||
**Spec coverage (§13 Audit & amendment history):**
|
||||
- Append-only, immutable → Task 2 trigger + Task 4 negative tests. ✓
|
||||
- who/when/what with field-level before→after diffs → `AuditActor`, `at`, `AuditAction`, `entity_type`/`entity_id`, `Vec<FieldChange>` (Tasks 1–3). ✓
|
||||
- Stored in the org DB; no `org_id` (single-tenant) → Task 2 schema. ✓
|
||||
- Doubles as amendment history (history per entity) → `history_for` (Task 3). ✓
|
||||
- Covers entity-change events; **auth events deferred to Plan 9** (documented in Design decisions). ✓ (intentional scope boundary, not a gap)
|
||||
- Transaction-capable so future write paths record atomically → `impl PgExecutor` + Task 4 rollback/commit tests. ✓
|
||||
- Wires `domain` into `db` (issue #4) → Task 3 dep. ✓
|
||||
|
||||
**Placeholder scan:** no TODO/TBD; every step has concrete SQL/Rust/commands. The `<url>` token in commands is the documented `DATABASE_URL` value, not a code placeholder.
|
||||
|
||||
**Type consistency:** `NewAuditEvent` / `AuditEntry` / `AuditActor` / `AuditAction` / `FieldChange` field names and signatures are identical across `domain` (Task 1), the `db` repository (Task 3), and all tests (Tasks 3–4). `record(impl PgExecutor, &NewAuditEvent)` and `history_for(impl PgExecutor, &str, Uuid)` signatures match every call site. `Db::migrate()` is defined in Task 2 and used in Task 2's test and `server::run`.
|
||||
|
||||
## Notes for follow-on plans
|
||||
|
||||
- An audit/amendment-history **HTTP endpoint** lands when entities exist and the admin UI needs it (Plan 8/10), reusing `history_for`.
|
||||
- **Auth events** (login success/failure) attach to this spine in Plan 9, likely via an `actor`/`action` extension or a sibling table — decide then.
|
||||
- When the first entity write path lands (Plan 3/4), record its audit entry **inside the entity's transaction** using `record(&mut *tx, …)`.
|
||||
Reference in New Issue
Block a user