//! Full-text search over catalogue objects, backed by Meilisearch. //! //! This crate provides the search *capability* plus a `reindex_all` rebuild path. //! On-write index sync (calling `index_object`/`remove_object` after a catalogue //! mutation commits) is wired at the API/service layer (Plan 7+). Meilisearch is not //! transactional with Postgres, so the index is eventually consistent; `reindex_all` //! is the recovery path. use db::Db; use domain::{CatalogueObject, ObjectId}; use meilisearch_sdk::tasks::Task; use serde::{Deserialize, Serialize}; /// Errors from the search subsystem. #[derive(Debug, thiserror::Error)] pub enum SearchError { #[error(transparent)] Meili(#[from] meilisearch_sdk::errors::Error), #[error(transparent)] Db(#[from] sqlx::Error), #[error("invalid object id in index: {0}")] BadId(String), } /// The indexed shape of a catalogue object. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SearchDocument { pub id: String, pub object_number: String, pub object_name: String, pub brief_description: Option, pub current_owner: Option, pub recorder: Option, /// Filterable: "draft" | "internal" | "public". pub visibility: String, /// Flexible field values flattened to searchable text. pub fields_text: Vec, } /// A Meilisearch-backed search client scoped to one index. pub struct SearchClient { client: meilisearch_sdk::client::Client, index_uid: String, } /// Turn a completed task into an error if Meilisearch rejected it. fn check_task(task: Task) -> Result<(), SearchError> { match task { Task::Failed { content } => Err(SearchError::Meili( meilisearch_sdk::errors::Error::Meilisearch(content.error), )), _ => Ok(()), } } impl SearchClient { pub fn connect(url: &str, api_key: &str, index_uid: &str) -> Result { let client = meilisearch_sdk::client::Client::new(url, Some(api_key))?; Ok(Self { client, index_uid: index_uid.to_owned(), }) } pub async fn ensure_index(&self) -> Result<(), SearchError> { let task = self .client .create_index(&self.index_uid, Some("id")) .await? .wait_for_completion(&self.client, None, None) .await?; // Tolerate "index already exists"; surface any other task failure. if let Task::Failed { content } = &task { if content.error.error_code != meilisearch_sdk::errors::ErrorCode::IndexAlreadyExists { return Err(SearchError::Meili( meilisearch_sdk::errors::Error::Meilisearch(content.error.clone()), )); } } // set_filterable_attributes is idempotent on an existing index let task = self .client .index(&self.index_uid) .set_filterable_attributes(["visibility"]) .await? .wait_for_completion(&self.client, None, None) .await?; check_task(task)?; Ok(()) } pub async fn index_object(&self, doc: &SearchDocument) -> Result<(), SearchError> { let task = self .client .index(&self.index_uid) .add_or_replace(std::slice::from_ref(doc), Some("id")) .await? .wait_for_completion(&self.client, None, None) .await?; check_task(task)?; Ok(()) } pub async fn remove_object(&self, id: ObjectId) -> Result<(), SearchError> { let task = self .client .index(&self.index_uid) .delete_document(id.to_string()) .await? .wait_for_completion(&self.client, None, None) .await?; check_task(task)?; Ok(()) } pub async fn search(&self, query: &str) -> Result, SearchError> { let index = self.client.index(&self.index_uid); let results = index .search() .with_query(query) .build() .execute::() .await?; results .hits .into_iter() .map(|hit| { hit.result .id .parse::() .map_err(|_| SearchError::BadId(hit.result.id)) }) .collect() } /// Rebuild the whole index from the database (clears then re-adds all objects). pub async fn reindex_all(&self, db: &Db) -> Result<(), SearchError> { let index = self.client.index(&self.index_uid); let task = index .delete_all_documents() .await? .wait_for_completion(&self.client, None, None) .await?; check_task(task)?; let objects = db::catalog::list_objects(db.pool()).await?; let mut docs = Vec::with_capacity(objects.len()); for object in &objects { docs.push(build_document(db, object).await?); } if !docs.is_empty() { let task = index .add_or_replace(&docs, Some("id")) .await? .wait_for_completion(&self.client, None, None) .await?; check_task(task)?; } Ok(()) } } /// Build a [`SearchDocument`] from a catalogue object, resolving term and authority /// references to their human-readable labels so Meilisearch can match on them. pub async fn build_document( db: &Db, object: &CatalogueObject, ) -> Result { let mut fields_text = Vec::new(); if let Some(map) = object.fields.as_object() { for (key, value) in map { let Some(def) = db::fields::field_definition_by_key(db.pool(), key).await? else { // Stale field with no definition — skip. continue; }; match def.field_type { domain::FieldType::Text | domain::FieldType::Date => { if let Some(s) = value.as_str() { fields_text.push(s.to_owned()); } } domain::FieldType::Integer | domain::FieldType::Boolean => { fields_text.push(value.to_string()); } domain::FieldType::LocalizedText => { if let Some(obj) = value.as_object() { for v in obj.values() { if let Some(s) = v.as_str() { fields_text.push(s.to_owned()); } } } } domain::FieldType::Term { .. } => { if let Some(term_id) = value .as_str() .and_then(|s| s.parse::().ok()) { if let Some(term) = db::vocab::term_by_id(db.pool(), term_id).await? { fields_text.extend(term.labels.into_iter().map(|l| l.label)); } } } domain::FieldType::Authority { .. } => { if let Some(authority_id) = value .as_str() .and_then(|s| s.parse::().ok()) { if let Some(authority) = db::authority::authority_by_id(db.pool(), authority_id).await? { fields_text.extend(authority.labels.into_iter().map(|l| l.label)); } } } } } } Ok(SearchDocument { id: object.id.to_string(), object_number: object.object_number.clone(), object_name: object.object_name.clone(), brief_description: object.brief_description.clone(), current_owner: object.current_owner.clone(), recorder: object.recorder.clone(), visibility: object.visibility.as_str().to_owned(), fields_text, }) }