Coverage for functions \ flipdare \ backend \ indexer_service.py: 39%

308 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13 

14from dataclasses import dataclass 

15from enum import Enum 

16from typing import Any 

17 

18from flipdare.app_log import LOG 

19from flipdare.constants import IS_DEBUG, MAX_SEARCH_TAG_COUNT, NO_DOC_ID 

20from flipdare.result.app_result import AppResult 

21from flipdare.service._service_provider import ServiceProvider 

22from flipdare.core.tokenizer import Tokenizer 

23from flipdare.firestore import DareContextFactory, FriendContext, GroupContextFactory 

24from flipdare.generated import AppErrorCode 

25from flipdare.generated.shared.search.search_obj_type import SearchObjType 

26from flipdare.search import ( 

27 ContentSearchFactory, 

28 DareSearchFactory, 

29 FriendSearchFactory, 

30 GeneralDocument, 

31 GroupMemberSearchFactory, 

32 GroupSearchFactory, 

33 SearchDocument, 

34 SearchDocumentFactory, 

35 UserSearchFactory, 

36) 

37from flipdare.search.doc.friend_document import FriendDocument 

38from flipdare.wrapper import ( 

39 ContentWrapper, 

40 DareWrapper, 

41 FriendWrapper, 

42 GroupMemberWrapper, 

43 GroupWrapper, 

44 UserWrapper, 

45) 

46 

# Public API of this module.
__all__ = ["IndexerService"]


# Union of the search-document types this service knows how to index
# (PEP 695 type alias).
type IndexedDocType = GeneralDocument | FriendDocument

51 

52 

class IndexerType(Enum):
    """Kind of indexing operation being performed.

    NOTE(review): not referenced anywhere in this module and not exported
    via __all__ — presumably consumed by callers elsewhere; confirm.
    """

    NEW = "new"
    UPDATE = "update"
    DELETE = "delete"

57 

58 

@dataclass
class _GeneralOpData:
    """Keys a general (non-friend) search document for deletion."""

    # Owner user id of the search document.
    uid: str
    # Identifier of the indexed object within the user's documents.
    obj_id: str


@dataclass
class _FriendOpData:
    """Keys a friend search document for deletion."""

    # Owner user id of the search document.
    uid: str
    # The other side of the friendship edge.
    friend_uid: str


# Discriminated union dispatched on (via match/case) in _delete().
type _OpData = _GeneralOpData | _FriendOpData

72 

73 

class IndexerService(ServiceProvider):
    """Builds and maintains search-index documents for users, groups,
    dares, friends and shared content.
    """

    def __init__(
        self,
        tokenizer: Tokenizer | None = None,
        max_tags: int = MAX_SEARCH_TAG_COUNT,
    ) -> None:
        """
        Args:
            tokenizer: Tokenizer used to build search tags. When omitted it
                is lazily resolved from Tokenizer.instance() on first use.
            max_tags: Upper bound on the number of group members indexed
                per group.
        """
        super().__init__()

        self._tokenizer = tokenizer
        self._max_tags = max_tags

85 

    @property
    def tokenizer(self) -> Tokenizer:
        """Tokenizer for tag generation, created on first access if not injected."""
        if self._tokenizer is None:
            self._tokenizer = Tokenizer.instance()
        return self._tokenizer

    @property
    def max_tags(self) -> int:
        """Maximum number of group members indexed per group."""
        return self._max_tags

95 

    def delete_friend(self, friend: FriendWrapper) -> AppResult:
        """Remove the search document for a single friendship edge.

        Note the argument order: the document owner is ``to_uid`` and the
        identifier is ``from_uid``.
        """
        return self._delete(_FriendOpData(friend.to_uid, friend.from_uid))

    def delete_all_friends(self, uid: str) -> AppResult:
        """Remove every friend document for *uid*."""
        # this will delete all friend documents for the user, which includes both from and to friends
        return self._delete(_FriendOpData(uid, uid))

    def delete_general(self, uid: str, obj_id: str) -> AppResult:
        """Remove the general search document keyed by (*uid*, *obj_id*)."""
        return self._delete(_GeneralOpData(uid, obj_id))

