Coverage for functions \ flipdare \ search \ db \ _app_search.py: 72%

129 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13 

14from abc import ABC, abstractmethod 

15from collections.abc import Mapping 

16from enum import Enum 

17from typing import Any, cast 

18import typesense 

19from typesense.exceptions import ObjectNotFound 

20from typesense.types.document import SearchParameters 

21 

22from flipdare.app_log import LOG 

23from flipdare.constants import IS_DEBUG, MAX_SEARCH_RESULTS_PER_PAGE 

24from flipdare.error.app_error import SearchError 

25from flipdare.generated.shared.search.search_collections import SearchCollections 

26from flipdare.search.doc._search_document import SearchDocument 

27from flipdare.search.result.typesense_payload import TypesensePayload 

28from flipdare.util.debug_util import stringify_debug 

29 

30__all__ = ["AppSearch"] 

31 

32 

33class AppSearch[SModel: SearchDocument[Any], T: Enum](ABC): 

34 

35 def __init__( 

36 self, 

37 collection: SearchCollections, 

38 client: typesense.Client, 

39 model_class: type[SModel], 

40 per_page: int = MAX_SEARCH_RESULTS_PER_PAGE, 

41 ) -> None: 

42 self._collection = collection 

43 self._client = client 

44 self._model_class = model_class 

45 self._per_page = per_page 

46 

47 @abstractmethod 

48 def get_user(self, uid: str) -> list[SModel] | None: ... 

49 

50 @abstractmethod 

51 def delete_user(self, uid: str) -> None: ... 

52 

53 @abstractmethod 

54 def get_user_type(self, uid: str, identifier: str) -> SModel | None: ... 

55 

56 @abstractmethod 

57 def delete_user_type(self, uid: str, identifier: str, **kwargs: Any) -> SModel | None: ... 

58 

59 @property 

60 def collection(self) -> SearchCollections: 

61 return self._collection 

62 

63 @property 

64 def client(self) -> typesense.Client: 

65 return self._client 

66 

67 def count(self) -> int: 

68 col_name = self.collection.value 

69 try: 

70 collection_info = self.client.collections[col_name].retrieve() 

71 document_count = collection_info["num_documents"] 

72 return int(document_count) 

73 except Exception as error: 

74 LOG().error(f"Error counting documents in {col_name}: {error}") 

75 return 0 

76 

77 def create(self, document: SModel) -> SModel: 

78 col_name = self.collection.value 

79 payload = document.payload() 

80 doc_id: str | None = None 

81 

82 try: 

83 result = self.client.collections[col_name].documents.create(payload) 

84 result_data = dict(result) 

85 if "id" in result_data: 

86 doc_id = str(result_data["id"]) 

87 

88 except Exception as error: 

89 LOG().error(f"Error creating document in {col_name} with payload {payload}: {error}") 

90 raise 

91 

92 if doc_id is None: 

93 LOG().error(f"Failed to add document to {col_name}, no ID returned.") 

94 raise ValueError(f"Failed to add document to {col_name}, no ID returned.") 

95 

96 return self._model_class.from_payload(str(doc_id), payload) 

97 

98 def update(self, doc: SModel, force: bool = False) -> SModel | None: 

99 doc_id = doc.doc_id 

100 if doc_id is None: 

101 raise ValueError(f"Document must have a valid doc_id for update.: {doc} ") 

102 

103 col_name = self.collection.value 

104 updates = doc.updates() 

105 if updates is None: 

106 LOG().warning(f"No updates provided for document {doc_id} in {col_name}.") 

107 return None 

108 

109 original = self.get(doc_id) 

110 if original is None: 

111 # this should not happen (since we have a doc_id), but just in case 

112 # warn but continue 

113 LOG().warning(f"Document {doc_id} not found in {col_name}, has doc_id so creating.") 

114 self.create(doc) 

115 return None 

116 

117 if not force and doc.payload_equal(original): 

118 msg = f"No changes detected for document {doc_id} in {col_name}. Skipping update." 

119 LOG().debug(msg) 

120 return None 

121 

122 result = self.client.collections[col_name].documents[doc_id].update(updates) 

123 result_data = dict(result) 

124 updated_id = result_data.get("id") 

125 

126 if updated_id is None: 

127 LOG().warning(f"Failed to update document {doc_id} in {col_name}, no ID returned.") 

128 return None 

129 if str(updated_id) != str(doc_id): 

130 LOG().warning( 

131 f"Updated document ID mismatch for {doc_id} in {col_name}: " 

132 f"expected {doc_id}, got {updated_id}.", 

133 ) 

134 

135 # LOG().debug(f"Updated document in {col_name} with ID: {updated_id} and updates: {updates}") 

136 return self._model_class.from_payload(str(updated_id), updates, original) 

137 

138 # 

139 # get 

140 # 

141 def get(self, doc_id: str) -> SModel | None: 

142 col_name = self.collection.value 

143 try: 

144 document = self.client.collections[col_name].documents[doc_id].retrieve() 

145 data = dict(document) # Convert Mapping to dict 

146 return self._model_class.from_payload(doc_id, data) 

147 

148 except ObjectNotFound: 

149 return None 

150 

151 # 

152 # delete 

153 # 

154 def batch_delete(self, doc_ids: list[str]) -> None: 

155 col_name = self.collection.value 

156 try: 

157 self.client.collections[col_name].documents.delete( 

158 {"filter_by": f"id:[{','.join(doc_ids)}]"}, 

159 ) 

160 except Exception as error: 

161 LOG().warning(f"Error batch deleting documents from {col_name}: {error}") 

162 

163 def delete(self, doc_id: str) -> SModel | None: 

164 col_name = self.collection.value 

165 try: 

166 LOG().debug(f"Deleting document ID {doc_id} from {col_name}") 

167 ref = self.client.collections[col_name].documents[doc_id] 

168 doc = ref.retrieve() 

169 data = dict(doc) # Convert Mapping to dict 

170 model = self._model_class.from_payload(doc_id, data) 

171 

172 ref.delete() 

173 return model 

174 except Exception as error: 

175 LOG().warning(f"Error deleting document {doc_id} from {col_name}: {error}") 

176 return None 

177 

178 # 

179 # search 

180 # 

181 def search(self, params: SearchParameters) -> TypesensePayload: 

182 col_name = self.collection.value 

183 if IS_DEBUG: 

184 msg = f"Executing search on {col_name} with params: {stringify_debug(cast('Mapping[str, Any]', params))}" 

185 LOG().debug(msg) 

186 

187 try: 

188 collection = self.client.collections[col_name] 

189 results = collection.documents.search(params) 

190 return TypesensePayload.from_result(cast("Mapping[str, Any]", results)) 

191 except Exception as error: 

192 LOG().error(f"Search error on collection {col_name} query '{params}': {error}") 

193 raise SearchError(source=col_name, message=f"Search error: {error!s}") from error 

194 

195 # 

196 # misc 

197 # 

198 def process_result(self, op_str: str, result: TypesensePayload) -> list[SModel] | None: 

199 if result.found <= 0: 

200 msg = f"Could not find document for {op_str} in {self.collection.value}" 

201 LOG().warning(msg) 

202 return None 

203 

204 hits = result.hits 

205 if len(hits) == 0: 

206 msg = f"No documents found for {op_str} in {self.collection.value}" 

207 LOG().warning(msg) 

208 return None 

209 

210 return [self._model_class.from_payload(hit.doc_id, hit.document) for hit in hits]