Coverage for functions \ flipdare \ search \ result \ typesense_payload.py: 81%

139 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13 

14import re 

15from dataclasses import dataclass 

16from typing import Self, override 

17 

18from pydantic import BaseModel 

19 

20from flipdare.app_log import LOG 

21from flipdare.app_types import JsonDict, TypesenseDict 

22from flipdare.generated.model.search.result_hint_model import ResultHintModel 

23from flipdare.search.doc.general_document import GeneralDocument 

24from flipdare.search.result.typesense_model_loader import TypesenseModelLoader 

25from flipdare.search.result.typesense_models import ( 

26 HighlightGuards, 

27 HighlightType, 

28 TArrayHighlightModel, 

29 TResultModel, 

30 TStringHighlightModel, 

31) 

32 

33 

@dataclass(frozen=True, slots=True)
class HitResult:  # type: ignore[misc]
    """Immutable view of a single search hit.

    Attributes are documented inline; instances are created by
    ``TypesensePayload.hits``.
    """

    # Value of the document's "id" key ("" when the key is absent).
    doc_id: str
    # The hit's document exactly as carried by the result model.
    document: JsonDict
    # Highlight entries attached to the hit; None when there are none.
    highlights: list[HighlightType] | None = None

39 

40 

# One hint entry: a snippet plus its surrounding text and a start position.
# Documented with comments rather than a docstring so the pydantic JSON
# schema (which picks up class docstrings as ``description``) is unchanged.
# NOTE(review): not referenced by the code visible here; the field meanings
# below are inferred from their names only - confirm against the consumer.
class THintValue(BaseModel):
    # presumably the position at which the snippet starts
    start: int
    # presumably the text preceding the snippet
    before: str
    # presumably the highlighted/matched text itself
    snippet: str
    # presumably the text following the snippet
    after: str

46 

47 

# Container pairing a list of integer indices with a list of THintValue
# entries. Documented with comments rather than a docstring so the pydantic
# JSON schema (which picks up class docstrings as ``description``) is unchanged.
# NOTE(review): not referenced by the code visible here; how ``indices``
# relates to ``hints`` (e.g. one index per hint) is not established - confirm.
class THint(BaseModel):
    indices: list[int]
    hints: list[THintValue]

51 

52 

