From d15afda9b241b1fb9a860fd77ef933e06878399f Mon Sep 17 00:00:00 2001 From: Anders Olsson Date: Tue, 2 Jun 2026 23:25:43 +0200 Subject: [PATCH] feat(api): on-write search reindex after catalogue writes (#17) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire best-effort Meilisearch index sync into the admin write paths (create/update/delete/set_fields/set_visibility). Adds SearchClient::sync_object (reindex if the object exists, remove if gone — one uniform path), an optional AppState.search client, and a reindex helper that logs failures via tracing without failing the committed write. Server gains MEILI_URL/MEILI_MASTER_KEY/MEILI_INDEX config; search stays disabled (no-op) when unset. reindex_all remains the recovery path. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 4 + crates/api/Cargo.toml | 3 + crates/api/src/admin.rs | 2 + crates/api/src/admin_objects.rs | 8 ++ crates/api/src/lib.rs | 17 ++++ crates/api/tests/admin.rs | 1 + crates/api/tests/admin_catalog.rs | 1 + crates/api/tests/admin_objects.rs | 1 + crates/api/tests/health.rs | 1 + crates/api/tests/public.rs | 1 + crates/api/tests/reindex.rs | 137 ++++++++++++++++++++++++++++++ crates/search/src/lib.rs | 18 ++++ crates/search/tests/sync.rs | 64 ++++++++++++++ crates/server/Cargo.toml | 1 + crates/server/src/config.rs | 15 ++++ crates/server/src/lib.rs | 24 ++++++ crates/server/tests/serve.rs | 1 + 17 files changed, 299 insertions(+) create mode 100644 crates/api/tests/reindex.rs create mode 100644 crates/search/tests/sync.rs diff --git a/Cargo.lock b/Cargo.lock index 2a9fa4e..f117daf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -82,6 +82,7 @@ dependencies = [ "db", "domain", "http-body-util", + "search", "serde", "serde_json", "sqlx", @@ -90,7 +91,9 @@ dependencies = [ "tower", "tower-sessions", "tower-sessions-sqlx-store", + "tracing", "utoipa", + "uuid", ] [[package]] @@ -1969,6 +1972,7 @@ dependencies = [ "domain", "reqwest", "rpassword", + "search", "serde_json", "sqlx", "temp-env", diff --git a/crates/api/Cargo.toml b/crates/api/Cargo.toml index 2c107c4..5caf78f 100644 --- a/crates/api/Cargo.toml +++ b/crates/api/Cargo.toml @@ -13,12 +13,15 @@ time.workspace = true tower-sessions.workspace = true tower-sessions-sqlx-store.workspace = true sqlx.workspace = true +tracing.workspace = true auth = { path = "../auth" } db = { path = "../db" } domain = { path = "../domain" } +search = { path = "../search" } [dev-dependencies] tokio.workspace = true tower.workspace = true http-body-util.workspace = true serde_json.workspace = true +uuid.workspace = true diff --git a/crates/api/src/admin.rs b/crates/api/src/admin.rs index 3b55eb4..ff3a4e8 100644 --- a/crates/api/src/admin.rs +++ b/crates/api/src/admin.rs @@ -160,6 +160,8 @@ pub(crate) async fn set_visibility( .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + crate::reindex(&state, object_id).await; + Ok(StatusCode::NO_CONTENT) } Err(db::catalog::VisibilityError::ObjectNotFound) => Err(StatusCode::NOT_FOUND), diff --git a/crates/api/src/admin_objects.rs b/crates/api/src/admin_objects.rs index 42036ef..9685d93 100644 --- a/crates/api/src/admin_objects.rs +++ b/crates/api/src/admin_objects.rs @@ -234,6 +234,8 @@ pub(crate) async fn create_object( .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + crate::reindex(&state, id).await; + Ok(( StatusCode::CREATED, Json(CreatedObject { id: id.to_string() }), @@ -299,6 +301,8 @@ pub(crate) async fn update_object( .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; if existed { + crate::reindex(&state, object_id).await; + Ok(StatusCode::NO_CONTENT) } else { Err(StatusCode::NOT_FOUND) @@ -339,6 +343,8 @@ pub(crate) async fn delete_object( .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; if existed { + crate::reindex(&state, object_id).await; + Ok(StatusCode::NO_CONTENT) } else { Err(StatusCode::NOT_FOUND) @@ -443,6 +449,8 @@ pub(crate) async fn set_fields( .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + crate::reindex(&state, object_id).await; + Ok(StatusCode::NO_CONTENT) } Err(db::catalog::FieldError::ObjectNotFound) => Err(StatusCode::NOT_FOUND), diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs index 5ccde10..f5f3bfa 100644 --- a/crates/api/src/lib.rs +++ b/crates/api/src/lib.rs @@ -26,6 +26,23 @@ pub struct AppState { /// Whether the session cookie carries the `Secure` attribute (default true; /// disable only for plain-HTTP self-hosting). pub cookie_secure: bool, + /// Search client for on-write index sync. `None` disables indexing (search is a + /// best-effort feature; absent when Meilisearch is not configured). + pub search: Option, +} + +/// Best-effort: keep the search index in step with a catalogue write that has already +/// committed. Re-projects and indexes the object, or removes it if it no longer exists. +/// Never fails the request — a search outage must not undo a committed write, and +/// `reindex_all` is the recovery path. A no-op when search is not configured. +pub(crate) async fn reindex(state: &AppState, id: domain::ObjectId) { + let Some(search) = &state.search else { + return; + }; + + if let Err(err) = search.sync_object(&state.db, id).await { + tracing::error!(?err, object_id = %id, "search reindex after write failed"); + } } /// Build the application router from shared state. diff --git a/crates/api/tests/admin.rs b/crates/api/tests/admin.rs index 93efd75..9aeb7e5 100644 --- a/crates/api/tests/admin.rs +++ b/crates/api/tests/admin.rs @@ -12,6 +12,7 @@ fn state(pool: PgPool) -> AppState { db: db::Db::from_pool(pool), app_name: "Test".into(), cookie_secure: false, + search: None, } } diff --git a/crates/api/tests/admin_catalog.rs b/crates/api/tests/admin_catalog.rs index afaaa3c..9ef9e8a 100644 --- a/crates/api/tests/admin_catalog.rs +++ b/crates/api/tests/admin_catalog.rs @@ -12,6 +12,7 @@ fn state(pool: PgPool) -> AppState { db: db::Db::from_pool(pool), app_name: "Test".into(), cookie_secure: false, + search: None, } } diff --git a/crates/api/tests/admin_objects.rs b/crates/api/tests/admin_objects.rs index f836c64..441ecb1 100644 --- a/crates/api/tests/admin_objects.rs +++ b/crates/api/tests/admin_objects.rs @@ -15,6 +15,7 @@ fn state(pool: PgPool) -> AppState { db: db::Db::from_pool(pool), app_name: "Test".into(), cookie_secure: false, + search: None, } } diff --git a/crates/api/tests/health.rs b/crates/api/tests/health.rs index 46cb9a7..806ad66 100644 --- a/crates/api/tests/health.rs +++ b/crates/api/tests/health.rs @@ -10,6 +10,7 @@ fn state(pool: PgPool, app_name: &str) -> AppState { db: db::Db::from_pool(pool), app_name: app_name.to_string(), cookie_secure: false, + search: None, } } diff --git a/crates/api/tests/public.rs b/crates/api/tests/public.rs index d30e91b..b09737c 100644 --- a/crates/api/tests/public.rs +++ b/crates/api/tests/public.rs @@ -12,6 +12,7 @@ fn state(pool: PgPool) -> AppState { db: db::Db::from_pool(pool), app_name: "Test".to_string(), cookie_secure: false, + search: None, } } diff --git a/crates/api/tests/reindex.rs b/crates/api/tests/reindex.rs new file mode 100644 index 0000000..78392ec --- /dev/null +++ b/crates/api/tests/reindex.rs @@ -0,0 +1,137 @@ +use api::{AppState, build_app, migrate_sessions}; +use axum::body::Body; +use axum::http::{Request, StatusCode, header}; +use db::users; +use domain::{AuditActor, Email, NewUser, ObjectId, Role}; +use http_body_util::BodyExt; +use search::SearchClient; +use sqlx::PgPool; +use tower::ServiceExt; + +fn meili() -> (String, String) { + ( + std::env::var("MEILI_URL").expect("MEILI_URL must be set"), + std::env::var("MEILI_MASTER_KEY").expect("MEILI_MASTER_KEY must be set"), + ) +} + +fn unique_index() -> String { + format!("api_reindex_test_{}", uuid::Uuid::new_v4().simple()) +} + +fn state(pool: PgPool, search: SearchClient) -> AppState { + AppState { + db: db::Db::from_pool(pool), + app_name: "Test".into(), + cookie_secure: false, + search: Some(search), + } +} + +async fn seed_user(pool: &PgPool, email: &str, password: &str, role: Role) { + let db = db::Db::from_pool(pool.clone()); + let mut tx = db.pool().begin().await.unwrap(); + + users::create_user( + &mut tx, + AuditActor::System, + &NewUser { + email: Email::parse(email).unwrap(), + password_hash: auth::hash_password(password).unwrap(), + role, + }, + ) + .await + .unwrap(); + + tx.commit().await.unwrap(); +} + +async fn login(app: &axum::Router, email: &str, password: &str) -> String { + let resp = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/api/admin/login") + .header(header::CONTENT_TYPE, "application/json") + .body(Body::from(format!( + r#"{{"email":"{email}","password":"{password}"}}"# + ))) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::NO_CONTENT); + + resp.headers() + .get(header::SET_COOKIE) + .unwrap() + .to_str() + .unwrap() + .split(';') + .next() + .unwrap() + .to_owned() +} + +#[sqlx::test(migrations = "../db/migrations")] +async fn admin_writes_sync_the_search_index(pool: PgPool) { + migrate_sessions(&db::Db::from_pool(pool.clone())) + .await + .unwrap(); + + seed_user(&pool, "ed@example.com", "pw-editor-123", Role::Editor).await; + + let (url, key) = meili(); + let search = SearchClient::connect(&url, &key, &unique_index()).unwrap(); + + search.ensure_index().await.unwrap(); + + // a second handle to the same index, used to observe what the handlers indexed + let observer = search.clone(); + + let app = build_app(state(pool.clone(), search)); + let cookie = login(&app, "ed@example.com", "pw-editor-123").await; + + // create via the admin API -> the object is indexed on commit + let create = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/api/admin/objects") + .header(header::COOKIE, &cookie) + .header(header::CONTENT_TYPE, "application/json") + .body(Body::from( + r#"{"object_number":"R-1","object_name":"astrolabe","number_of_objects":1,"visibility":"internal"}"#, + )) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(create.status(), StatusCode::CREATED); + + let created: serde_json::Value = + serde_json::from_slice(&create.into_body().collect().await.unwrap().to_bytes()).unwrap(); + let id: ObjectId = created["id"].as_str().unwrap().parse().unwrap(); + + assert_eq!(observer.search("astrolabe").await.unwrap(), vec![id]); + + // delete via the admin API -> the object drops out of the index + let delete = app + .oneshot( + Request::builder() + .method("DELETE") + .uri(format!("/api/admin/objects/{id}")) + .header(header::COOKIE, &cookie) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(delete.status(), StatusCode::NO_CONTENT); + + assert!(observer.search("astrolabe").await.unwrap().is_empty()); +} diff --git a/crates/search/src/lib.rs b/crates/search/src/lib.rs index c1f521c..8dbe066 100644 --- a/crates/search/src/lib.rs +++ b/crates/search/src/lib.rs @@ -40,6 +40,7 @@ pub struct SearchDocument { } /// A Meilisearch-backed search client scoped to one index. +#[derive(Clone)] pub struct SearchClient { client: meilisearch_sdk::client::Client, index_uid: String, @@ -146,6 +147,23 @@ impl SearchClient { .collect() } + /// Sync a single object's index entry with the database after a catalogue write + /// commits: re-project and index it if it still exists, otherwise remove it. This + /// is the uniform on-write path for create/update/delete/field/visibility changes — + /// a delete (object gone) removes the entry; everything else re-indexes the current + /// projection. Best-effort: callers invoke it after the DB transaction commits and + /// log (not propagate) any error, since `reindex_all` is the recovery path. + pub async fn sync_object(&self, db: &Db, id: ObjectId) -> Result<(), SearchError> { + match db::catalog::object_by_id(db.pool(), id).await? { + Some(object) => { + let document = build_document(db, &object).await?; + + self.index_object(&document).await + } + None => self.remove_object(id).await, + } + } + /// Rebuild the whole index from the database (clears then re-adds all objects). pub async fn reindex_all(&self, db: &Db) -> Result<(), SearchError> { let index = self.client.index(&self.index_uid); diff --git a/crates/search/tests/sync.rs b/crates/search/tests/sync.rs new file mode 100644 index 0000000..7af7bd9 --- /dev/null +++ b/crates/search/tests/sync.rs @@ -0,0 +1,64 @@ +use db::{Db, catalog}; +use domain::{AuditActor, ObjectInput, Visibility}; +use search::SearchClient; +use sqlx::PgPool; + +fn meili() -> (String, String) { + ( + std::env::var("MEILI_URL").expect("MEILI_URL must be set"), + std::env::var("MEILI_MASTER_KEY").expect("MEILI_MASTER_KEY must be set"), + ) +} + +fn unique_index() -> String { + format!("sync_test_{}", uuid::Uuid::new_v4().simple()) +} + +fn object(number: &str, name: &str) -> ObjectInput { + ObjectInput { + object_number: number.into(), + object_name: name.into(), + number_of_objects: 1, + brief_description: None, + current_location: None, + current_owner: None, + recorder: None, + recording_date: None, + visibility: Visibility::Draft, + } +} + +#[sqlx::test(migrations = "../db/migrations")] +async fn sync_object_indexes_then_removes(pool: PgPool) { + let db = Db::from_pool(pool); + + let mut tx = db.pool().begin().await.unwrap(); + + let id = catalog::create_object(&mut tx, AuditActor::System, &object("S-1", "lamp")) + .await + .unwrap(); + + tx.commit().await.unwrap(); + + let (url, key) = meili(); + let client = SearchClient::connect(&url, &key, &unique_index()).unwrap(); + + client.ensure_index().await.unwrap(); + + // object exists -> sync indexes it + client.sync_object(&db, id).await.unwrap(); + assert_eq!(client.search("lamp").await.unwrap(), vec![id]); + + // object deleted -> sync removes it from the index + let mut tx = db.pool().begin().await.unwrap(); + + let existed = catalog::delete_object(&mut tx, AuditActor::System, id) + .await + .unwrap(); + assert!(existed); + + tx.commit().await.unwrap(); + + client.sync_object(&db, id).await.unwrap(); + assert!(client.search("lamp").await.unwrap().is_empty()); +} diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index 08bb5b3..233045a 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -22,6 +22,7 @@ api = { path = "../api" } auth = { path = "../auth" } db = { path = "../db" } domain = { path = "../domain" } +search = { path = "../search" } rpassword.workspace = true [dev-dependencies] diff --git a/crates/server/src/config.rs b/crates/server/src/config.rs index 458f2be..0182938 100644 --- a/crates/server/src/config.rs +++ b/crates/server/src/config.rs @@ -27,4 +27,19 @@ pub struct Config { default_value_t = true )] pub cookie_secure: bool, + + /// Meilisearch base URL (e.g. `http://localhost:7700`). On-write search indexing + /// is enabled only when both this and `--meili-master-key` are set; otherwise + /// search is disabled (best-effort feature) and `reindex_all` remains the rebuild + /// path. + #[arg(long = "meili-url", env = "MEILI_URL")] + pub meili_url: Option, + + /// Meilisearch API key (master or a scoped key). + #[arg(long = "meili-master-key", env = "MEILI_MASTER_KEY")] + pub meili_master_key: Option, + + /// Meilisearch index name for catalogue objects. + #[arg(long = "meili-index", env = "MEILI_INDEX", default_value = "objects")] + pub meili_index: String, } diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index 0d5c3dd..c584062 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -22,10 +22,34 @@ pub async fn run(config: Config) -> anyhow::Result<()> { .await .context("creating the session store")?; + let search = match (&config.meili_url, &config.meili_master_key) { + (Some(url), Some(key)) => { + let client = search::SearchClient::connect(url, key, &config.meili_index) + .context("connecting to Meilisearch")?; + + client + .ensure_index() + .await + .context("ensuring the search index exists")?; + + tracing::info!(index = %config.meili_index, "search indexing enabled"); + + Some(client) + } + _ => { + tracing::warn!( + "MEILI_URL/MEILI_MASTER_KEY not set — search indexing disabled (reindex_all remains the rebuild path)" + ); + + None + } + }; + let state = AppState { db, app_name: config.app_name.clone(), cookie_secure: config.cookie_secure, + search, }; let listener = TcpListener::bind(&config.bind_addr) diff --git a/crates/server/tests/serve.rs b/crates/server/tests/serve.rs index c0c49cd..aa15497 100644 --- a/crates/server/tests/serve.rs +++ b/crates/server/tests/serve.rs @@ -16,6 +16,7 @@ async fn serves_health_live_over_tcp() { db, app_name: "Test".to_string(), cookie_secure: false, + search: None, }; let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();