105 

    def process_content(
        self,
        content: ContentWrapper,
        is_update: bool,
    ) -> AppResult[ContentWrapper]:
        """Index a piece of shared content (user- or group-owned) for search.

        The owning user or group is loaded first: user-owned content is only
        indexed while the owner's ``can_share`` flag is set (on update,
        previously indexed entries are deleted when sharing was revoked);
        group-owned content is indexed whenever the group exists.

        Args:
            content: Content wrapper; must be user- or group-typed.
            is_update: True when re-indexing content that already existed.

        Returns:
            AppResult carrying any accumulated errors, or a "skip" result
            when the content has no description to index.
        """
        doc_id = content.doc_id
        content_result = AppResult[ContentWrapper](doc_id=doc_id)

        # Same value as doc_id above; kept as a second name for messages.
        content_id = content.doc_id
        if not content.is_user and not content.is_group:
            cause = "ContentWrapper objType is not USER or GROUP, cannot add to search."
            content_result.add_error(AppErrorCode.INVALID_DATA, cause)
            return content_result
        if content.description is None:
            msg = f"Content {content_id} has no description, skipping indexing."
            return AppResult[ContentWrapper].skip(content_id, message=msg)
        description = content.description
        assert description  # narrowing

        search_document: SearchDocumentFactory | None = None
        if content.is_user:
            user_result = self.user_bridge.get(content.obj_id)
            if user_result.is_error:
                msg = f"Unable to retrieve user {content.obj_id} for content {content_id}"
                content_result.add_error(AppErrorCode.DATABASE_EX, msg)
                return content_result

            user_model = user_result.generated
            assert user_model  # narrowing

            user_id = user_model.doc_id
            if user_model.model.can_share:
                search_document = ContentSearchFactory(
                    content,
                    description,
                    user_model,
                    self.tokenizer,
                )
            else:
                if not is_update:
                    LOG().debug(f"User {user_id} is not searchable, skipping.")
                    content_result.merge(user_result)
                    return content_result

                # Delete existing search entries if the user is no longer searchable
                LOG().debug(f"User {user_id} is no longer searchable, deleting from search.")
                # NOTE(review): _GeneralOpData is declared as (uid, obj_id) with
                # str fields, but this call passes the content doc id as the uid
                # and a SearchObjType enum member as the obj_id — confirm this is
                # what delete_user_type expects (compare delete_general above).
                delete_result = self._delete(_GeneralOpData(content_id, SearchObjType.USER))
                content_result.merge(user_result)
                content_result.merge(delete_result)
                return content_result
        else:
            group_result = self.group_bridge.get(content.obj_id)
            if group_result.is_error:
                msg = f"Unable to retrieve group {content.obj_id} for content {content_id}"
                content_result.add_error(AppErrorCode.DATABASE_EX, msg)
                return content_result

            group_model = group_result.generated
            if group_model is None:
                msg = f"No group found for content {content_id} with group id {content.obj_id}"
                content_result.add_error(AppErrorCode.NOT_FOUND, msg)
                return content_result

            search_document = ContentSearchFactory(
                content,
                description,
                group_model,
                self.tokenizer,
            )

        try:
            result = self._add(search_document, is_update=is_update)
            if not result.is_error:
                return content_result

            content_result.merge(result)
            return content_result
        except Exception as error:
            # NOTE(review): this message says "user" even when the content is
            # group-owned.
            msg = f"Error building searchable user for user {content_id}: {error}"
            content_result.add_error(AppErrorCode.SERVER_EX, msg)
            return content_result

187 

188 def process_friend( 

189 self, 

190 friend_context: FriendContext, 

191 updated: bool, 

192 ) -> AppResult[FriendWrapper]: 

193 

194 doc_id = friend_context.doc_id 

195 friend_id = friend_context.friend_id 

196 

197 friend_result: AppResult[FriendWrapper] = AppResult( 

198 doc_id=doc_id, 

199 task_name=f" for friend {friend_id}", 

200 ) 

