Coverage for functions \ flipdare \ search \ db \ _app_search.py: 72%
129 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
14from abc import ABC, abstractmethod
15from collections.abc import Mapping
16from enum import Enum
17from typing import Any, cast
18import typesense
19from typesense.exceptions import ObjectNotFound
20from typesense.types.document import SearchParameters
22from flipdare.app_log import LOG
23from flipdare.constants import IS_DEBUG, MAX_SEARCH_RESULTS_PER_PAGE
24from flipdare.error.app_error import SearchError
25from flipdare.generated.shared.search.search_collections import SearchCollections
26from flipdare.search.doc._search_document import SearchDocument
27from flipdare.search.result.typesense_payload import TypesensePayload
28from flipdare.util.debug_util import stringify_debug
# Public API of this module: only the abstract search base class is exported.
__all__ = ["AppSearch"]
33class AppSearch[SModel: SearchDocument[Any], T: Enum](ABC):
35 def __init__(
36 self,
37 collection: SearchCollections,
38 client: typesense.Client,
39 model_class: type[SModel],
40 per_page: int = MAX_SEARCH_RESULTS_PER_PAGE,
41 ) -> None:
42 self._collection = collection
43 self._client = client
44 self._model_class = model_class
45 self._per_page = per_page
47 @abstractmethod
48 def get_user(self, uid: str) -> list[SModel] | None: ...
50 @abstractmethod
51 def delete_user(self, uid: str) -> None: ...
53 @abstractmethod
54 def get_user_type(self, uid: str, identifier: str) -> SModel | None: ...
56 @abstractmethod
57 def delete_user_type(self, uid: str, identifier: str, **kwargs: Any) -> SModel | None: ...
59 @property
60 def collection(self) -> SearchCollections:
61 return self._collection
63 @property
64 def client(self) -> typesense.Client:
65 return self._client
67 def count(self) -> int:
68 col_name = self.collection.value
69 try:
70 collection_info = self.client.collections[col_name].retrieve()
71 document_count = collection_info["num_documents"]
72 return int(document_count)
73 except Exception as error:
74 LOG().error(f"Error counting documents in {col_name}: {error}")
75 return 0
77 def create(self, document: SModel) -> SModel:
78 col_name = self.collection.value
79 payload = document.payload()
80 doc_id: str | None = None
82 try:
83 result = self.client.collections[col_name].documents.create(payload)
84 result_data = dict(result)
85 if "id" in result_data:
86 doc_id = str(result_data["id"])
88 except Exception as error:
89 LOG().error(f"Error creating document in {col_name} with payload {payload}: {error}")
90 raise
92 if doc_id is None:
93 LOG().error(f"Failed to add document to {col_name}, no ID returned.")
94 raise ValueError(f"Failed to add document to {col_name}, no ID returned.")
96 return self._model_class.from_payload(str(doc_id), payload)
98 def update(self, doc: SModel, force: bool = False) -> SModel | None:
99 doc_id = doc.doc_id
100 if doc_id is None:
101 raise ValueError(f"Document must have a valid doc_id for update.: {doc} ")
103 col_name = self.collection.value
104 updates = doc.updates()
105 if updates is None:
106 LOG().warning(f"No updates provided for document {doc_id} in {col_name}.")
107 return None
109 original = self.get(doc_id)
110 if original is None:
111 # this should not happen (since we have a doc_id), but just in case
112 # warn but continue
113 LOG().warning(f"Document {doc_id} not found in {col_name}, has doc_id so creating.")
114 self.create(doc)
115 return None
117 if not force and doc.payload_equal(original):
118 msg = f"No changes detected for document {doc_id} in {col_name}. Skipping update."
119 LOG().debug(msg)
120 return None
122 result = self.client.collections[col_name].documents[doc_id].update(updates)
123 result_data = dict(result)
124 updated_id = result_data.get("id")
126 if updated_id is None:
127 LOG().warning(f"Failed to update document {doc_id} in {col_name}, no ID returned.")
128 return None
129 if str(updated_id) != str(doc_id):
130 LOG().warning(
131 f"Updated document ID mismatch for {doc_id} in {col_name}: "
132 f"expected {doc_id}, got {updated_id}.",
133 )
135 # LOG().debug(f"Updated document in {col_name} with ID: {updated_id} and updates: {updates}")
136 return self._model_class.from_payload(str(updated_id), updates, original)
138 #
139 # get
140 #
141 def get(self, doc_id: str) -> SModel | None:
142 col_name = self.collection.value
143 try:
144 document = self.client.collections[col_name].documents[doc_id].retrieve()
145 data = dict(document) # Convert Mapping to dict
146 return self._model_class.from_payload(doc_id, data)
148 except ObjectNotFound:
149 return None
151 #
152 # delete
153 #
154 def batch_delete(self, doc_ids: list[str]) -> None:
155 col_name = self.collection.value
156 try:
157 self.client.collections[col_name].documents.delete(
158 {"filter_by": f"id:[{','.join(doc_ids)}]"},
159 )
160 except Exception as error:
161 LOG().warning(f"Error batch deleting documents from {col_name}: {error}")
163 def delete(self, doc_id: str) -> SModel | None:
164 col_name = self.collection.value
165 try:
166 LOG().debug(f"Deleting document ID {doc_id} from {col_name}")
167 ref = self.client.collections[col_name].documents[doc_id]
168 doc = ref.retrieve()
169 data = dict(doc) # Convert Mapping to dict
170 model = self._model_class.from_payload(doc_id, data)
172 ref.delete()
173 return model
174 except Exception as error:
175 LOG().warning(f"Error deleting document {doc_id} from {col_name}: {error}")
176 return None
178 #
179 # search
180 #
181 def search(self, params: SearchParameters) -> TypesensePayload:
182 col_name = self.collection.value
183 if IS_DEBUG:
184 msg = f"Executing search on {col_name} with params: {stringify_debug(cast('Mapping[str, Any]', params))}"
185 LOG().debug(msg)
187 try:
188 collection = self.client.collections[col_name]
189 results = collection.documents.search(params)
190 return TypesensePayload.from_result(cast("Mapping[str, Any]", results))
191 except Exception as error:
192 LOG().error(f"Search error on collection {col_name} query '{params}': {error}")
193 raise SearchError(source=col_name, message=f"Search error: {error!s}") from error
195 #
196 # misc
197 #
198 def process_result(self, op_str: str, result: TypesensePayload) -> list[SModel] | None:
199 if result.found <= 0:
200 msg = f"Could not find document for {op_str} in {self.collection.value}"
201 LOG().warning(msg)
202 return None
204 hits = result.hits
205 if len(hits) == 0:
206 msg = f"No documents found for {op_str} in {self.collection.value}"
207 LOG().warning(msg)
208 return None
210 return [self._model_class.from_payload(hit.doc_id, hit.document) for hit in hits]