Files
biggus-dickus/crates/db/src/catalog.rs
T
2026-06-06 23:21:04 +02:00

710 lines
22 KiB
Rust

//! Catalogue objects (the inventory-minimum core). Writes record audit entries
//! on the caller's connection, so the change and its audit entry commit together.
use domain::{
AuditAction, AuditActor, CatalogueObject, FieldChange, FieldType, IllegalTransition,
NewAuditEvent, ObjectId, ObjectInput, Visibility,
};
use serde_json::{Value, json};
use sqlx::Row;
use crate::{audit, authority, fields, vocab};
/// The entity_type recorded in the audit log for catalogue objects.
const ENTITY_TYPE: &str = "object";
/// The visibility value eligible for the public surface. Always sent to the
/// database as a bind parameter by the public-facing queries in this module.
const PUBLIC_VISIBILITY: &str = Visibility::Public.as_str();
/// Column list interpolated into the row-returning `SELECT`s in this module.
/// `map_object` reads every one of these columns by name, so a column added here
/// needs a matching read there (and vice versa).
const OBJECT_COLUMNS: &str = "id, object_number, object_name, number_of_objects, \
brief_description, current_location, current_owner, recorder, recording_date, \
visibility, fields, created_at, updated_at";
/// Insert a new catalogue object and write its `created` audit entry.
///
/// Both statements run on `conn`; hand in a transaction connection (`&mut *tx`) so
/// the row and its audit entry commit — or roll back — together. Returns the
/// freshly generated id of the new object.
pub async fn create_object(
    conn: &mut sqlx::PgConnection,
    actor: AuditActor,
    input: &ObjectInput,
) -> Result<ObjectId, sqlx::Error> {
    const INSERT_OBJECT: &str = "INSERT INTO object \
        (id, object_number, object_name, number_of_objects, brief_description, \
        current_location, current_owner, recorder, recording_date, visibility) \
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)";

    let new_id = ObjectId::new();

    // Bind order follows the column list in INSERT_OBJECT.
    let insert = sqlx::query(INSERT_OBJECT)
        .bind(new_id.to_uuid())
        .bind(&input.object_number)
        .bind(&input.object_name)
        .bind(input.number_of_objects)
        .bind(input.brief_description.as_deref())
        .bind(input.current_location.as_deref())
        .bind(input.current_owner.as_deref())
        .bind(input.recorder.as_deref())
        .bind(input.recording_date)
        .bind(input.visibility.as_str());
    insert.execute(&mut *conn).await?;

    // The audit entry lists every field that was given a value at creation.
    let event = NewAuditEvent {
        actor,
        action: AuditAction::Created,
        entity_type: ENTITY_TYPE.to_owned(),
        entity_id: new_id.to_uuid(),
        changes: creation_changes(input),
    };
    audit::record(&mut *conn, &event).await?;

    Ok(new_id)
}
/// Fetch one object by id.
pub async fn object_by_id<'e, E>(
executor: E,
id: ObjectId,
) -> Result<Option<CatalogueObject>, sqlx::Error>
where
E: sqlx::PgExecutor<'e>,
{
let sql = format!("SELECT {OBJECT_COLUMNS} FROM object WHERE id = $1");
let row = sqlx::query(&sql)
.bind(id.to_uuid())
.fetch_optional(executor)
.await?;
row.map(map_object).transpose()
}
/// List all objects, ordered by object number.
pub async fn list_objects<'e, E>(executor: E) -> Result<Vec<CatalogueObject>, sqlx::Error>
where
E: sqlx::PgExecutor<'e>,
{
// TODO: add LIMIT/keyset pagination before exposing this via the API.
let sql = format!("SELECT {OBJECT_COLUMNS} FROM object ORDER BY object_number");
let rows = sqlx::query(&sql).fetch_all(executor).await?;
rows.into_iter().map(map_object).collect()
}
/// The closed set of columns the object list may be ordered by.
///
/// This is the injection guard for `ORDER BY`: the client never supplies a column
/// name directly — the API layer maps an opaque token onto a variant, and only
/// [`ObjectSort::column`] (returning a `'static str`) reaches the SQL string.
#[derive(Debug, Clone, Copy)]
pub enum ObjectSort {
    /// Order by the `object_number` column.
    ObjectNumber,
    /// Order by the `object_name` column.
    ObjectName,
    /// Order by the `updated_at` column.
    UpdatedAt,
    /// Order by the `created_at` column.
    CreatedAt,
    /// Order by the `visibility` column.
    Visibility,
}