201 

202 if IS_DEBUG: 

203 LOG().debug(f"Processing friend {friend_id} for search.") 

204 

205 try: 

206 model_tags = FriendSearchFactory(friend_context, self.tokenizer) 

207 result = self._add(model_tags, updated) 

208 if not result.is_error: 

209 return friend_result 

210 

211 friend_result.merge(result) 

212 except Exception as error: 

213 msg = f"Error building searchable friend for friend {friend_id}: {error}" 

214 friend_result.add_error(AppErrorCode.SERVER_EX, msg) 

215 

216 return friend_result 

217 

218 def process_user(self, user: UserWrapper, is_update: bool) -> AppResult[UserWrapper]: 

219 user_id = user.doc_id 

220 user_result: AppResult[UserWrapper] = AppResult( 

221 doc_id=user_id, 

222 task_name=f" for user {user_id}", 

223 ) 

224 

225 if not user.model.can_share: 

226 if not is_update: 

227 LOG().debug(f"User {user_id} is not searchable, skipping.") 

228 return user_result 

229 

230 # Delete existing search entries if the user is no longer searchable 

231 LOG().debug(f"User {user_id} is no longer searchable, deleting from search.") 

232 delete_result = self._delete(_GeneralOpData(user_id, SearchObjType.USER)) 

233 user_result.merge(delete_result) 

234 return user_result 

235 

236 try: 

237 model_tags = UserSearchFactory(user, self.tokenizer) 

238 result = self._add(model_tags, is_update=is_update) 

239 if not result.is_error: 

240 return user_result 

241 

242 user_result.merge(result) 

243 return user_result 

244 except Exception as error: 

245 msg = f"Error building searchable user for user {user_id}: {error}" 

246 user_result.add_error(AppErrorCode.SERVER_EX, msg) 

247 return user_result 

248 

    def process_dare(self, dare_model: DareWrapper, is_update: bool) -> AppResult[DareWrapper]:
        """Index (or de-index) a dare for search.

        Builds and validates a dare context, honours the dare's
        ``can_share`` flag (deleting stale entries on update when sharing
        was revoked), then adds the dare's search documents.

        Args:
            dare_model: Dare to index.
            is_update: True when re-indexing an existing dare.

        Returns:
            AppResult carrying any accumulated errors.
        """
        dare_id = dare_model.doc_id
        dare_result: AppResult[DareWrapper] = AppResult(
            doc_id=dare_id,
            task_name=f" for dare {dare_id}",
        )

        # Group dares live under a different search object type.
        obj_type = SearchObjType.GROUP_DARE if dare_model.is_group_dare else SearchObjType.DARE
        dare_context = DareContextFactory().create(dare_model)
        if dare_context is None:
            msg = f"Unable to create dare context for dare id: {dare_id}"
            dare_result.add_error(AppErrorCode.INVALID_DATA, msg)
            return dare_result

        if not dare_context.valid:
            msg = f"Invalid dare context for dare id: {dare_id}"
            msg += f"\n{dare_context.error_str}"

            dare_result.add_error(AppErrorCode.INVALID_DATA, msg)
            return dare_result

        dare = dare_context.dare
        if not dare.can_share:
            if not is_update:
                LOG().info(f"Dare {dare_id} is not searchable, skipping.")
                return dare_result

            # Delete existing search entries if the dare is no longer searchable
            LOG().info(f"Dare {dare_id} is no longer searchable, deleting from search.")
            # NOTE(review): _GeneralOpData.obj_id is annotated str but receives a
            # SearchObjType enum member here — confirm the search manager accepts
            # enum identifiers.
            delete_result = self._delete(_GeneralOpData(dare_id, obj_type))
            dare_result.merge(delete_result)
            return dare_result

        try:
            model_tags = DareSearchFactory(dare_context, self.tokenizer)
            result = self._add(model_tags, is_update)
            if result.is_error:
                dare_result.merge(result)
            return dare_result
        except Exception as error:
            msg = f"Error building searchable dare for dare {dare_id}: {error}\n"
            msg += f"Context:\n{dare_context.error_str}"
            dare_result.add_error(AppErrorCode.TAGGING, msg)
            return dare_result