class TypesensePayload:
    """Read-only wrapper around a loaded Typesense result model.

    Exposes the request/paging fields of the underlying ``TResultModel`` as
    properties and converts its hits into ``HitResult`` objects,
    ``GeneralDocument`` objects, or ``ResultHintModel`` highlight hints.
    """

    __slots__ = ("_model",)
    _model: TResultModel

    def __init__(
        self,
        model: TResultModel,
    ) -> None:
        """Wrap an already-loaded result model."""
        self._model = model

    @classmethod
    def from_result(cls, result: TypesenseDict) -> Self:
        """Load *result* through ``TypesenseModelLoader`` and wrap the model."""
        return cls(TypesenseModelLoader(result).load())

    @property
    def query(self) -> str:
        """The ``q`` value of the request parameters."""
        return self._model.request_params.q

    @property
    def found(self) -> int:
        """The ``found`` value reported by the result model."""
        return self._model.found

    @property
    def page(self) -> int:
        """The ``page`` value reported by the result model."""
        return self._model.page

    @property
    def out_of(self) -> int:
        """The ``out_of`` value reported by the result model."""
        return self._model.out_of

    @property
    def collection_name(self) -> str:
        """The ``collection_name`` value of the request parameters."""
        return self._model.request_params.collection_name

    @property
    def hits(self) -> list[HitResult]:
        """Return one ``HitResult`` per model hit.

        A new list is built on every access. ``doc_id`` falls back to ``""``
        when the document has no ``"id"`` key.
        """
        return [
            HitResult(
                doc_id=model_hit.document.get("id", ""),
                document=model_hit.document,
                highlights=model_hit.highlights,
            )
            for model_hit in self._model.hits
        ]

    def general_docs(self) -> list[GeneralDocument]:
        """Convert the hits into ``GeneralDocument`` objects, best effort.

        Hits with an empty document or without an ``"id"`` are skipped, and a
        document that fails to convert is logged and skipped rather than
        aborting the whole conversion.
        """
        # Parse hits WITHOUT timestamp conversion for internal document objects
        general_docs: list[GeneralDocument] = []
        for hit in self._model.hits:
            raw_doc = hit.document
            if not raw_doc:
                continue

            doc_id = raw_doc.get("id")
            if doc_id is None:
                LOG().debug(f"Document without ID found in search results: {raw_doc}")
                continue

            LOG().debug(f"Processing document from search results: {doc_id}")
            try:
                # Use raw document data directly without timestamp conversion
                document = GeneralDocument.from_payload(doc_id, raw_doc)
                if document.doc_id is not None:
                    general_docs.append(document)
                    continue
                # technically, this should never happen because
                # GeneralDocument.from_payload should throw.
                msg = f"Failed to convert document ID {doc_id} to GeneralDocument: {raw_doc}"
                LOG().error(msg)
            except Exception as e:
                # Deliberately broad: one malformed document must not drop
                # the remaining search results.
                LOG().warning(f"Error converting document ID {doc_id} to search document: {e}")

        return general_docs

    def hints(self) -> list[ResultHintModel]:
        """Build highlight hints from the FIRST hit only.

        Returns an empty list when there are no hits, the first hit has no
        highlights, or the highlights are of an unrecognised type.
        """
        model_hits = self._model.hits
        if not model_hits:
            return []

        highlights = model_hits[0].highlights
        if not highlights:
            return []

        if HighlightGuards.is_array_list(highlights):
            return self._build_array_hints(highlights)
        if HighlightGuards.is_string_list(highlights):
            return self._build_string_hint(highlights)

        LOG().warning(f"Unknown highlight type in search results: {highlights[0].kind}")
        return []

    def _build_array_hints(self, highlights: list[TArrayHighlightModel]) -> list[ResultHintModel]:
        """Build at most one hint per array highlight.

        Only the first snippet, the first token of the first token group and
        the first index of each highlight are used. Highlights whose three
        lists differ in length are logged and skipped; highlights with
        nothing to index are skipped silently.
        """
        hints: list[ResultHintModel] = []

        for highlight in highlights:
            indices = highlight.indices
            matched_tokens = highlight.matched_tokens
            snippets = highlight.snippets
            if len(indices) != len(matched_tokens) or len(indices) != len(snippets):
                LOG().warning(
                    f"Highlight data length mismatch in hit highlights: "
                    f"indices={indices}, matched_tokens={matched_tokens}, snippets={snippets}",
                )
                continue

            # The lengths agree, so an empty `indices` means all three lists
            # are empty; the first token group may also be empty. Either case
            # would raise IndexError below, so skip the highlight instead.
            if not indices or not matched_tokens[0]:
                continue

            # NOTE(review): Typesense documents `indices` as the positions of
            # the matching array elements, yet the value is forwarded here as
            # the hint's start offset - confirm this is intended.
            hint = self._build_hint(snippets[0], matched_tokens[0][0], indices[0])
            if hint is not None:
                hints.append(hint)
        return hints

    def _build_string_hint(self, highlights: list[TStringHighlightModel]) -> list[ResultHintModel]:
        """Build one hint per string highlight whose snippet contains a marked span."""
        hints: list[ResultHintModel] = []

        for highlight in highlights:
            hint = self._build_raw_hint(highlight.snippet)
            if hint is not None:
                hints.append(hint)
        return hints

    @staticmethod
    def _build_hint(snippet: str, matched: str, start_tag_idx: int) -> ResultHintModel | None:
        """Create a hint for *matched* starting at *start_tag_idx*.

        Returns ``None`` unless *snippet* contains both ``<mark>`` and
        ``</mark>``. The hint's text is the snippet with every mark tag
        removed; its end is the start plus the length of *matched*.
        """
        if "<mark>" not in snippet or "</mark>" not in snippet:
            return None

        # Strip ONLY the tags so the surrounding text is preserved verbatim.
        original = re.sub(r"</?mark>", "", snippet)

        return ResultHintModel(
            start=start_tag_idx,
            end=start_tag_idx + len(matched),
            matched=matched,
            text=original,
        )

    @staticmethod
    def _build_raw_hint(snippet: str) -> ResultHintModel | None:
        """Create a hint from the first ``<mark>...</mark>`` span in *snippet*.

        Returns ``None`` when the snippet contains no such span. The start is
        the position of the opening tag in *snippet*.
        """
        match = re.search(r"<mark>(.*?)</mark>", snippet)
        if match is None:
            return None

        # A successful match guarantees both tags are present, so the shared
        # builder produces the hint without duplicating its logic here.
        return TypesensePayload._build_hint(snippet, match.group(1), match.start())

    @override
    def __str__(self) -> str:
        """Summarise the payload: query, collection, paging fields and hit count."""
        # Count the model hits directly instead of materialising HitResult
        # objects only to take their length.
        return (
            f"TypesensePayload(query={self.query}, collection_name={self.collection_name}, "
            f"found={self.found}, page={self.page}, out_of={self.out_of}, "
            f"hits_count={len(self._model.hits)})"
        )

    @override
    def __repr__(self) -> str:
        """Same text as ``__str__``."""
        return str(self)