Files
biggus-dickus/crates/db/src/vocab.rs
T

395 lines
11 KiB
Rust

//! Controlled vocabularies and terms.
use domain::{
AuditAction, AuditActor, LocalizedLabel, NewAuditEvent, NewTerm, Term, TermId, TermRef,
Vocabulary, VocabularyId,
};
use sqlx::Row;
use crate::audit;
/// `entity_type` value written to audit events that concern a vocabulary.
const VOCABULARY_ENTITY_TYPE: &str = "vocabulary";
/// `entity_type` value written to audit events that concern a term.
const TERM_ENTITY_TYPE: &str = "term";
/// SQL select-list fragment that aggregates a term's labels into one JSON array per row,
/// so a term and its labels can be read in a single query.
///
/// The fragment refers to the `term_label` table through the alias `tl`, so the
/// surrounding query must join it under that alias. Each array element is an object with
/// `lang` and `label` keys, ordered by `lang`. The `FILTER` drops rows where
/// `tl.term_id` is NULL (a term with no labels under a `LEFT JOIN`), and the `COALESCE`
/// turns the resulting NULL aggregate into an empty JSON array.
const LABELS_JSON: &str = "COALESCE(json_agg(json_build_object('lang', tl.lang, 'label', tl.label) \
ORDER BY tl.lang) FILTER (WHERE tl.term_id IS NOT NULL), '[]'::json)";
/// Insert a new vocabulary row for `key` and log a `created` audit event for it.
///
/// Both statements run on `conn`; hand in a transaction connection (`&mut *tx`) when the
/// insert and the audit entry must commit together.
///
/// # Errors
/// Propagates the `sqlx::Error` from the insert or from `audit::record`, whichever
/// fails first.
pub async fn create_vocabulary(
    conn: &mut sqlx::PgConnection,
    actor: AuditActor,
    key: &str,
) -> Result<Vocabulary, sqlx::Error> {
    const INSERT_SQL: &str = "INSERT INTO vocabulary (id, key) VALUES ($1, $2)";

    // A fresh identifier is generated here rather than by the database.
    let id = VocabularyId::new();
    let row_id = id.to_uuid();

    sqlx::query(INSERT_SQL)
        .bind(row_id)
        .bind(key)
        .execute(&mut *conn)
        .await?;

    let event = NewAuditEvent {
        actor,
        action: AuditAction::Created,
        entity_type: VOCABULARY_ENTITY_TYPE.to_owned(),
        entity_id: row_id,
        changes: Vec::new(),
    };
    audit::record(&mut *conn, &event).await?;

    let key = key.to_owned();
    Ok(Vocabulary { id, key })
}
/// List all vocabularies, ordered by key.
pub async fn list_vocabularies<'e, E>(executor: E) -> Result<Vec<Vocabulary>, sqlx::Error>
where
E: sqlx::PgExecutor<'e>,
{
let rows = sqlx::query("SELECT id, key FROM vocabulary ORDER BY key")
.fetch_all(executor)
.await?;
rows.into_iter().map(map_vocabulary).collect()
}
/// Look up a vocabulary by its key.
pub async fn vocabulary_by_key<'e, E>(
executor: E,
key: &str,
) -> Result<Option<Vocabulary>, sqlx::Error>
where
E: sqlx::PgExecutor<'e>,
{
let row = sqlx::query("SELECT id, key FROM vocabulary WHERE key = $1")
.bind(key)
.fetch_optional(executor)
.await?;
row.map(map_vocabulary).transpose()
}
/// Create a term from `new`, store each of its labels, and log a `created` audit event.
///
/// This issues one statement for the term, one per label, and one for the audit entry,
/// all on `conn`. Pass a transaction connection (`&mut *tx`) so they commit atomically.
///
/// Returns the identifier generated for the new term.
///
/// # Errors
/// Propagates the first `sqlx::Error` raised by any of the statements; later statements
/// are not attempted.
pub async fn add_term(
    conn: &mut sqlx::PgConnection,
    actor: AuditActor,
    new: &NewTerm,
) -> Result<TermId, sqlx::Error> {
    const INSERT_TERM_SQL: &str =
        "INSERT INTO term (id, vocabulary_id, external_uri) VALUES ($1, $2, $3)";
    const INSERT_LABEL_SQL: &str =
        "INSERT INTO term_label (term_id, lang, label) VALUES ($1, $2, $3)";

    let id = TermId::new();
    let term_uuid = id.to_uuid();

    // The term row goes in first; the label rows carry its id in `term_id`.
    sqlx::query(INSERT_TERM_SQL)
        .bind(term_uuid)
        .bind(new.vocabulary_id.to_uuid())
        .bind(new.external_uri.as_deref())
        .execute(&mut *conn)
        .await?;

    for entry in new.labels.iter() {
        sqlx::query(INSERT_LABEL_SQL)
            .bind(term_uuid)
            .bind(&entry.lang)
            .bind(&entry.label)
            .execute(&mut *conn)
            .await?;
    }

    let event = NewAuditEvent {
        actor,
        action: AuditAction::Created,
        entity_type: TERM_ENTITY_TYPE.to_owned(),
        entity_id: term_uuid,
        changes: Vec::new(),
    };
    audit::record(&mut *conn, &event).await?;

    Ok(id)
}
/// Fetch one term (with its labels).
pub async fn term_by_id<'e, E>(executor: E, id: TermId) -> Result<Option<Term>, sqlx::Error>
where
E: sqlx::PgExecutor<'e>,
{
let sql = format!(
"SELECT t.id, t.vocabulary_id, t.external_uri, {LABELS_JSON} AS labels \
FROM term t LEFT JOIN term_label tl ON tl.term_id = t.id \
WHERE t.id = $1 GROUP BY t.id"
);
let row = sqlx::query(&sql)
.bind(id.to_uuid())
.fetch_optional(executor)
.await?;
row.map(map_term).transpose()
}
/// List all terms in a vocabulary (with labels), ordered by id.
pub async fn list_terms<'e, E>(
executor: E,
vocabulary_id: VocabularyId,
) -> Result<Vec<Term>, sqlx::Error>
where
E: sqlx::PgExecutor<'e>,
{
let sql = format!(
"SELECT t.id, t.vocabulary_id, t.external_uri, {LABELS_JSON} AS labels \
FROM term t LEFT JOIN term_label tl ON tl.term_id = t.id \
WHERE t.vocabulary_id = $1 GROUP BY t.id ORDER BY t.id"
);
let rows = sqlx::query(&sql)
.bind(vocabulary_id.to_uuid())
.fetch_all(executor)
.await?;
rows.into_iter().map(map_term).collect()
}
/// Build a [`TermRef`] for `term_id`, but only if that term is stored under
/// `vocabulary_id`.
///
/// Returns `Ok(None)` when no `term` row matches both the id and the vocabulary.
///
/// # Errors
/// Propagates the `sqlx::Error` from the lookup query.
pub async fn resolve_term<'e, E>(
    executor: E,
    vocabulary_id: VocabularyId,
    term_id: TermId,
) -> Result<Option<TermRef>, sqlx::Error>
where
    E: sqlx::PgExecutor<'e>,
{
    const PROBE_SQL: &str = "SELECT 1 FROM term WHERE id = $1 AND vocabulary_id = $2";

    // Only the presence of a row matters; the selected value is discarded.
    let hit = sqlx::query_scalar::<_, i32>(PROBE_SQL)
        .bind(term_id.to_uuid())
        .bind(vocabulary_id.to_uuid())
        .fetch_optional(executor)
        .await?;

    if hit.is_some() {
        Ok(Some(TermRef::new(term_id, vocabulary_id)))
    } else {
        Ok(None)
    }
}
/// Overwrite a term's `external_uri` and replace its whole label set, then log an
/// `updated` audit event.
///
/// The term is matched on both `term_id` and `vocabulary_id`. If no row matches, nothing
/// else is touched and `Ok(false)` is returned. Otherwise all existing labels are deleted
/// and `labels` are inserted in their place, and the result is `Ok(true)`.
///
/// Several statements run on `conn`; pass a transaction connection so they commit
/// together.
///
/// # Errors
/// Propagates the first `sqlx::Error` raised by any statement.
pub async fn update_term(
    conn: &mut sqlx::PgConnection,
    actor: AuditActor,
    vocabulary_id: VocabularyId,
    term_id: TermId,
    external_uri: Option<&str>,
    labels: &[LocalizedLabel],
) -> Result<bool, sqlx::Error> {
    const UPDATE_SQL: &str =
        "UPDATE term SET external_uri = $2 WHERE id = $1 AND vocabulary_id = $3";
    const CLEAR_LABELS_SQL: &str = "DELETE FROM term_label WHERE term_id = $1";
    const INSERT_LABEL_SQL: &str =
        "INSERT INTO term_label (term_id, lang, label) VALUES ($1, $2, $3)";

    let term_uuid = term_id.to_uuid();

    let outcome = sqlx::query(UPDATE_SQL)
        .bind(term_uuid)
        .bind(external_uri)
        .bind(vocabulary_id.to_uuid())
        .execute(&mut *conn)
        .await?;
    if outcome.rows_affected() == 0 {
        // No such term in this vocabulary: leave labels and the audit log alone.
        return Ok(false);
    }

    // Full replace: drop every existing label, then insert the supplied ones.
    sqlx::query(CLEAR_LABELS_SQL)
        .bind(term_uuid)
        .execute(&mut *conn)
        .await?;
    for entry in labels.iter() {
        sqlx::query(INSERT_LABEL_SQL)
            .bind(term_uuid)
            .bind(&entry.lang)
            .bind(&entry.label)
            .execute(&mut *conn)
            .await?;
    }

    let event = NewAuditEvent {
        actor,
        action: AuditAction::Updated,
        entity_type: TERM_ENTITY_TYPE.to_owned(),
        entity_id: term_uuid,
        changes: Vec::new(),
    };
    audit::record(&mut *conn, &event).await?;

    Ok(true)
}
/// Count the `object` rows that point at `term_id` through a `term`-typed field.
///
/// An object counts when, for at least one `field_definition` with `data_type = 'term'`,
/// the text value of `fields ->> key` equals the string form of `term_id`.
///
/// # Errors
/// Propagates the `sqlx::Error` from the count query.
pub async fn count_objects_referencing_term<'e, E>(
    executor: E,
    term_id: TermId,
) -> Result<i64, sqlx::Error>
where
    E: sqlx::PgExecutor<'e>,
{
    const COUNT_SQL: &str = "SELECT count(*) FROM object o WHERE EXISTS ( \
         SELECT 1 FROM field_definition fd \
         WHERE fd.data_type = 'term' AND o.fields ->> fd.key = $1 )";

    // `->>` yields text, so the id is compared in its string form.
    let needle = term_id.to_string();

    let referencing: i64 = sqlx::query_scalar(COUNT_SQL)
        .bind(needle)
        .fetch_one(executor)
        .await?;
    Ok(referencing)
}
/// Delete a term (its labels cascade) unless catalogue objects reference it, recording a
/// `deleted` audit entry. Pass a transaction connection.
///
/// Outcomes:
/// - `NotFound` — no term with `term_id` exists in `vocabulary_id`, or the row was gone
///   by the time the `DELETE` ran.
/// - `InUse { count }` — `count` catalogue objects still reference the term (see
///   [`count_objects_referencing_term`]); nothing is deleted.
/// - `Deleted` — the row was removed and the audit entry written.
///
/// # Errors
/// Propagates the first `sqlx::Error` raised by any statement.
pub async fn delete_term(
    conn: &mut sqlx::PgConnection,
    actor: AuditActor,
    vocabulary_id: VocabularyId,
    term_id: TermId,
) -> Result<crate::DeleteOutcome, sqlx::Error> {
    // Check existence first so a missing term reports `NotFound` rather than `InUse`.
    let exists =
        sqlx::query_scalar::<_, i32>("SELECT 1 FROM term WHERE id = $1 AND vocabulary_id = $2")
            .bind(term_id.to_uuid())
            .bind(vocabulary_id.to_uuid())
            .fetch_optional(&mut *conn)
            .await?;
    if exists.is_none() {
        return Ok(crate::DeleteOutcome::NotFound);
    }

    let count = count_objects_referencing_term(&mut *conn, term_id).await?;
    if count > 0 {
        return Ok(crate::DeleteOutcome::InUse { count });
    }

    // Scope the delete to the vocabulary as well, mirroring the existence check, and
    // look at how many rows were actually removed.
    let deleted = sqlx::query("DELETE FROM term WHERE id = $1 AND vocabulary_id = $2")
        .bind(term_id.to_uuid())
        .bind(vocabulary_id.to_uuid())
        .execute(&mut *conn)
        .await?
        .rows_affected();
    if deleted == 0 {
        // The row vanished between the check and the delete: do not log a deletion
        // that this call did not perform.
        return Ok(crate::DeleteOutcome::NotFound);
    }

    audit::record(
        &mut *conn,
        &NewAuditEvent {
            actor,
            action: AuditAction::Deleted,
            entity_type: TERM_ENTITY_TYPE.to_owned(),
            entity_id: term_id.to_uuid(),
            changes: Vec::new(),
        },
    )
    .await?;
    Ok(crate::DeleteOutcome::Deleted)
}
/// Change the `key` of vocabulary `id` and log an `updated` audit event.
///
/// Returns `Ok(false)` when no vocabulary has this `id`; no audit entry is written then.
///
/// # Errors
/// Propagates the underlying `sqlx::Error`. A unique-key collision is not translated and
/// surfaces as that error (SQLSTATE 23505).
pub async fn rename_vocabulary(
    conn: &mut sqlx::PgConnection,
    actor: AuditActor,
    id: VocabularyId,
    key: &str,
) -> Result<bool, sqlx::Error> {
    const RENAME_SQL: &str = "UPDATE vocabulary SET key = $2 WHERE id = $1";

    let vocabulary_uuid = id.to_uuid();

    let outcome = sqlx::query(RENAME_SQL)
        .bind(vocabulary_uuid)
        .bind(key)
        .execute(&mut *conn)
        .await?;

    let renamed = outcome.rows_affected() != 0;
    if renamed {
        // Only log the change when a row was actually updated.
        let event = NewAuditEvent {
            actor,
            action: AuditAction::Updated,
            entity_type: VOCABULARY_ENTITY_TYPE.to_owned(),
            entity_id: vocabulary_uuid,
            changes: Vec::new(),
        };
        audit::record(&mut *conn, &event).await?;
    }
    Ok(renamed)
}
/// Delete a vocabulary unless it still has terms or is bound by a field definition
/// (both would otherwise hit the FK `RESTRICT`). Records a `deleted` audit entry.
///
/// Outcomes:
/// - `NotFound` — no vocabulary with this `id` exists, or the row was gone by the time
///   the `DELETE` ran.
/// - `InUse { count }` — `count` is the number of `term` rows plus the number of
///   `field_definition` rows that reference the vocabulary; nothing is deleted.
/// - `Deleted` — the row was removed and the audit entry written.
///
/// Several statements run on `conn`; pass a transaction connection so they commit
/// together.
///
/// # Errors
/// Propagates the first `sqlx::Error` raised by any statement.
pub async fn delete_vocabulary(
    conn: &mut sqlx::PgConnection,
    actor: AuditActor,
    id: VocabularyId,
) -> Result<crate::DeleteOutcome, sqlx::Error> {
    // Check existence first so a missing vocabulary reports `NotFound`.
    let exists = sqlx::query_scalar::<_, i32>("SELECT 1 FROM vocabulary WHERE id = $1")
        .bind(id.to_uuid())
        .fetch_optional(&mut *conn)
        .await?;
    if exists.is_none() {
        return Ok(crate::DeleteOutcome::NotFound);
    }

    // One round trip for both kinds of dependents.
    let count: i64 = sqlx::query_scalar(
        "SELECT (SELECT count(*) FROM term WHERE vocabulary_id = $1) \
         + (SELECT count(*) FROM field_definition WHERE vocabulary_id = $1)",
    )
    .bind(id.to_uuid())
    .fetch_one(&mut *conn)
    .await?;
    if count > 0 {
        return Ok(crate::DeleteOutcome::InUse { count });
    }

    let deleted = sqlx::query("DELETE FROM vocabulary WHERE id = $1")
        .bind(id.to_uuid())
        .execute(&mut *conn)
        .await?
        .rows_affected();
    if deleted == 0 {
        // The row vanished between the check and the delete: do not log a deletion
        // that this call did not perform.
        return Ok(crate::DeleteOutcome::NotFound);
    }

    audit::record(
        &mut *conn,
        &NewAuditEvent {
            actor,
            action: AuditAction::Deleted,
            entity_type: VOCABULARY_ENTITY_TYPE.to_owned(),
            entity_id: id.to_uuid(),
            changes: Vec::new(),
        },
    )
    .await?;
    Ok(crate::DeleteOutcome::Deleted)
}
/// Decode a row with `id` and `key` columns into a [`Vocabulary`].
///
/// Fails with the `sqlx::Error` from `try_get` if either column is missing or cannot be
/// decoded; `id` is read before `key`.
fn map_vocabulary(row: sqlx::postgres::PgRow) -> Result<Vocabulary, sqlx::Error> {
    let id = VocabularyId::from_uuid(row.try_get("id")?);
    let key = row.try_get("key")?;
    Ok(Vocabulary { id, key })
}
/// Decode a row with `id`, `vocabulary_id`, `external_uri` and `labels` columns into a
/// [`Term`].
///
/// `labels` is read as JSON and deserialized into a `Vec<LocalizedLabel>`. Fails with the
/// `sqlx::Error` from the first `try_get` that cannot read or decode its column; columns
/// are read in the order `labels`, `id`, `vocabulary_id`, `external_uri`.
fn map_term(row: sqlx::postgres::PgRow) -> Result<Term, sqlx::Error> {
    // Unwrap the `Json` newtype straight away; only the inner vector is needed.
    let sqlx::types::Json(labels): sqlx::types::Json<Vec<LocalizedLabel>> =
        row.try_get("labels")?;
    let id = TermId::from_uuid(row.try_get("id")?);
    let vocabulary_id = VocabularyId::from_uuid(row.try_get("vocabulary_id")?);
    let external_uri = row.try_get("external_uri")?;

    Ok(Term {
        id,
        vocabulary_id,
        external_uri,
        labels,
    })
}