impl ObjectSort {
    /// The literal column name for this sort key, safe to splice into SQL text.
    fn column(self) -> &'static str {
        match self {
            Self::ObjectNumber => "object_number",
            Self::ObjectName => "object_name",
            Self::UpdatedAt => "updated_at",
            Self::CreatedAt => "created_at",
            Self::Visibility => "visibility",
        }
    }
}
/// Filters + ordering for a paged object query. `visibility`/`q` are optional;
/// both are bound as parameters, never interpolated into the SQL string.
pub struct ObjectQuery<'a> {
pub sort: ObjectSort,
pub descending: bool,
pub visibility: Option<&'a str>,
pub q: Option<&'a str>,
}
/// Build the optional `WHERE` clause and its ordered bind values from the filters.
///
/// Each clause references a positional placeholder (`$1`, `$2`, …) matching the order
/// in which the returned binds must be applied; the client's strings only ever arrive
/// as binds, never as SQL text. Returns an empty string (and no binds) when neither
/// filter is set; otherwise the SQL starts with a leading space (`" WHERE …"`).
///
/// The quick-filter term `q` is matched as a case-insensitive *literal* substring of
/// `object_number` or `object_name`: `%`, `_` and `\` inside the term are escaped with
/// a backslash (Postgres's default `LIKE` escape character) before the term is wrapped
/// in `%…%`, so searching for `50%` or `A_1` does not turn into a wildcard pattern.
fn where_clause(visibility: Option<&str>, q: Option<&str>) -> (String, Vec<String>) {
    // Wrap `term` in `%…%`, escaping LIKE metacharacters so it matches literally.
    fn contains_pattern(term: &str) -> String {
        let mut pattern = String::with_capacity(term.len() + 2);
        pattern.push('%');
        for ch in term.chars() {
            if matches!(ch, '\\' | '%' | '_') {
                pattern.push('\\');
            }
            pattern.push(ch);
        }
        pattern.push('%');
        pattern
    }

    let mut clauses = Vec::new();
    let mut binds = Vec::new();

    if let Some(v) = visibility {
        binds.push(v.to_owned());
        clauses.push(format!("visibility = ${}", binds.len()));
    }
    if let Some(term) = q {
        binds.push(contains_pattern(term));
        // Both ILIKE comparisons share the one placeholder for the pattern.
        let p = binds.len();
        clauses.push(format!(
            "(object_number ILIKE ${p} OR object_name ILIKE ${p})"
        ));
    }

    let sql = if clauses.is_empty() {
        String::new()
    } else {
        format!(" WHERE {}", clauses.join(" AND "))
    };
    (sql, binds)
}
/// Page through objects of any visibility, applying the optional filters and the
/// whitelisted ordering carried by `query`.
///
/// The `ORDER BY` column comes from [`ObjectSort::column`] (a `'static str`) and the
/// direction from a fixed literal, so no client-controlled string ever reaches the
/// SQL text; filter values, `limit` and `offset` are all sent as bind parameters.
pub async fn list_objects_query(
    pool: &sqlx::PgPool,
    query: &ObjectQuery<'_>,
    limit: i64,
    offset: i64,
) -> Result<Vec<CatalogueObject>, sqlx::Error> {
    let (filter_sql, filter_binds) = where_clause(query.visibility, query.q);
    let column = query.sort.column();
    let direction = if query.descending { "DESC" } else { "ASC" };

    // LIMIT and OFFSET take the two placeholders right after the filter binds.
    let limit_slot = filter_binds.len() + 1;
    let offset_slot = limit_slot + 1;

    // Secondary key keeps ordering stable when the primary sort has ties.
    let sql = format!(
        "SELECT {OBJECT_COLUMNS} FROM object{filter_sql} \
         ORDER BY {column} {direction}, object_number ASC \
         LIMIT ${limit_slot} OFFSET ${offset_slot}"
    );

    let statement = filter_binds
        .iter()
        .fold(sqlx::query(&sql), |stmt, value| stmt.bind(value));
    let rows = statement.bind(limit).bind(offset).fetch_all(pool).await?;
    rows.into_iter().map(map_object).collect()
}
/// Number of objects that satisfy the optional visibility/quick filters — the total
/// a paginated listing with the same filters would page through.
pub async fn count_objects_query(
    pool: &sqlx::PgPool,
    visibility: Option<&str>,
    q: Option<&str>,
) -> Result<i64, sqlx::Error> {
    let (filter_sql, filter_binds) = where_clause(visibility, q);
    let sql = format!("SELECT count(*) AS n FROM object{filter_sql}");

    // Apply the filter values in the order their placeholders were numbered.
    let statement = filter_binds
        .iter()
        .fold(sqlx::query(&sql), |stmt, value| stmt.bind(value));
    let row = statement.fetch_one(pool).await?;
    row.try_get("n")
}
/// Fetch one **public** object by id. Returns `None` if the object is missing **or**
/// not public — callers map both to 404 so non-public existence isn't revealed.
pub async fn public_object_by_id<'e, E>(
executor: E,
id: ObjectId,
) -> Result<Option<CatalogueObject>, sqlx::Error>
where
E: sqlx::PgExecutor<'e>,
{
let sql = format!("SELECT {OBJECT_COLUMNS} FROM object WHERE id = $1 AND visibility = $2");
let row = sqlx::query(&sql)
.bind(id.to_uuid())
.bind(PUBLIC_VISIBILITY)
.fetch_optional(executor)
.await?;
row.map(map_object).transpose()
}
/// List **public** objects ordered by object number, with `limit`/`offset` paging.
///
/// `limit` and `offset` must be non-negative (Postgres rejects a negative `LIMIT`);
/// the public API layer clamps them before calling.
pub async fn list_public_objects<'e, E>(
executor: E,
limit: i64,
offset: i64,
) -> Result<Vec<CatalogueObject>, sqlx::Error>
where
E: sqlx::PgExecutor<'e>,
{
let sql = format!(
"SELECT {OBJECT_COLUMNS} FROM object WHERE visibility = $1 \
ORDER BY object_number LIMIT $2 OFFSET $3"
);
let rows = sqlx::query(&sql)
.bind(PUBLIC_VISIBILITY)
.bind(limit)
.bind(offset)
.fetch_all(executor)
.await?;
rows.into_iter().map(map_object).collect()
}
/// Total number of public objects, for pagination totals.
pub async fn count_public_objects<'e, E>(executor: E) -> Result<i64, sqlx::Error>
where
    E: sqlx::PgExecutor<'e>,
{
    sqlx::query("SELECT count(*) AS n FROM object WHERE visibility = $1")
        .bind(PUBLIC_VISIBILITY)
        .fetch_one(executor)
        .await?
        .try_get("n")
}
fn map_object(row: sqlx::postgres::PgRow) -> Result<CatalogueObject, sqlx::Error> {
let visibility_str: String = row.try_get("visibility")?;
let visibility = Visibility::from_db(&visibility_str).ok_or_else(|| {
sqlx::Error::Decode(format!("unknown visibility: {visibility_str}").into())
})?;
Ok(CatalogueObject {
id: ObjectId::from_uuid(row.try_get("id")?),
object_number: row.try_get("object_number")?,
object_name: row.try_get("object_name")?,
number_of_objects: row.try_get("number_of_objects")?,
brief_description: row.try_get("brief_description")?,
current_location: row.try_get("current_location")?,
current_owner: row.try_get("current_owner")?,
recorder: row.try_get("recorder")?,
recording_date: row.try_get("recording_date")?,
visibility,
fields: row.try_get("fields")?,
created_at: row.try_get("created_at")?,
updated_at: row.try_get("updated_at")?,
})
}
/// Flatten an `ObjectInput` into `(column name, JSON value)` pairs for audit diffs.
///
/// The pairs always come out in the same order, which is what lets two calls be
/// zipped together field-by-field. A `None` value means the field is unset (NULL).
fn field_values(input: &ObjectInput) -> Vec<(&'static str, Option<Value>)> {
    let mut pairs = Vec::with_capacity(9);

    // Always-present fields.
    pairs.push(("object_number", Some(json!(input.object_number))));
    pairs.push(("object_name", Some(json!(input.object_name))));
    pairs.push(("number_of_objects", Some(json!(input.number_of_objects))));

    // Optional fields: absent stays `None` rather than becoming JSON null.
    pairs.push((
        "brief_description",
        input.brief_description.as_ref().map(|text| json!(text)),
    ));
    pairs.push((
        "current_location",
        input.current_location.as_ref().map(|text| json!(text)),
    ));
    pairs.push((
        "current_owner",
        input.current_owner.as_ref().map(|text| json!(text)),
    ));
    pairs.push(("recorder", input.recorder.as_ref().map(|text| json!(text))));
    pairs.push(("recording_date", input.recording_date.map(|date| json!(date))));

    pairs.push(("visibility", Some(json!(input.visibility.as_str()))));
    pairs
}
/// Audit entries for a brand-new object: one `after`-only change per field that has
/// a value.
///
/// Optional fields left as `None` produce no entry at all — their absence from the
/// list is what conveys "unset", in line with `FieldChange`'s `None`-means-no-value
/// convention.
fn creation_changes(input: &ObjectInput) -> Vec<FieldChange> {
    let mut changes = Vec::new();
    for (name, value) in field_values(input) {
        if let Some(initial) = value {
            changes.push(FieldChange {
                field: name.to_owned(),
                before: None,
                after: Some(initial),
            });
        }
    }
    changes
}
/// Field-level diff between two inputs: one entry per field whose value differs,
/// carrying both the previous and the new value.
fn update_changes(old: &ObjectInput, new: &ObjectInput) -> Vec<FieldChange> {
    // Both sides list the same fields in the same order, so pairing by position
    // lines each field up with itself.
    let paired = field_values(old).into_iter().zip(field_values(new));

    let mut changes = Vec::new();
    for ((name, before), (_, after)) in paired {
        if before == after {
            continue;
        }
        changes.push(FieldChange {
            field: name.to_owned(),
            before,
            after,
        });
    }
    changes
}
/// Overwrite an object's typed columns with `input` and log an `updated` audit entry
/// holding the field-level diff, both on `conn`.
///
/// Returns `Ok(false)` when no object has this id. When `input` matches the stored
/// values nothing is written and no audit entry is recorded, but the result is
/// still `Ok(true)`.
pub async fn update_object(
    conn: &mut sqlx::PgConnection,
    actor: AuditActor,
    id: ObjectId,
    input: &ObjectInput,
) -> Result<bool, sqlx::Error> {
    let existing = object_by_id(&mut *conn, id).await?;
    match existing {
        None => Ok(false),
        Some(current) => {
            let previous = current.to_input();
            apply_object_update(&mut *conn, actor, id, &previous, input).await?;
            Ok(true)
        }
    }
}
/// Shared write path for typed-column updates: diff `old` against `new`, then write
/// all columns from `new` plus an `updated` audit entry, both on `conn`.
///
/// When the diff is empty the function returns early, leaving both the row's
/// `updated_at` and the audit log untouched.
async fn apply_object_update(
    conn: &mut sqlx::PgConnection,
    actor: AuditActor,
    id: ObjectId,
    old: &ObjectInput,
    new: &ObjectInput,
) -> Result<(), sqlx::Error> {
    const UPDATE_OBJECT: &str = "UPDATE object SET \
        object_number = $2, object_name = $3, number_of_objects = $4, \
        brief_description = $5, current_location = $6, current_owner = $7, \
        recorder = $8, recording_date = $9, visibility = $10, updated_at = now() \
        WHERE id = $1";

    let diff = update_changes(old, new);
    if diff.is_empty() {
        return Ok(());
    }

    // $1 is the row id; $2..$10 follow the SET list order.
    let update = sqlx::query(UPDATE_OBJECT)
        .bind(id.to_uuid())
        .bind(&new.object_number)
        .bind(&new.object_name)
        .bind(new.number_of_objects)
        .bind(new.brief_description.as_deref())
        .bind(new.current_location.as_deref())
        .bind(new.current_owner.as_deref())
        .bind(new.recorder.as_deref())
        .bind(new.recording_date)
        .bind(new.visibility.as_str());
    update.execute(&mut *conn).await?;

    let event = NewAuditEvent {
        actor,
        action: AuditAction::Updated,
        entity_type: ENTITY_TYPE.to_owned(),
        entity_id: id.to_uuid(),
        changes: diff,
    };
    audit::record(&mut *conn, &event).await?;
    Ok(())
}
/// Why changing an object's visibility failed.
#[derive(Debug, thiserror::Error)]
pub enum VisibilityError {
/// No object exists with the requested id.
#[error("object not found")]
ObjectNotFound,
/// The visibility state machine rejected the requested move; wraps the
/// `IllegalTransition` returned by `Visibility::transition_to`.
#[error(transparent)]
Illegal(#[from] IllegalTransition),
/// The publish gate refused the move to public: these required field-definition
/// keys have no value (absent or null) on the object. Rendered comma-separated.
#[error("missing required field(s): {}", .0.join(", "))]
MissingRequiredFields(Vec<String>),
/// A database error from any of the queries involved.
#[error(transparent)]
Db(#[from] sqlx::Error),
}
/// Change an object's visibility to `target`, subject to the stepwise state machine,
/// and audit the change. Pass a transaction connection.
///
/// The write goes through the same diff/audit path as `update_object`, so the audit
/// entry contains only `visibility`. Setting the value the object already has is an
/// idempotent no-op: no row touch, no audit entry.
///
/// # Errors
/// * `ObjectNotFound` — no object with this id.
/// * `Illegal` — the state machine does not allow the transition.
/// * `MissingRequiredFields` — the object would become public while required
///   flexible fields have no value.
/// * `Db` — a query failed.
pub async fn set_visibility(
    conn: &mut sqlx::PgConnection,
    actor: AuditActor,
    id: ObjectId,
    target: Visibility,
) -> Result<(), VisibilityError> {
    let object = object_by_id(&mut *conn, id)
        .await?
        .ok_or(VisibilityError::ObjectNotFound)?;
    let next = object.visibility.transition_to(target)?;

    // The publish gate: a record may only *become* public once every required field
    // has a value. The typed inventory-minimum columns are already NOT NULL, so only
    // the flexible required fields need checking here. Gated on an actual transition
    // into public so a set-to-current no-op stays a no-op (never a late rejection).
    let becoming_public =
        next == Visibility::Public && object.visibility != Visibility::Public;
    if becoming_public {
        let missing = missing_required_fields(&mut *conn, &object.fields).await?;
        if !missing.is_empty() {
            return Err(VisibilityError::MissingRequiredFields(missing));
        }
    }

    // Only `visibility` differs between the two inputs, so only it is diffed.
    let before = object.to_input();
    let mut after = before.clone();
    after.visibility = next;
    apply_object_update(&mut *conn, actor, id, &before, &after).await?;
    Ok(())
}
/// Collect the keys of all `required` field definitions for which `fields` holds no
/// value — the key is either absent or mapped to JSON null.
///
/// An empty result means every required field is filled in.
async fn missing_required_fields(
    conn: &mut sqlx::PgConnection,
    fields: &Value,
) -> Result<Vec<String>, sqlx::Error> {
    let mut missing = Vec::new();
    for definition in fields::list_field_definitions(&mut *conn).await? {
        if !definition.required {
            continue;
        }
        let has_value = matches!(fields.get(&definition.key), Some(v) if !v.is_null());
        if !has_value {
            missing.push(definition.key);
        }
    }
    Ok(missing)
}
/// Remove an object and log a `deleted` audit entry, both on `conn`.
///
/// Returns `Ok(false)` — and records nothing — when no row had this id. The audit
/// entry carries no field-level changes.
pub async fn delete_object(
    conn: &mut sqlx::PgConnection,
    actor: AuditActor,
    id: ObjectId,
) -> Result<bool, sqlx::Error> {
    let removed = sqlx::query("DELETE FROM object WHERE id = $1")
        .bind(id.to_uuid())
        .execute(&mut *conn)
        .await?
        .rows_affected();
    if removed == 0 {
        return Ok(false);
    }

    let event = NewAuditEvent {
        actor,
        action: AuditAction::Deleted,
        entity_type: ENTITY_TYPE.to_owned(),
        entity_id: id.to_uuid(),
        changes: Vec::new(),
    };
    audit::record(&mut *conn, &event).await?;
    Ok(true)
}
/// Why setting flexible field values failed.
#[derive(Debug, thiserror::Error)]
pub enum FieldError {
/// No object exists with the requested id.
#[error("object not found")]
ObjectNotFound,
/// The key has no field definition in the registry; carries the offending key.
#[error("unknown field: {0}")]
UnknownField(String),
/// The JSON value does not have the shape the field's definition requires.
/// `field` is the key; `expected` is a human-readable description of the
/// required shape (e.g. "text", "integer", "term id (uuid string)").
#[error("field `{field}` expects a {expected} value")]
TypeMismatch {
field: String,
expected: &'static str,
},
/// The value is a well-formed id but does not resolve to an acceptable record.
/// `kind` is "term" or "authority"; for authorities this also covers a record
/// whose kind differs from the one the field definition demands.
#[error("field `{field}`: value does not resolve to an existing {kind}")]
Unresolved { field: String, kind: &'static str },
/// A database error from any of the queries involved.
#[error(transparent)]
Db(#[from] sqlx::Error),
}
/// Replace the whole flexible-field map of an object with `values`, after validating
/// every entry against the field registry (type + term/authority resolution), and
/// audit the per-key diff — all on `conn`.
///
/// If `values` equals what is already stored, nothing is written and no audit entry
/// is recorded.
///
/// **Replace semantics:** `values` is the *complete* desired set. Omitting a key that
/// was previously set REMOVES it (recorded in the audit as a removal); send every key
/// the caller wants to retain.
///
/// Required-field *completeness* is intentionally NOT enforced here — a caller may set
/// any subset. That check belongs to the publish gate (when moving to
/// `Visibility::Public`, Plan 7).
pub async fn set_object_fields(
    conn: &mut sqlx::PgConnection,
    actor: AuditActor,
    object_id: ObjectId,
    values: &serde_json::Map<String, Value>,
) -> Result<(), FieldError> {
    let current = object_by_id(&mut *conn, object_id)
        .await?
        .ok_or(FieldError::ObjectNotFound)?;

    // Reject the whole request on the first invalid entry, before any write.
    for (key, value) in values.iter() {
        validate_field(&mut *conn, key, value).await?;
    }

    let replacement = Value::Object(values.clone());
    let diff = field_map_changes(&current.fields, &replacement);
    if diff.is_empty() {
        return Ok(());
    }

    sqlx::query("UPDATE object SET fields = $2, updated_at = now() WHERE id = $1")
        .bind(object_id.to_uuid())
        .bind(&replacement)
        .execute(&mut *conn)
        .await?;

    let event = NewAuditEvent {
        actor,
        action: AuditAction::Updated,
        entity_type: ENTITY_TYPE.to_owned(),
        entity_id: object_id.to_uuid(),
        changes: diff,
    };
    audit::record(&mut *conn, &event).await?;
    Ok(())
}
async fn validate_field(
conn: &mut sqlx::PgConnection,
key: &str,
value: &Value,
) -> Result<(), FieldError> {
let def = fields::field_definition_by_key(&mut *conn, key)
.await?
.ok_or_else(|| FieldError::UnknownField(key.to_owned()))?;
match def.field_type {
FieldType::Text => require(value.is_string(), key, "text")?,
FieldType::LocalizedText => require(
value
.as_object()
.is_some_and(|o| o.values().all(Value::is_string)),
key,
"localized-text object {lang: string}",
)?,
FieldType::Integer => require(value.is_i64(), key, "integer")?,
// Format/range validation (real date parsing) is deferred to issue #11;
// here a date field only requires a string value.
FieldType::Date => require(value.is_string(), key, "date string")?,
FieldType::Boolean => require(value.is_boolean(), key, "boolean")?,
FieldType::Term { vocabulary_id } => {
let term_id = parse_uuid(value, key, "term id (uuid string)")?;
if vocab::resolve_term(
&mut *conn,
vocabulary_id,
domain::TermId::from_uuid(term_id),
)
.await?
.is_none()
{
return Err(FieldError::Unresolved {
field: key.to_owned(),
kind: "term",
});
}
}
FieldType::Authority { kind } => {
let authority_id = parse_uuid(value, key, "authority id (uuid string)")?;
match authority::resolve_authority(
&mut *conn,
domain::AuthorityId::from_uuid(authority_id),
)
.await?
{
Some(ref_) if kind.is_none_or(|k| ref_.kind() == k) => {}
_ => {
return Err(FieldError::Unresolved {
field: key.to_owned(),
kind: "authority",
});
}
}
}
}
Ok(())
}
/// Turn a failed shape check into a `TypeMismatch` for `field`; `expected` is the
/// description of the shape that was required.
fn require(ok: bool, field: &str, expected: &'static str) -> Result<(), FieldError> {
    if !ok {
        return Err(FieldError::TypeMismatch {
            field: field.to_owned(),
            expected,
        });
    }
    Ok(())
}
/// Read `value` as a UUID given in string form.
///
/// Anything else — a non-string JSON value, or a string that does not parse as a
/// UUID — is a `TypeMismatch` for `field` with the supplied `expected` description.
fn parse_uuid(
    value: &Value,
    field: &str,
    expected: &'static str,
) -> Result<uuid::Uuid, FieldError> {
    if let Some(text) = value.as_str() {
        if let Ok(id) = text.parse::<uuid::Uuid>() {
            return Ok(id);
        }
    }
    Err(FieldError::TypeMismatch {
        field: field.to_owned(),
        expected,
    })
}
/// Diff two flexible-field maps key by key, in sorted key order.
///
/// A key present on only one side yields `None` for the other side's
/// `before`/`after`, so additions and removals are captured alongside edits. A
/// side that is not a JSON object is treated as an empty map.
fn field_map_changes(old: &Value, new: &Value) -> Vec<FieldChange> {
    let empty = serde_json::Map::new();
    let before_map = old.as_object().unwrap_or(&empty);
    let after_map = new.as_object().unwrap_or(&empty);

    // Union of both key sets; the BTreeSet de-duplicates and sorts them.
    let mut all_keys = std::collections::BTreeSet::new();
    all_keys.extend(before_map.keys());
    all_keys.extend(after_map.keys());

    let mut changes = Vec::new();
    for key in all_keys {
        let before = before_map.get(key).cloned();
        let after = after_map.get(key).cloned();
        if before != after {
            changes.push(FieldChange {
                field: key.clone(),
                before,
                after,
            });
        }
    }
    changes
}