1cdfa21259
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
413 lines
14 KiB
Rust
413 lines
14 KiB
Rust
//! Full-text search over catalogue objects, backed by Meilisearch.
|
|
//!
|
|
//! This crate provides the search *capability* plus a `reindex_all` rebuild path.
|
|
//! On-write index sync (calling `index_object`/`remove_object` after a catalogue
|
|
//! mutation commits) is wired at the API/service layer (Plan 7+). Meilisearch is not
|
|
//! transactional with Postgres, so the index is eventually consistent; `reindex_all`
|
|
//! is the recovery path.
|
|
|
|
use db::Db;
|
|
use domain::{CatalogueObject, ObjectId};
|
|
use meilisearch_sdk::search::Selectors;
|
|
use meilisearch_sdk::tasks::Task;
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
/// Errors from the search subsystem.
|
|
#[derive(Debug, thiserror::Error)]
|
|
pub enum SearchError {
|
|
#[error(transparent)]
|
|
Meili(#[from] meilisearch_sdk::errors::Error),
|
|
|
|
#[error(transparent)]
|
|
Db(#[from] sqlx::Error),
|
|
|
|
#[error("invalid object id in index: {0}")]
|
|
BadId(String),
|
|
}
|
|
|
|
/// The indexed shape of a catalogue object.
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct SearchDocument {
|
|
pub id: String,
|
|
pub object_number: String,
|
|
pub object_name: String,
|
|
pub brief_description: Option<String>,
|
|
pub current_owner: Option<String>,
|
|
pub recorder: Option<String>,
|
|
pub recording_date: Option<String>,
|
|
/// Filterable: "draft" | "internal" | "public".
|
|
pub visibility: String,
|
|
/// Flexible field values flattened to searchable text.
|
|
pub fields_text: Vec<String>,
|
|
}
|
|
|
|
/// Opening highlight marker placed before each matched span in a snippet.
///
/// The markers are deliberately not HTML: they are ASCII control characters that
/// are not expected to occur in catalogue text, so the frontend can split on them
/// to render matches and no HTML ever crosses the API boundary.
pub const HL_PRE: &str = "\u{2}";

/// Closing highlight marker placed after each matched span in a snippet; the
/// counterpart of [`HL_PRE`].
pub const HL_POST: &str = "\u{3}";
|
|
|
|
/// One search result: display metadata projected from the index, plus an optional
|
|
/// snippet of matched text with [`HL_PRE`]/[`HL_POST`] markers around the matches.
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct SearchHit {
|
|
pub id: String,
|
|
pub object_number: String,
|
|
pub object_name: String,
|
|
pub brief_description: Option<String>,
|
|
pub visibility: String,
|
|
pub recording_date: Option<String>,
|
|
pub snippet: Option<String>,
|
|
}
|
|
|
|
/// A page of search results plus Meilisearch's estimate of the total match count.
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct SearchResults {
|
|
pub hits: Vec<SearchHit>,
|
|
pub estimated_total: usize,
|
|
}
|
|
|
|
/// A Meilisearch-backed search client scoped to one index.
|
|
#[derive(Clone)]
|
|
pub struct SearchClient {
|
|
client: meilisearch_sdk::client::Client,
|
|
index_uid: String,
|
|
}
|
|
|
|
/// Turn a completed task into an error if Meilisearch rejected it.
|
|
fn check_task(task: Task) -> Result<(), SearchError> {
|
|
match task {
|
|
Task::Failed { content } => Err(SearchError::Meili(
|
|
meilisearch_sdk::errors::Error::Meilisearch(content.error),
|
|
)),
|
|
_ => Ok(()),
|
|
}
|
|
}
|
|
|
|
impl SearchClient {
|
|
pub fn connect(url: &str, api_key: &str, index_uid: &str) -> Result<Self, SearchError> {
|
|
let client = meilisearch_sdk::client::Client::new(url, Some(api_key))?;
|
|
|
|
Ok(Self {
|
|
client,
|
|
index_uid: index_uid.to_owned(),
|
|
})
|
|
}
|
|
|
|
pub async fn ensure_index(&self) -> Result<(), SearchError> {
|
|
let task = self
|
|
.client
|
|
.create_index(&self.index_uid, Some("id"))
|
|
.await?
|
|
.wait_for_completion(&self.client, None, None)
|
|
.await?;
|
|
|
|
// Tolerate "index already exists"; surface any other task failure.
|
|
if let Task::Failed { content } = &task {
|
|
if content.error.error_code != meilisearch_sdk::errors::ErrorCode::IndexAlreadyExists {
|
|
return Err(SearchError::Meili(
|
|
meilisearch_sdk::errors::Error::Meilisearch(content.error.clone()),
|
|
));
|
|
}
|
|
}
|
|
|
|
// set_filterable_attributes is idempotent on an existing index
|
|
let task = self
|
|
.client
|
|
.index(&self.index_uid)
|
|
.set_filterable_attributes(["visibility"])
|
|
.await?
|
|
.wait_for_completion(&self.client, None, None)
|
|
.await?;
|
|
|
|
check_task(task)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn index_object(&self, doc: &SearchDocument) -> Result<(), SearchError> {
|
|
let task = self
|
|
.client
|
|
.index(&self.index_uid)
|
|
.add_or_replace(std::slice::from_ref(doc), Some("id"))
|
|
.await?
|
|
.wait_for_completion(&self.client, None, None)
|
|
.await?;
|
|
|
|
check_task(task)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn remove_object(&self, id: ObjectId) -> Result<(), SearchError> {
|
|
let task = self
|
|
.client
|
|
.index(&self.index_uid)
|
|
.delete_document(id.to_string())
|
|
.await?
|
|
.wait_for_completion(&self.client, None, None)
|
|
.await?;
|
|
|
|
check_task(task)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn search(&self, query: &str) -> Result<Vec<ObjectId>, SearchError> {
|
|
let index = self.client.index(&self.index_uid);
|
|
|
|
let results = index
|
|
.search()
|
|
.with_query(query)
|
|
.build()
|
|
.execute::<SearchDocument>()
|
|
.await?;
|
|
|
|
results
|
|
.hits
|
|
.into_iter()
|
|
.map(|hit| {
|
|
hit.result
|
|
.id
|
|
.parse::<ObjectId>()
|
|
.map_err(|_| SearchError::BadId(hit.result.id))
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
/// Full-text query returning display-ready hits with highlighted snippets and the
|
|
/// estimated total match count. `visibility`, when set, filters on the indexed
|
|
/// `visibility` attribute. Pagination is offset/limit.
|
|
///
|
|
/// # Preconditions
|
|
///
|
|
/// When `visibility` is `Some`, the value must be one of `"draft"`, `"internal"`, or
|
|
/// `"public"`. The caller owns this validation (the API layer enforces it); this
|
|
/// method `debug_assert!`s the constraint as defense-in-depth.
|
|
pub async fn search_objects(
|
|
&self,
|
|
query: &str,
|
|
visibility: Option<&str>,
|
|
offset: usize,
|
|
limit: usize,
|
|
) -> Result<SearchResults, SearchError> {
|
|
let index = self.client.index(&self.index_uid);
|
|
|
|
let filter = visibility.map(|v| {
|
|
debug_assert!(
|
|
matches!(v, "draft" | "internal" | "public"),
|
|
"visibility filter must be a known value; got {v:?}"
|
|
);
|
|
|
|
format!("visibility = \"{v}\"")
|
|
});
|
|
let highlight: &[&str] = &["object_name", "brief_description", "fields_text"];
|
|
let crop: &[(&str, Option<usize>)] = &[("brief_description", None), ("fields_text", None)];
|
|
|
|
let mut search = index.search();
|
|
search
|
|
.with_query(query)
|
|
.with_offset(offset)
|
|
.with_limit(limit)
|
|
.with_attributes_to_highlight(Selectors::Some(highlight))
|
|
.with_attributes_to_crop(Selectors::Some(crop))
|
|
// ~20 words gives enough catalogue-description context around a match.
|
|
.with_crop_length(20)
|
|
.with_highlight_pre_tag(HL_PRE)
|
|
.with_highlight_post_tag(HL_POST);
|
|
|
|
if let Some(filter) = &filter {
|
|
search.with_filter(filter);
|
|
}
|
|
|
|
let results = search.execute::<SearchDocument>().await?;
|
|
|
|
let hits = results
|
|
.hits
|
|
.into_iter()
|
|
.map(|hit| {
|
|
let snippet = hit.formatted_result.as_ref().and_then(extract_snippet);
|
|
let doc = hit.result;
|
|
|
|
SearchHit {
|
|
id: doc.id,
|
|
object_number: doc.object_number,
|
|
object_name: doc.object_name,
|
|
brief_description: doc.brief_description,
|
|
visibility: doc.visibility,
|
|
recording_date: doc.recording_date,
|
|
snippet,
|
|
}
|
|
})
|
|
.collect();
|
|
|
|
Ok(SearchResults {
|
|
hits,
|
|
// estimated_total_hits is always present for offset/limit pagination;
|
|
// None only under page-based mode, which we don't use.
|
|
estimated_total: results.estimated_total_hits.unwrap_or(0),
|
|
})
|
|
}
|
|
|
|
/// Sync a single object's index entry with the database after a catalogue write
|
|
/// commits: re-project and index it if it still exists, otherwise remove it. This
|
|
/// is the uniform on-write path for create/update/delete/field/visibility changes —
|
|
/// a delete (object gone) removes the entry; everything else re-indexes the current
|
|
/// projection. Best-effort: callers invoke it after the DB transaction commits and
|
|
/// log (not propagate) any error, since `reindex_all` is the recovery path.
|
|
pub async fn sync_object(&self, db: &Db, id: ObjectId) -> Result<(), SearchError> {
|
|
match db::catalog::object_by_id(db.pool(), id).await? {
|
|
Some(object) => {
|
|
let document = build_document(db, &object).await?;
|
|
|
|
self.index_object(&document).await
|
|
}
|
|
None => self.remove_object(id).await,
|
|
}
|
|
}
|
|
|
|
/// Rebuild the whole index from the database (clears then re-adds all objects).
|
|
pub async fn reindex_all(&self, db: &Db) -> Result<(), SearchError> {
|
|
let index = self.client.index(&self.index_uid);
|
|
|
|
let task = index
|
|
.delete_all_documents()
|
|
.await?
|
|
.wait_for_completion(&self.client, None, None)
|
|
.await?;
|
|
|
|
check_task(task)?;
|
|
|
|
let objects = db::catalog::list_objects(db.pool()).await?;
|
|
|
|
let mut docs = Vec::with_capacity(objects.len());
|
|
|
|
for object in &objects {
|
|
docs.push(build_document(db, object).await?);
|
|
}
|
|
|
|
if !docs.is_empty() {
|
|
let task = index
|
|
.add_or_replace(&docs, Some("id"))
|
|
.await?
|
|
.wait_for_completion(&self.client, None, None)
|
|
.await?;
|
|
|
|
check_task(task)?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
/// Build a [`SearchDocument`] from a catalogue object, resolving term and authority
|
|
/// references to their human-readable labels so Meilisearch can match on them.
|
|
pub async fn build_document(
|
|
db: &Db,
|
|
object: &CatalogueObject,
|
|
) -> Result<SearchDocument, SearchError> {
|
|
let mut fields_text = Vec::new();
|
|
|
|
if let Some(map) = object.fields.as_object() {
|
|
for (key, value) in map {
|
|
let Some(def) = db::fields::field_definition_by_key(db.pool(), key).await? else {
|
|
// Stale field with no definition — skip.
|
|
continue;
|
|
};
|
|
|
|
match def.field_type {
|
|
domain::FieldType::Text | domain::FieldType::Date => {
|
|
if let Some(s) = value.as_str() {
|
|
fields_text.push(s.to_owned());
|
|
}
|
|
}
|
|
|
|
domain::FieldType::Integer | domain::FieldType::Boolean => {
|
|
fields_text.push(value.to_string());
|
|
}
|
|
|
|
domain::FieldType::LocalizedText => {
|
|
if let Some(obj) = value.as_object() {
|
|
for v in obj.values() {
|
|
if let Some(s) = v.as_str() {
|
|
fields_text.push(s.to_owned());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
domain::FieldType::Term { .. } => {
|
|
if let Some(term_id) = value
|
|
.as_str()
|
|
.and_then(|s| s.parse::<domain::TermId>().ok())
|
|
{
|
|
if let Some(term) = db::vocab::term_by_id(db.pool(), term_id).await? {
|
|
fields_text.extend(term.labels.into_iter().map(|l| l.label));
|
|
}
|
|
}
|
|
}
|
|
|
|
domain::FieldType::Authority { .. } => {
|
|
if let Some(authority_id) = value
|
|
.as_str()
|
|
.and_then(|s| s.parse::<domain::AuthorityId>().ok())
|
|
{
|
|
if let Some(authority) =
|
|
db::authority::authority_by_id(db.pool(), authority_id).await?
|
|
{
|
|
fields_text.extend(authority.labels.into_iter().map(|l| l.label));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(SearchDocument {
|
|
id: object.id.to_string(),
|
|
object_number: object.object_number.clone(),
|
|
object_name: object.object_name.clone(),
|
|
brief_description: object.brief_description.clone(),
|
|
current_owner: object.current_owner.clone(),
|
|
recorder: object.recorder.clone(),
|
|
recording_date: object.recording_date.map(|d| d.to_string()),
|
|
visibility: object.visibility.as_str().to_owned(),
|
|
fields_text,
|
|
})
|
|
}
|
|
|
|
/// Pick the best snippet from Meilisearch's `_formatted` map: prefer a highlighted
|
|
/// `brief_description`, then a highlighted `fields_text` entry, then `object_name`;
|
|
/// fall back to an unhighlighted `brief_description` so a hit still shows context.
|
|
fn extract_snippet(formatted: &serde_json::Map<String, serde_json::Value>) -> Option<String> {
|
|
let has_mark = |s: &str| s.contains(HL_PRE);
|
|
|
|
if let Some(serde_json::Value::String(s)) = formatted.get("brief_description") {
|
|
if has_mark(s) {
|
|
return Some(s.clone());
|
|
}
|
|
}
|
|
|
|
if let Some(serde_json::Value::Array(items)) = formatted.get("fields_text") {
|
|
for item in items {
|
|
if let Some(s) = item.as_str() {
|
|
if has_mark(s) {
|
|
return Some(s.to_owned());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if let Some(serde_json::Value::String(s)) = formatted.get("object_name") {
|
|
if has_mark(s) {
|
|
return Some(s.clone());
|
|
}
|
|
}
|
|
|
|
if let Some(serde_json::Value::String(s)) = formatted.get("brief_description") {
|
|
return Some(s.clone());
|
|
}
|
|
|
|
None
|
|
}
|