293 

294 def process_group( 

295 self, 

296 group_model: GroupWrapper, 

297 is_update: bool, 

298 ) -> AppResult[GroupWrapper]: 

299 group_id = group_model.doc_id 

300 group_result: AppResult[GroupWrapper] = AppResult( 

301 doc_id=group_id, 

302 task_name=f" for group {group_id}", 

303 ) 

304 

305 try: 

306 group_context = GroupContextFactory().create(group_model) 

307 if group_context is None or not group_context.valid: 

308 msg = f"Unable to create valid group context for group id: {group_id}" 

309 msg += f"\n{group_context.error_str}" if group_context else "" 

310 group_result.add_error(AppErrorCode.INVALID_DATA, msg) 

311 return group_result 

312 

313 model_tags = GroupSearchFactory(group_context, self.tokenizer) 

314 result = self._add(model_tags, is_update) 

315 if result.is_error: 

316 group_result.merge(result) 

317 return group_result 

318 

319 members: list[GroupMemberWrapper] | None = None 

320 try: 

321 members = group_context.members() 

322 if members is None: 

323 # could just be a new group 

324 return group_result 

325 except Exception as error: 

326 msg = f"Error retrieving members for group {group_id}: {error}" 

327 group_result.add_error(AppErrorCode.SERVER_EX, msg) 

328 return group_result 

329 

330 for i in range(len(members)): 

331 if i >= self._max_tags: 

332 LOG().warning( 

333 f"Group {group_id} has more than {self._max_tags} members, " 

334 f"skipping remaining members for search tagging.", 

335 ) 

336 break 

337 try: 

338 member_tags = GroupMemberSearchFactory(group_context, i, self.tokenizer) 

339 result = self._add(member_tags, is_update=is_update) 

340 if result.is_error: 

341 group_result.merge(result) 

342 except Exception as error: 

343 msg = f"Error creating GroupMemberModelTag for member index {i} in group {group_id}: {error}" 

344 LOG().error(msg) 

345 continue 

346 return group_result 

347 except Exception as error: 

348 group_result.add_error( 

349 AppErrorCode.SERVER_EX, 

350 f"Error building group context for group data: {error}", 

351 ) 

352 return group_result 

353 

354 def _add(self, doc_factory: SearchDocumentFactory, is_update: bool) -> AppResult: 

355 all_docs: list[SearchDocument[Any]] | None = doc_factory.get_documents() 

356 result: AppResult = AppResult(task_name=f"for obj {doc_factory.obj_type}") 

357 

358 if all_docs is None: 

359 LOG().debug("No search documents generated, skipping.") 

360 return result 

361 

362 if IS_DEBUG: 

363 LOG().debug(f"Adding {len(all_docs)} documents to search (is_update={is_update}).") 

364 

365 for doc in all_docs: 

366 debug_str = doc.debug_str 

367 

368 if is_update: 

369 # not we put the larger code at the start, because we may need to create ... 

370 try: 

371 if isinstance(doc, (GeneralDocument, FriendDocument)): 

372 changed = self._has_changed(doc.uid, doc) 

373 else: 

374 result.add_error( 

375 AppErrorCode.INVALID_DATA, 

376 f"Unsupported document type: {type(doc)}", 

377 ) 

378 continue 

379 

380 if changed is None: 

381 msg = f"No existing document found for {debug_str}, creating new document." 

382 LOG().info(msg) 

383 else: 

384 _, has_changed = changed 

385 if has_changed: 

386 self._update(doc) 

387 except Exception as error: 

388 msg = f"Error checking for changes and updating document for {debug_str}: {error}" 

389 LOG().error(msg) 

390 result.add_error(AppErrorCode.SERVER_EX, msg) 

391 

392 try: 

393 self._create(doc) 

394 except Exception as error: 

395 result.add_error( 

396 AppErrorCode.SERVER_EX, 

397 f"Error adding new document for {debug_str}: {error}", 

398 ) 

