feat(db): add append-only audit repository (record, history_for)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -7,6 +7,10 @@ rust-version.workspace = true
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
sqlx.workspace = true
|
sqlx.workspace = true
|
||||||
thiserror.workspace = true
|
thiserror.workspace = true
|
||||||
|
domain = { path = "../domain" }
|
||||||
|
uuid.workspace = true
|
||||||
|
time.workspace = true
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio.workspace = true
|
tokio.workspace = true
|
||||||
|
serde_json.workspace = true
|
||||||
|
|||||||
@@ -0,0 +1,92 @@
|
|||||||
|
//! Append-only audit log access.
|
||||||
|
|
||||||
|
use domain::{AuditActor, AuditEntry, FieldChange, NewAuditEvent};
|
||||||
|
use sqlx::Row;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
/// Append an audit event. Accepts any executor, so callers can record the event
|
||||||
|
/// inside the same transaction as the change it describes.
|
||||||
|
pub async fn record<'e, E>(executor: E, event: &NewAuditEvent) -> Result<(), sqlx::Error>
|
||||||
|
where
|
||||||
|
E: sqlx::PgExecutor<'e>,
|
||||||
|
{
|
||||||
|
let (actor_kind, actor_id) = match event.actor {
|
||||||
|
AuditActor::User(id) => ("user", Some(id)),
|
||||||
|
AuditActor::System => ("system", None),
|
||||||
|
};
|
||||||
|
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO audit_log \
|
||||||
|
(actor_kind, actor_id, action, entity_type, entity_id, changes) \
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6)",
|
||||||
|
)
|
||||||
|
.bind(actor_kind)
|
||||||
|
.bind(actor_id)
|
||||||
|
.bind(event.action.as_str())
|
||||||
|
.bind(&event.entity_type)
|
||||||
|
.bind(event.entity_id)
|
||||||
|
.bind(sqlx::types::Json(&event.changes))
|
||||||
|
.execute(executor)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read the full history for one entity, oldest first.
|
||||||
|
pub async fn history_for<'e, E>(
|
||||||
|
executor: E,
|
||||||
|
entity_type: &str,
|
||||||
|
entity_id: Uuid,
|
||||||
|
) -> Result<Vec<AuditEntry>, sqlx::Error>
|
||||||
|
where
|
||||||
|
E: sqlx::PgExecutor<'e>,
|
||||||
|
{
|
||||||
|
let rows = sqlx::query(
|
||||||
|
"SELECT seq, at, actor_kind, actor_id, action, entity_type, entity_id, changes \
|
||||||
|
FROM audit_log \
|
||||||
|
WHERE entity_type = $1 AND entity_id = $2 \
|
||||||
|
ORDER BY seq",
|
||||||
|
)
|
||||||
|
.bind(entity_type)
|
||||||
|
.bind(entity_id)
|
||||||
|
.fetch_all(executor)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
rows.into_iter().map(map_row).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn map_row(row: sqlx::postgres::PgRow) -> Result<AuditEntry, sqlx::Error> {
|
||||||
|
let seq: i64 = row.try_get("seq")?;
|
||||||
|
let at: time::OffsetDateTime = row.try_get("at")?;
|
||||||
|
let actor_kind: String = row.try_get("actor_kind")?;
|
||||||
|
let actor_id: Option<Uuid> = row.try_get("actor_id")?;
|
||||||
|
let action: String = row.try_get("action")?;
|
||||||
|
let entity_type: String = row.try_get("entity_type")?;
|
||||||
|
let entity_id: Uuid = row.try_get("entity_id")?;
|
||||||
|
let changes: sqlx::types::Json<Vec<FieldChange>> = row.try_get("changes")?;
|
||||||
|
|
||||||
|
let actor = match actor_kind.as_str() {
|
||||||
|
"user" => AuditActor::User(
|
||||||
|
actor_id.ok_or_else(|| sqlx::Error::Decode("user actor missing actor_id".into()))?,
|
||||||
|
),
|
||||||
|
"system" => AuditActor::System,
|
||||||
|
other => {
|
||||||
|
return Err(sqlx::Error::Decode(
|
||||||
|
format!("unknown actor_kind: {other}").into(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let action = domain::AuditAction::from_db(&action)
|
||||||
|
.ok_or_else(|| sqlx::Error::Decode(format!("unknown action: {action}").into()))?;
|
||||||
|
|
||||||
|
Ok(AuditEntry {
|
||||||
|
seq,
|
||||||
|
at,
|
||||||
|
actor,
|
||||||
|
action,
|
||||||
|
entity_type,
|
||||||
|
entity_id,
|
||||||
|
changes: changes.0,
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -1,5 +1,7 @@
|
|||||||
//! Database access. All SQL lives in this crate.
|
//! Database access. All SQL lives in this crate.
|
||||||
|
|
||||||
|
pub mod audit;
|
||||||
|
|
||||||
use sqlx::postgres::{PgPool, PgPoolOptions};
|
use sqlx::postgres::{PgPool, PgPoolOptions};
|
||||||
|
|
||||||
/// A handle to the organization's PostgreSQL database.
|
/// A handle to the organization's PostgreSQL database.
|
||||||
|
|||||||
@@ -0,0 +1,70 @@
|
|||||||
|
use db::Db;
|
||||||
|
use db::audit;
|
||||||
|
use domain::{AuditAction, AuditActor, FieldChange, NewAuditEvent};
|
||||||
|
use serde_json::json;
|
||||||
|
use sqlx::PgPool;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
fn created(entity_id: Uuid, name: &str) -> NewAuditEvent {
|
||||||
|
NewAuditEvent {
|
||||||
|
actor: AuditActor::System,
|
||||||
|
action: AuditAction::Created,
|
||||||
|
entity_type: "object".into(),
|
||||||
|
entity_id,
|
||||||
|
changes: vec![FieldChange {
|
||||||
|
field: "name".into(),
|
||||||
|
before: None,
|
||||||
|
after: Some(json!(name)),
|
||||||
|
}],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[sqlx::test]
|
||||||
|
async fn records_and_reads_back_history_in_order(pool: PgPool) {
|
||||||
|
let db = Db::from_pool(pool);
|
||||||
|
let id = Uuid::new_v4();
|
||||||
|
let user = Uuid::new_v4();
|
||||||
|
|
||||||
|
audit::record(db.pool(), &created(id, "Vase"))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
audit::record(
|
||||||
|
db.pool(),
|
||||||
|
&NewAuditEvent {
|
||||||
|
actor: AuditActor::User(user),
|
||||||
|
action: AuditAction::Updated,
|
||||||
|
entity_type: "object".into(),
|
||||||
|
entity_id: id,
|
||||||
|
changes: vec![FieldChange {
|
||||||
|
field: "name".into(),
|
||||||
|
before: Some(json!("Vase")),
|
||||||
|
after: Some(json!("Roman Vase")),
|
||||||
|
}],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let history = audit::history_for(db.pool(), "object", id).await.unwrap();
|
||||||
|
assert_eq!(history.len(), 2);
|
||||||
|
assert_eq!(history[0].action, AuditAction::Created);
|
||||||
|
assert_eq!(history[0].actor, AuditActor::System);
|
||||||
|
assert_eq!(history[1].action, AuditAction::Updated);
|
||||||
|
assert_eq!(history[1].actor, AuditActor::User(user));
|
||||||
|
assert!(history[0].seq < history[1].seq, "ordered by seq");
|
||||||
|
assert_eq!(history[1].changes[0].field, "name");
|
||||||
|
assert_eq!(history[1].changes[0].after, Some(json!("Roman Vase")));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[sqlx::test]
|
||||||
|
async fn history_is_scoped_to_one_entity(pool: PgPool) {
|
||||||
|
let db = Db::from_pool(pool);
|
||||||
|
let a = Uuid::new_v4();
|
||||||
|
let b = Uuid::new_v4();
|
||||||
|
audit::record(db.pool(), &created(a, "A")).await.unwrap();
|
||||||
|
audit::record(db.pool(), &created(b, "B")).await.unwrap();
|
||||||
|
|
||||||
|
let only_a = audit::history_for(db.pool(), "object", a).await.unwrap();
|
||||||
|
assert_eq!(only_a.len(), 1);
|
||||||
|
assert_eq!(only_a[0].entity_id, a);
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user