merge: tier 4 hardening batch 1 (#1 #2 #21)

#1 graceful shutdown on SIGINT/SIGTERM (axum with_graceful_shutdown).
#2 configurable DB pool size (--db-max-connections / DB_MAX_CONNECTIONS, default 5).
#21 audit vocabulary/term/authority creation atomically, attributing the acting
user; ~15 call sites threaded an AuditActor.

173 workspace tests; clippy + fmt clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-04 22:10:13 +02:00
15 changed files with 257 additions and 63 deletions
+6 -5
View File
@@ -7,7 +7,7 @@ use axum::{
http::StatusCode, http::StatusCode,
routing::get, routing::get,
}; };
use domain::{AuthorityKind, LocalizedLabel, NewAuthority}; use domain::{AuditActor, AuthorityKind, LocalizedLabel, NewAuthority};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use utoipa::ToSchema; use utoipa::ToSchema;
@@ -91,7 +91,7 @@ pub(crate) async fn list_authorities(
) )
)] )]
pub(crate) async fn create_authority( pub(crate) async fn create_authority(
_auth: Authorized<EditCatalogue>, auth: Authorized<EditCatalogue>,
State(state): State<AppState>, State(state): State<AppState>,
Json(req): Json<NewAuthorityRequest>, Json(req): Json<NewAuthorityRequest>,
) -> Result<(StatusCode, Json<CreatedId>), StatusCode> { ) -> Result<(StatusCode, Json<CreatedId>), StatusCode> {
@@ -117,9 +117,10 @@ pub(crate) async fn create_authority(
.await .await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let id = db::authority::create_authority(&mut tx, &new) let id =
.await db::authority::create_authority(&mut tx, AuditActor::User(auth.user.id.to_uuid()), &new)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; .await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
tx.commit() tx.commit()
.await .await
+27 -13
View File
@@ -7,7 +7,7 @@ use axum::{
http::StatusCode, http::StatusCode,
routing::get, routing::get,
}; };
use domain::{LocalizedLabel, NewTerm, VocabularyId}; use domain::{AuditActor, LocalizedLabel, NewTerm, VocabularyId};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use utoipa::ToSchema; use utoipa::ToSchema;
@@ -85,11 +85,23 @@ pub(crate) async fn list_vocabularies(
) )
)] )]
pub(crate) async fn create_vocabulary( pub(crate) async fn create_vocabulary(
_auth: Authorized<EditCatalogue>, auth: Authorized<EditCatalogue>,
State(state): State<AppState>, State(state): State<AppState>,
Json(req): Json<NewVocabularyRequest>, Json(req): Json<NewVocabularyRequest>,
) -> Result<(StatusCode, Json<VocabularyView>), StatusCode> { ) -> Result<(StatusCode, Json<VocabularyView>), StatusCode> {
let vocab = db::vocab::create_vocabulary(state.db.pool(), &req.key) let mut tx = state
.db
.pool()
.begin()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let vocab =
db::vocab::create_vocabulary(&mut tx, AuditActor::User(auth.user.id.to_uuid()), &req.key)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
tx.commit()
.await .await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
@@ -156,7 +168,7 @@ pub(crate) async fn list_terms(
) )
)] )]
pub(crate) async fn add_term( pub(crate) async fn add_term(
_auth: Authorized<EditCatalogue>, auth: Authorized<EditCatalogue>,
State(state): State<AppState>, State(state): State<AppState>,
Path(id): Path<String>, Path(id): Path<String>,
Json(req): Json<NewTermRequest>, Json(req): Json<NewTermRequest>,
@@ -185,15 +197,17 @@ pub(crate) async fn add_term(
.await .await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let term_id = db::vocab::add_term(&mut tx, &new).await.map_err(|err| { let term_id = db::vocab::add_term(&mut tx, AuditActor::User(auth.user.id.to_uuid()), &new)
// A well-formed id for a missing vocabulary hits the FK constraint (23503). .await
if err.as_database_error().and_then(|e| e.code()).as_deref() == Some("23503") { .map_err(|err| {
StatusCode::NOT_FOUND // A well-formed id for a missing vocabulary hits the FK constraint (23503).
} else { if err.as_database_error().and_then(|e| e.code()).as_deref() == Some("23503") {
tracing::error!(?err, "adding term"); StatusCode::NOT_FOUND
StatusCode::INTERNAL_SERVER_ERROR } else {
} tracing::error!(?err, "adding term");
})?; StatusCode::INTERNAL_SERVER_ERROR
}
})?;
tx.commit() tx.commit()
.await .await
+43 -2
View File
@@ -1,8 +1,8 @@
use api::{AppState, build_app, migrate_sessions}; use api::{AppState, build_app, migrate_sessions};
use axum::body::Body; use axum::body::Body;
use axum::http::{Request, StatusCode, header}; use axum::http::{Request, StatusCode, header};
use db::users; use db::{audit, users};
use domain::{AuditActor, Email, NewUser, Role}; use domain::{AuditAction, AuditActor, Email, NewUser, Role};
use http_body_util::BodyExt; use http_body_util::BodyExt;
use sqlx::PgPool; use sqlx::PgPool;
use tower::ServiceExt; use tower::ServiceExt;
@@ -290,3 +290,44 @@ async fn add_term_to_missing_vocabulary_is_404(pool: PgPool) {
assert_eq!(resp.status(), StatusCode::NOT_FOUND); assert_eq!(resp.status(), StatusCode::NOT_FOUND);
} }
#[sqlx::test(migrations = "../db/migrations")]
async fn creating_a_vocabulary_writes_an_audit_entry(pool: PgPool) {
migrate_sessions(&db::Db::from_pool(pool.clone()))
.await
.unwrap();
seed_user(&pool, "ed@example.com", "pw-editor-123", Role::Editor).await;
let app = build_app(state(pool.clone()));
let cookie = login(&app, "ed@example.com", "pw-editor-123").await;
let resp = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/admin/vocabularies")
.header(header::COOKIE, &cookie)
.header(header::CONTENT_TYPE, "application/json")
.body(Body::from(r#"{"key":"audit-test"}"#))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
let body: serde_json::Value =
serde_json::from_slice(&resp.into_body().collect().await.unwrap().to_bytes()).unwrap();
let vocab_id: uuid::Uuid = body["id"].as_str().unwrap().parse().unwrap();
let history = audit::history_for(&pool, "vocabulary", vocab_id)
.await
.unwrap();
assert_eq!(history.len(), 1);
assert_eq!(history[0].action, AuditAction::Created);
assert!(
matches!(history[0].actor, AuditActor::User(_)),
"expected actor to be a user"
);
}
+24 -3
View File
@@ -1,16 +1,25 @@
//! Authority records (person / organisation / place). //! Authority records (person / organisation / place).
use domain::{Authority, AuthorityId, AuthorityKind, AuthorityRef, LocalizedLabel, NewAuthority}; use domain::{
AuditAction, AuditActor, Authority, AuthorityId, AuthorityKind, AuthorityRef, LocalizedLabel,
NewAuditEvent, NewAuthority,
};
use sqlx::Row; use sqlx::Row;
use crate::audit;
const AUTHORITY_ENTITY_TYPE: &str = "authority";
/// Labels aggregated per row as JSON, to read an authority and its labels in one query. /// Labels aggregated per row as JSON, to read an authority and its labels in one query.
const LABELS_JSON: &str = "COALESCE(json_agg(json_build_object('lang', al.lang, 'label', al.label) \ const LABELS_JSON: &str = "COALESCE(json_agg(json_build_object('lang', al.lang, 'label', al.label) \
ORDER BY al.lang) FILTER (WHERE al.authority_id IS NOT NULL), '[]'::json)"; ORDER BY al.lang) FILTER (WHERE al.authority_id IS NOT NULL), '[]'::json)";
/// Insert an authority and its labels. Multiple statements — pass a transaction /// Insert an authority and its labels, then record a `created` audit entry. Multiple
/// connection (`&mut *tx`) for atomicity. /// statements — pass a transaction connection (`&mut *tx`) so everything commits
/// atomically.
pub async fn create_authority( pub async fn create_authority(
conn: &mut sqlx::PgConnection, conn: &mut sqlx::PgConnection,
actor: AuditActor,
new: &NewAuthority, new: &NewAuthority,
) -> Result<AuthorityId, sqlx::Error> { ) -> Result<AuthorityId, sqlx::Error> {
let id = AuthorityId::new(); let id = AuthorityId::new();
@@ -31,6 +40,18 @@ pub async fn create_authority(
.await?; .await?;
} }
audit::record(
&mut *conn,
&NewAuditEvent {
actor,
action: AuditAction::Created,
entity_type: AUTHORITY_ENTITY_TYPE.to_owned(),
entity_id: id.to_uuid(),
changes: Vec::new(),
},
)
.await?;
Ok(id) Ok(id)
} }
+4 -3
View File
@@ -17,10 +17,11 @@ pub struct Db {
} }
impl Db { impl Db {
/// Connect to the database at `database_url`, opening a connection pool. /// Connect to the database at `database_url`, opening a connection pool with at most
pub async fn connect(database_url: &str) -> Result<Self, sqlx::Error> { /// `max_connections` connections.
pub async fn connect(database_url: &str, max_connections: u32) -> Result<Self, sqlx::Error> {
let pool = PgPoolOptions::new() let pool = PgPoolOptions::new()
.max_connections(5) .max_connections(max_connections)
.connect(database_url) .connect(database_url)
.await?; .await?;
+8 -2
View File
@@ -5,7 +5,9 @@
//! populated by the organization or a later import. The inventory-minimum fields //! populated by the organization or a later import. The inventory-minimum fields
//! (object number, name, location, …) live in the typed object core, not here. //! (object number, name, location, …) live in the typed object core, not here.
use domain::{AuthorityKind, FieldType, LocalizedLabel, NewFieldDefinition, VocabularyId}; use domain::{
AuditActor, AuthorityKind, FieldType, LocalizedLabel, NewFieldDefinition, VocabularyId,
};
use crate::{fields, vocab}; use crate::{fields, vocab};
@@ -119,7 +121,11 @@ async fn ensure_vocabulary(
if let Some(existing) = vocab::vocabulary_by_key(&mut *conn, key).await? { if let Some(existing) = vocab::vocabulary_by_key(&mut *conn, key).await? {
Ok(existing.id) Ok(existing.id)
} else { } else {
Ok(vocab::create_vocabulary(&mut *conn, key).await?.id) Ok(
vocab::create_vocabulary(&mut *conn, AuditActor::System, key)
.await?
.id,
)
} }
} }
+49 -10
View File
@@ -1,25 +1,47 @@
//! Controlled vocabularies and terms. //! Controlled vocabularies and terms.
use domain::{LocalizedLabel, NewTerm, Term, TermId, TermRef, Vocabulary, VocabularyId}; use domain::{
AuditAction, AuditActor, LocalizedLabel, NewAuditEvent, NewTerm, Term, TermId, TermRef,
Vocabulary, VocabularyId,
};
use sqlx::Row; use sqlx::Row;
use crate::audit;
const VOCABULARY_ENTITY_TYPE: &str = "vocabulary";
const TERM_ENTITY_TYPE: &str = "term";
/// Labels aggregated per row as JSON, to read a term and its labels in one query. /// Labels aggregated per row as JSON, to read a term and its labels in one query.
const LABELS_JSON: &str = "COALESCE(json_agg(json_build_object('lang', tl.lang, 'label', tl.label) \ const LABELS_JSON: &str = "COALESCE(json_agg(json_build_object('lang', tl.lang, 'label', tl.label) \
ORDER BY tl.lang) FILTER (WHERE tl.term_id IS NOT NULL), '[]'::json)"; ORDER BY tl.lang) FILTER (WHERE tl.term_id IS NOT NULL), '[]'::json)";
/// Create a vocabulary with the given key. /// Create a vocabulary with the given key and record a `created` audit entry, both on
pub async fn create_vocabulary<'e, E>(executor: E, key: &str) -> Result<Vocabulary, sqlx::Error> /// `conn` (pass a transaction connection `&mut *tx` so they commit atomically).
where pub async fn create_vocabulary(
E: sqlx::PgExecutor<'e>, conn: &mut sqlx::PgConnection,
{ actor: AuditActor,
key: &str,
) -> Result<Vocabulary, sqlx::Error> {
let id = VocabularyId::new(); let id = VocabularyId::new();
sqlx::query("INSERT INTO vocabulary (id, key) VALUES ($1, $2)") sqlx::query("INSERT INTO vocabulary (id, key) VALUES ($1, $2)")
.bind(id.to_uuid()) .bind(id.to_uuid())
.bind(key) .bind(key)
.execute(executor) .execute(&mut *conn)
.await?; .await?;
audit::record(
&mut *conn,
&NewAuditEvent {
actor,
action: AuditAction::Created,
entity_type: VOCABULARY_ENTITY_TYPE.to_owned(),
entity_id: id.to_uuid(),
changes: Vec::new(),
},
)
.await?;
Ok(Vocabulary { Ok(Vocabulary {
id, id,
key: key.to_owned(), key: key.to_owned(),
@@ -54,9 +76,14 @@ where
row.map(map_vocabulary).transpose() row.map(map_vocabulary).transpose()
} }
/// Insert a term and its labels. Multiple statements — pass a transaction /// Insert a term and its labels, then record a `created` audit entry. Multiple
/// connection (`&mut *tx`) so the term and its labels commit atomically. /// statements — pass a transaction connection (`&mut *tx`) so everything commits
pub async fn add_term(conn: &mut sqlx::PgConnection, new: &NewTerm) -> Result<TermId, sqlx::Error> { /// atomically.
pub async fn add_term(
conn: &mut sqlx::PgConnection,
actor: AuditActor,
new: &NewTerm,
) -> Result<TermId, sqlx::Error> {
let id = TermId::new(); let id = TermId::new();
sqlx::query("INSERT INTO term (id, vocabulary_id, external_uri) VALUES ($1, $2, $3)") sqlx::query("INSERT INTO term (id, vocabulary_id, external_uri) VALUES ($1, $2, $3)")
@@ -75,6 +102,18 @@ pub async fn add_term(conn: &mut sqlx::PgConnection, new: &NewTerm) -> Result<Te
.await?; .await?;
} }
audit::record(
&mut *conn,
&NewAuditEvent {
actor,
action: AuditAction::Created,
entity_type: TERM_ENTITY_TYPE.to_owned(),
entity_id: id.to_uuid(),
changes: Vec::new(),
},
)
.await?;
Ok(id) Ok(id)
} }
+12 -6
View File
@@ -1,5 +1,5 @@
use db::{Db, authority}; use db::{Db, authority};
use domain::{AuthorityKind, LocalizedLabel, NewAuthority}; use domain::{AuditActor, AuthorityKind, LocalizedLabel, NewAuthority};
use sqlx::PgPool; use sqlx::PgPool;
fn new_person(name_sv: &str, name_en: &str) -> NewAuthority { fn new_person(name_sv: &str, name_en: &str) -> NewAuthority {
@@ -24,9 +24,13 @@ async fn authority_round_trips_with_labels(pool: PgPool) {
let db = Db::from_pool(pool); let db = Db::from_pool(pool);
let mut tx = db.pool().begin().await.unwrap(); let mut tx = db.pool().begin().await.unwrap();
let id = authority::create_authority(&mut tx, &new_person("Carl Larsson", "Carl Larsson")) let id = authority::create_authority(
.await &mut tx,
.unwrap(); AuditActor::System,
&new_person("Carl Larsson", "Carl Larsson"),
)
.await
.unwrap();
tx.commit().await.unwrap(); tx.commit().await.unwrap();
let got = authority::authority_by_id(db.pool(), id) let got = authority::authority_by_id(db.pool(), id)
@@ -47,11 +51,12 @@ async fn list_by_kind_filters(pool: PgPool) {
let db = Db::from_pool(pool); let db = Db::from_pool(pool);
let mut tx = db.pool().begin().await.unwrap(); let mut tx = db.pool().begin().await.unwrap();
authority::create_authority(&mut tx, &new_person("A", "A")) authority::create_authority(&mut tx, AuditActor::System, &new_person("A", "A"))
.await .await
.unwrap(); .unwrap();
authority::create_authority( authority::create_authority(
&mut tx, &mut tx,
AuditActor::System,
&NewAuthority { &NewAuthority {
kind: AuthorityKind::Place, kind: AuthorityKind::Place,
external_uri: None, external_uri: None,
@@ -83,7 +88,7 @@ async fn resolve_authority_returns_kind(pool: PgPool) {
let db = Db::from_pool(pool); let db = Db::from_pool(pool);
let mut tx = db.pool().begin().await.unwrap(); let mut tx = db.pool().begin().await.unwrap();
let id = authority::create_authority(&mut tx, &new_person("X", "X")) let id = authority::create_authority(&mut tx, AuditActor::System, &new_person("X", "X"))
.await .await
.unwrap(); .unwrap();
tx.commit().await.unwrap(); tx.commit().await.unwrap();
@@ -108,6 +113,7 @@ async fn authority_with_no_labels_round_trips_empty(pool: PgPool) {
let mut tx = db.pool().begin().await.unwrap(); let mut tx = db.pool().begin().await.unwrap();
let id = authority::create_authority( let id = authority::create_authority(
&mut tx, &mut tx,
AuditActor::System,
&NewAuthority { &NewAuthority {
kind: AuthorityKind::Organisation, kind: AuthorityKind::Organisation,
external_uri: None, external_uri: None,
+4 -2
View File
@@ -1,5 +1,5 @@
use db::{Db, fields, vocab}; use db::{Db, fields, vocab};
use domain::{AuthorityKind, FieldType, LocalizedLabel, NewFieldDefinition}; use domain::{AuditActor, AuthorityKind, FieldType, LocalizedLabel, NewFieldDefinition};
use sqlx::PgPool; use sqlx::PgPool;
fn labels() -> Vec<LocalizedLabel> { fn labels() -> Vec<LocalizedLabel> {
@@ -52,9 +52,11 @@ async fn text_field_round_trips(pool: PgPool) {
#[sqlx::test] #[sqlx::test]
async fn term_and_authority_fields_round_trip_their_binding(pool: PgPool) { async fn term_and_authority_fields_round_trip_their_binding(pool: PgPool) {
let db = Db::from_pool(pool); let db = Db::from_pool(pool);
let material = vocab::create_vocabulary(db.pool(), "material") let mut tx = db.pool().begin().await.unwrap();
let material = vocab::create_vocabulary(&mut tx, AuditActor::System, "material")
.await .await
.unwrap(); .unwrap();
tx.commit().await.unwrap();
let mut tx = db.pool().begin().await.unwrap(); let mut tx = db.pool().begin().await.unwrap();
fields::create_field_definition( fields::create_field_definition(
+12 -3
View File
@@ -95,9 +95,12 @@ async fn sets_scalar_fields_and_audits(pool: PgPool) {
async fn term_field_must_resolve_in_its_vocabulary(pool: PgPool) { async fn term_field_must_resolve_in_its_vocabulary(pool: PgPool) {
let db = Db::from_pool(pool); let db = Db::from_pool(pool);
let id = setup_object(&db).await; let id = setup_object(&db).await;
let material = vocab::create_vocabulary(db.pool(), "material") let mut tx = db.pool().begin().await.unwrap();
let material = vocab::create_vocabulary(&mut tx, AuditActor::System, "material")
.await .await
.unwrap(); .unwrap();
tx.commit().await.unwrap();
define( define(
&db, &db,
"material", "material",
@@ -110,6 +113,7 @@ async fn term_field_must_resolve_in_its_vocabulary(pool: PgPool) {
let mut tx = db.pool().begin().await.unwrap(); let mut tx = db.pool().begin().await.unwrap();
let wood = vocab::add_term( let wood = vocab::add_term(
&mut tx, &mut tx,
AuditActor::System,
&domain::NewTerm { &domain::NewTerm {
vocabulary_id: material.id, vocabulary_id: material.id,
external_uri: None, external_uri: None,
@@ -180,6 +184,7 @@ async fn authority_field_enforces_kind(pool: PgPool) {
let mut tx = db.pool().begin().await.unwrap(); let mut tx = db.pool().begin().await.unwrap();
let person = db::authority::create_authority( let person = db::authority::create_authority(
&mut tx, &mut tx,
AuditActor::System,
&domain::NewAuthority { &domain::NewAuthority {
kind: domain::AuthorityKind::Person, kind: domain::AuthorityKind::Person,
external_uri: None, external_uri: None,
@@ -190,6 +195,7 @@ async fn authority_field_enforces_kind(pool: PgPool) {
.unwrap(); .unwrap();
let place = db::authority::create_authority( let place = db::authority::create_authority(
&mut tx, &mut tx,
AuditActor::System,
&domain::NewAuthority { &domain::NewAuthority {
kind: domain::AuthorityKind::Place, kind: domain::AuthorityKind::Place,
external_uri: None, external_uri: None,
@@ -219,12 +225,14 @@ async fn authority_field_enforces_kind(pool: PgPool) {
async fn term_from_wrong_vocabulary_is_rejected(pool: PgPool) { async fn term_from_wrong_vocabulary_is_rejected(pool: PgPool) {
let db = Db::from_pool(pool); let db = Db::from_pool(pool);
let id = setup_object(&db).await; let id = setup_object(&db).await;
let material = vocab::create_vocabulary(db.pool(), "material") let mut tx = db.pool().begin().await.unwrap();
let material = vocab::create_vocabulary(&mut tx, AuditActor::System, "material")
.await .await
.unwrap(); .unwrap();
let technique = vocab::create_vocabulary(db.pool(), "technique") let technique = vocab::create_vocabulary(&mut tx, AuditActor::System, "technique")
.await .await
.unwrap(); .unwrap();
tx.commit().await.unwrap();
define( define(
&db, &db,
"material", "material",
@@ -238,6 +246,7 @@ async fn term_from_wrong_vocabulary_is_rejected(pool: PgPool) {
let mut tx = db.pool().begin().await.unwrap(); let mut tx = db.pool().begin().await.unwrap();
let other = vocab::add_term( let other = vocab::add_term(
&mut tx, &mut tx,
AuditActor::System,
&domain::NewTerm { &domain::NewTerm {
vocabulary_id: technique.id, vocabulary_id: technique.id,
external_uri: None, external_uri: None,
+23 -8
View File
@@ -1,13 +1,15 @@
use db::{Db, vocab}; use db::{Db, vocab};
use domain::{LocalizedLabel, NewTerm}; use domain::{AuditActor, LocalizedLabel, NewTerm};
use sqlx::PgPool; use sqlx::PgPool;
#[sqlx::test] #[sqlx::test]
async fn vocabulary_create_and_lookup(pool: PgPool) { async fn vocabulary_create_and_lookup(pool: PgPool) {
let db = Db::from_pool(pool); let db = Db::from_pool(pool);
let v = vocab::create_vocabulary(db.pool(), "material") let mut tx = db.pool().begin().await.unwrap();
let v = vocab::create_vocabulary(&mut tx, AuditActor::System, "material")
.await .await
.unwrap(); .unwrap();
tx.commit().await.unwrap();
let found = vocab::vocabulary_by_key(db.pool(), "material") let found = vocab::vocabulary_by_key(db.pool(), "material")
.await .await
@@ -27,13 +29,16 @@ async fn vocabulary_create_and_lookup(pool: PgPool) {
#[sqlx::test] #[sqlx::test]
async fn term_with_multilingual_labels_round_trips(pool: PgPool) { async fn term_with_multilingual_labels_round_trips(pool: PgPool) {
let db = Db::from_pool(pool); let db = Db::from_pool(pool);
let v = vocab::create_vocabulary(db.pool(), "material") let mut tx = db.pool().begin().await.unwrap();
let v = vocab::create_vocabulary(&mut tx, AuditActor::System, "material")
.await .await
.unwrap(); .unwrap();
tx.commit().await.unwrap();
let mut tx = db.pool().begin().await.unwrap(); let mut tx = db.pool().begin().await.unwrap();
let term_id = vocab::add_term( let term_id = vocab::add_term(
&mut tx, &mut tx,
AuditActor::System,
&NewTerm { &NewTerm {
vocabulary_id: v.id, vocabulary_id: v.id,
external_uri: Some("http://vocab.getty.edu/aat/300011914".into()), external_uri: Some("http://vocab.getty.edu/aat/300011914".into()),
@@ -76,13 +81,16 @@ async fn term_with_multilingual_labels_round_trips(pool: PgPool) {
#[sqlx::test] #[sqlx::test]
async fn term_with_no_labels_round_trips_empty(pool: PgPool) { async fn term_with_no_labels_round_trips_empty(pool: PgPool) {
let db = Db::from_pool(pool); let db = Db::from_pool(pool);
let v = vocab::create_vocabulary(db.pool(), "material") let mut tx = db.pool().begin().await.unwrap();
let v = vocab::create_vocabulary(&mut tx, AuditActor::System, "material")
.await .await
.unwrap(); .unwrap();
tx.commit().await.unwrap();
let mut tx = db.pool().begin().await.unwrap(); let mut tx = db.pool().begin().await.unwrap();
let term_id = vocab::add_term( let term_id = vocab::add_term(
&mut tx, &mut tx,
AuditActor::System,
&NewTerm { &NewTerm {
vocabulary_id: v.id, vocabulary_id: v.id,
external_uri: None, external_uri: None,
@@ -103,10 +111,14 @@ async fn term_with_no_labels_round_trips_empty(pool: PgPool) {
#[sqlx::test] #[sqlx::test]
async fn duplicate_vocabulary_key_is_rejected(pool: PgPool) { async fn duplicate_vocabulary_key_is_rejected(pool: PgPool) {
let db = Db::from_pool(pool); let db = Db::from_pool(pool);
vocab::create_vocabulary(db.pool(), "material") let mut tx = db.pool().begin().await.unwrap();
vocab::create_vocabulary(&mut tx, AuditActor::System, "material")
.await .await
.unwrap(); .unwrap();
let err = vocab::create_vocabulary(db.pool(), "material") tx.commit().await.unwrap();
let mut tx = db.pool().begin().await.unwrap();
let err = vocab::create_vocabulary(&mut tx, AuditActor::System, "material")
.await .await
.unwrap_err(); .unwrap_err();
assert!( assert!(
@@ -118,16 +130,19 @@ async fn duplicate_vocabulary_key_is_rejected(pool: PgPool) {
#[sqlx::test] #[sqlx::test]
async fn resolve_term_checks_vocabulary_membership(pool: PgPool) { async fn resolve_term_checks_vocabulary_membership(pool: PgPool) {
let db = Db::from_pool(pool); let db = Db::from_pool(pool);
let material = vocab::create_vocabulary(db.pool(), "material") let mut tx = db.pool().begin().await.unwrap();
let material = vocab::create_vocabulary(&mut tx, AuditActor::System, "material")
.await .await
.unwrap(); .unwrap();
let technique = vocab::create_vocabulary(db.pool(), "technique") let technique = vocab::create_vocabulary(&mut tx, AuditActor::System, "technique")
.await .await
.unwrap(); .unwrap();
tx.commit().await.unwrap();
let mut tx = db.pool().begin().await.unwrap(); let mut tx = db.pool().begin().await.unwrap();
let term_id = vocab::add_term( let term_id = vocab::add_term(
&mut tx, &mut tx,
AuditActor::System,
&NewTerm { &NewTerm {
vocabulary_id: material.id, vocabulary_id: material.id,
external_uri: None, external_uri: None,
+4 -3
View File
@@ -23,14 +23,15 @@ async fn reindex_resolves_term_labels_and_finds_by_label(pool: PgPool) {
let db = Db::from_pool(pool); let db = Db::from_pool(pool);
// a material vocabulary with a "wood" term // a material vocabulary with a "wood" term
let material = vocab::create_vocabulary(db.pool(), "material") let mut tx = db.pool().begin().await.unwrap();
let material = vocab::create_vocabulary(&mut tx, AuditActor::System, "material")
.await .await
.unwrap(); .unwrap();
let mut tx = db.pool().begin().await.unwrap();
let wood = vocab::add_term( let wood = vocab::add_term(
&mut tx, &mut tx,
AuditActor::System,
&NewTerm { &NewTerm {
vocabulary_id: material.id, vocabulary_id: material.id,
external_uri: None, external_uri: None,
+8
View File
@@ -42,4 +42,12 @@ pub struct Config {
/// Meilisearch index name for catalogue objects. /// Meilisearch index name for catalogue objects.
#[arg(long = "meili-index", env = "MEILI_INDEX", default_value = "objects")] #[arg(long = "meili-index", env = "MEILI_INDEX", default_value = "objects")]
pub meili_index: String, pub meili_index: String,
/// Maximum size of the PostgreSQL connection pool.
#[arg(
long = "db-max-connections",
env = "DB_MAX_CONNECTIONS",
default_value_t = 5
)]
pub db_max_connections: u32,
} }
+32 -2
View File
@@ -15,7 +15,7 @@ use tokio::net::TcpListener;
/// Connect dependencies from `config` and serve until shutdown. /// Connect dependencies from `config` and serve until shutdown.
pub async fn run(config: Config) -> anyhow::Result<()> { pub async fn run(config: Config) -> anyhow::Result<()> {
let db = Db::connect(&config.database_url) let db = Db::connect(&config.database_url, config.db_max_connections)
.await .await
.context("connecting to the database")?; .context("connecting to the database")?;
@@ -64,6 +64,34 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
serve(listener, state).await serve(listener, state).await
} }
/// Resolves when the process receives SIGINT (Ctrl-C) or SIGTERM, so the server can
/// drain in-flight requests before exiting.
async fn shutdown_signal() {
let ctrl_c = async {
tokio::signal::ctrl_c()
.await
.expect("install Ctrl-C handler");
};
#[cfg(unix)]
let terminate = async {
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("install SIGTERM handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}
tracing::info!("shutdown signal received; draining");
}
/// Serve the API on an already-bound listener (used by `run` and tests). /// Serve the API on an already-bound listener (used by `run` and tests).
pub async fn serve(listener: TcpListener, state: AppState) -> anyhow::Result<()> { pub async fn serve(listener: TcpListener, state: AppState) -> anyhow::Result<()> {
let app = build_app(state); let app = build_app(state);
@@ -72,6 +100,7 @@ pub async fn serve(listener: TcpListener, state: AppState) -> anyhow::Result<()>
let app = app.merge(web_assets::routes()); let app = app.merge(web_assets::routes());
axum::serve(listener, app) axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await .await
.context("running the HTTP server")?; .context("running the HTTP server")?;
@@ -107,7 +136,8 @@ pub async fn create_user(database_url: &str, email: &str, role: Role) -> anyhow:
auth::hash_password(&password).map_err(|err| anyhow::anyhow!("hashing password: {err}"))? auth::hash_password(&password).map_err(|err| anyhow::anyhow!("hashing password: {err}"))?
}; };
let db = Db::connect(database_url) // CLI one-shot: a tiny pool is plenty.
let db = Db::connect(database_url, 2)
.await .await
.context("connecting to the database")?; .context("connecting to the database")?;
+1 -1
View File
@@ -9,7 +9,7 @@ use tokio::net::TcpListener;
async fn serves_health_live_over_tcp() { async fn serves_health_live_over_tcp() {
let database_url = let database_url =
std::env::var("DATABASE_URL").expect("DATABASE_URL must be set for this test"); std::env::var("DATABASE_URL").expect("DATABASE_URL must be set for this test");
let db = Db::connect(&database_url) let db = Db::connect(&database_url, 2)
.await .await
.expect("connect to database"); .expect("connect to database");
let state = AppState { let state = AppState {