399 

400 return result 

401 

402 def _create(self, doc: SearchDocument[Any]) -> AppResult: 

403 result: AppResult = AppResult( 

404 doc_id=doc.doc_id or NO_DOC_ID, 

405 task_name=f"for user {doc.uid} and obj {doc.obj_type}", 

406 ) 

407 try: 

408 if isinstance(doc, GeneralDocument): 

409 self.search_manager.general.create(doc) 

410 elif isinstance(doc, FriendDocument): 

411 self.search_manager.friend.create(doc) 

412 else: 

413 result.add_error( 

414 AppErrorCode.INVALID_DATA, 

415 f"Unsupported document type: {type(doc)}", 

416 ) 

417 return result 

418 except Exception as error: 

419 result.add_error( 

420 AppErrorCode.SERVER_EX, 

421 f"Error adding new document for {doc.debug_str}: {error}", 

422 ) 

423 

424 return result 

425 

426 def _update(self, doc: SearchDocument[Any]) -> AppResult: 

427 result: AppResult = AppResult( 

428 doc_id=doc.doc_id or NO_DOC_ID, 

429 task_name=f"for user {doc.uid} and obj {doc.obj_type}", 

430 ) 

431 try: 

432 if isinstance(doc, GeneralDocument): 

433 self.search_manager.general.update(doc) 

434 elif isinstance(doc, FriendDocument): 

435 self.search_manager.friend.update(doc) 

436 else: 

437 result.add_error( 

438 AppErrorCode.INVALID_DATA, 

439 f"Unsupported document type: {type(doc)}", 

440 ) 

441 return result 

442 except Exception as error: 

443 result.add_error( 

444 AppErrorCode.SERVER_EX, 

445 f"Error updating document for {doc.debug_str}: {error}", 

446 ) 

447 

448 return result 

449 

450 def _get(self, uid: str, doc: IndexedDocType) -> IndexedDocType | None: 

451 mgr = self.search_manager 

452 

453 match doc: 

454 case GeneralDocument(): 

455 general = mgr.general 

456 return general.get_user_type( 

457 uid=uid, 

458 identifier=doc.obj_id, 

459 ) 

460 case FriendDocument(): 

461 friend = mgr.friend 

462 return friend.get_user_type( 

463 uid=uid, 

464 identifier=doc.friend_uid, 

465 ) 

466 

467 def _has_changed(self, uid: str, doc: IndexedDocType) -> tuple[str, bool] | None: 

468 mgr = self.search_manager 

469 saved_doc: IndexedDocType | None 

470 

471 obj_id: str 

472 match doc: 

473 case GeneralDocument(): 

474 obj_id = doc.obj_id 

475 general = mgr.general 

476 saved_doc = general.get_user_type( 

477 uid=uid, 

478 identifier=doc.obj_id, 

479 ) 

480 case FriendDocument(): 

481 friend = mgr.friend 

482 obj_id = doc.friend_uid 

483 saved_doc = friend.get_user_type( 

484 uid=uid, 

485 identifier=doc.friend_uid, 

486 ) 

487 

488 if saved_doc is None: 

489 # no doc, has_changed = True, so we add 

490 return None 

491 

492 doc_id = saved_doc.doc_id 

493 assert doc_id is not None # narrowing 

494 

495 if saved_doc.payload_equal(doc): 

496 LOG().debug(f"Document for {doc_id}/{obj_id} unchanged.") 

497 return doc_id, False 

498 

499 return doc_id, True 

500 

501 def _delete(self, data: _OpData) -> AppResult: 

502 result: AppResult = AppResult(task_name=f"for user {data.uid}") 

503 mgr = self.search_manager 

504 

505 match data: 

506 case _GeneralOpData(): 

507 mgr.general.delete_user_type( 

508 uid=data.uid, 

509 identifier=data.obj_id, 

510 ) 

511 case _FriendOpData(): 

512 mgr.friend.delete_user_type( 

513 uid=data.uid, 

514 identifier=data.friend_uid, 

515 ) 

516 

517 return result