#!/usr/bin/env python
# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
#
# This file is part of Flipdare's proprietary software and contains
# confidential and copyrighted material. Unauthorised copying,
# modification, distribution, or use of this file is strictly
# prohibited without prior written permission from Flipdare Pty Ltd.
#
# This software includes third-party components licensed under MIT,
# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
#

from __future__ import annotations
import flask
import typesense
from typesense.exceptions import ObjectNotFound, TypesenseClientError
from flipdare.app_log import LOG
from flipdare.constants import SEARCH_NAT_LANG_CONFIG
from flipdare.core.singleton import Singleton
from flipdare.error.app_error import AppError, ServerError
from flipdare.error.error_context import ErrorContext
from flipdare.generated.shared.app_error_code import AppErrorCode
from flipdare.generated.shared.app_log_category import AppLogCategory
from flipdare.generated.shared.search.search_collections import SearchCollections
from flipdare.request.app_request import AppRequest
from flipdare.request.request_types import AppHttpRequestType
from flipdare.core.app_response import AppOkResponse
from flipdare.search.db.app_friend_search import AppFriendSearch
from flipdare.search.db.app_general_search import AppGeneralSearch

__all__ = ["SearchManager"]


class SearchManager(Singleton):
    """
    Primarily responsible for creation and management of search clients and collections.
    Also provides helper methods for common search operations.

    Maintenance Tasks:

        Blue/Green Re-index: every 3 days (2:00 AM).
            Ensures the index is 100% in sync with the primary DB and fixes any
            minor drift.

        Database Compaction: weekly, or after a re-index; "0 4 * * 0" (4:00 AM Sun).
            Only needed with a high write/delete volume. Running it weekly is
            sufficient for most deployments.
    """
    def __init__(
        self,
        client: typesense.Client | None = None,
        general: AppGeneralSearch | None = None,
        friend: AppFriendSearch | None = None,
    ) -> None:
        super().__init__()

        self._client = client
        self._general = general
        self._friend = friend

    @property
    def client(self) -> typesense.Client:
        if self._client is None:
            self._client = self._create_typesense_client()

        return self._client

    @property
    def general(self) -> AppGeneralSearch:
        if self._general is None:
            self._general = AppGeneralSearch(self.client)

        return self._general

    @property
    def friend(self) -> AppFriendSearch:
        if self._friend is None:
            self._friend = AppFriendSearch(self.client)

        return self._friend

    @staticmethod
    def _create_typesense_client() -> typesense.Client:
        from flipdare.app_config import get_app_config

        config = get_app_config()
        hostname = config.search_ip
        port = config.search_port
        api_key = config.search_api_key
        timeout = config.search_timeout
        # Informational only; do not log the API key, it is a secret.
        LOG().info(f"Creating Typesense client for {hostname}:{port}")
        return typesense.Client(
            {
                "nodes": [{"host": hostname, "port": port, "protocol": "http"}],
                "api_key": api_key,
                "connection_timeout_seconds": timeout,
            },
        )

    # ---------------------------------------------------------------------------------------------
    # ADMIN
    # ---------------------------------------------------------------------------------------------
    def initialize_all(self) -> None:
        client = self.client
        collections = client.collections.retrieve()
        existing_collections = {col["name"] for col in collections}

        try:
            for collection in SearchCollections:
                if collection.base_name not in existing_collections:
                    LOG().info(f"Creating missing collection: {collection.base_name}")
                    self.create_collection_if(collection)
                self.create_alias_if(collection.value, collection.base_name)
        except Exception as e:
            msg = f"Failed to initialize search collections: {e}"
            raise ServerError(
                message=msg,
                error_code=AppErrorCode.SERVER_INIT,
                error=e,
            ) from e

        self.initialize_nat_lang()
    def initialize_nat_lang(self) -> None:
        from flipdare.app_config import get_app_config

        app_config = get_app_config()
        enable_nat_lang = app_config.search_enable_nat_lang
        if not enable_nat_lang:
            LOG().info("Natural language search is disabled.")
            return

        LOG().info("Enabling natural language search settings...")
        api_key = app_config.gemini_api_key
        model_config = SEARCH_NAT_LANG_CONFIG.copy()
        model_config["api_key"] = api_key

        client = self.client
        try:
            client.nl_search_models.create(model_config)  # type: ignore
            LOG().info(f"Model {model_config['id']} configured successfully.")
        except TypesenseClientError as e:
            LOG().error(f"Error configuring model: {e}")
        except Exception as e:
            LOG().error(f"Unexpected error: {e}")
    def create_collection_if(self, collection: SearchCollections) -> bool:
        if self.collection_exists(collection):
            LOG().info(f"Skipped existing collection: {collection.base_name}")
            return False

        LOG().info(f"Creating collection: {collection.base_name}")
        col_def = collection.definition
        self.client.collections.create(col_def)  # type: ignore
        return True
    def collection_exists(self, collection: SearchCollections) -> bool:
        client = self.client
        col_name = collection.base_name
        try:
            collections = client.collections.retrieve()
            return any(col["name"] == col_name for col in collections)
        except Exception as error:
            LOG().error(f"Error checking collection existence for {col_name}: {error}")
            return False
    def create_alias_if(self, alias_name: str, collection_name: str) -> bool:
        if self.alias_exists(alias_name, expected_destination=collection_name):
            LOG().info(f"Skipped existing alias {alias_name} -> {collection_name}.")
            return False

        LOG().info(f"Creating alias {alias_name} -> {collection_name}.")
        self.client.aliases.upsert(alias_name, {"collection_name": collection_name})
        return True
    def alias_exists(self, alias_name: str, expected_destination: str | None = None) -> bool:
        try:
            # Direct lookup is O(1) instead of O(n).
            alias_detail = self.client.aliases[alias_name].retrieve()

            if expected_destination is not None:
                return alias_detail["collection_name"] == expected_destination

            return True
        except ObjectNotFound:
            return False
        except Exception as error:
            LOG().error(f"Unexpected error checking alias {alias_name}: {error}")
            return False
    # ---------------------------------------------------------------------------------------------
    # MAINTENANCE
    # ---------------------------------------------------------------------------------------------

    def dump(self) -> None:
        """
        Dumps the configuration to log. Useful for debugging connection issues.
        """
        client = self.client
        try:
            collections = client.collections.retrieve()
            aliases = client.aliases.retrieve()
            LOG().info(f"Current Typesense Collections: {[col['name'] for col in collections]}")
            LOG().info(
                f"Current Typesense Aliases: {[alias['name'] for alias in aliases['aliases']]}",
            )
        except Exception as e:
            LOG().error(f"Error dumping Typesense configuration: {e}")
    def reindex(self) -> None:
        # FIXME: FLP-629 - implement blue/green reindexing strategy
        # NOTE: Just want to follow up for anyone else that might run into this.
        # NOTE: This has been running for a few days now and the disk size and
        # NOTE: re-indexing are remaining consistent.
        # NOTE: Making sure the db-compaction-interval and snapshot-interval-seconds
        # NOTE: flags were not overlapping between pipeline runs was the key here!
        # Ref: https://github.com/typesense/typesense/issues/2005
        #
        # Implement a blue/green re-indexing strategy for Typesense to minimize
        # downtime during index updates. The process should include:
        # 1. Create new collection with updated schema (Green)
        # 2. Import data into Green collection in background
        # 3. Atomically swap alias to point from old collection (Blue) to new collection (Green)
        # 4. Compact at the very end of the process, specifically after the old
        #    Blue collection has been deleted.
        #
        ## 1. Import (Staggered)
        # client.collections[NEW_NAME].documents.import_(data, {'batch_size': 100})
        #
        ## 2. Swap Alias (Instant)
        # client.aliases.upsert('products', {'collection_name': NEW_NAME})
        #
        ## 3. Wait (Safety)
        # import time
        # time.sleep(300)  # 5 mins
        #
        ## 4. Delete Old
        # client.collections[OLD_NAME].delete()
        #
        ## 5. Compact (The only cleanup you need)
        # client.operations.perform('db/compact')
        raise NotImplementedError("Blue/Green re-indexing strategy is not yet implemented.")
    def compact(self) -> None:
        """
        Compacts the Typesense search index to optimize performance.
        Should be run periodically as part of maintenance.
        """
        try:
            self.client.operations.perform("db/compact")
        except Exception as e:
            msg = f"Failed to compact Typesense index: {e}"
            raise ServerError(
                message=msg,
                error_code=AppErrorCode.SERVER_MAINTENANCE,
                error=e,
            ) from e

    # ---------------------------------------------------------------------------------------------
    # HEALTH CHECK
    # ---------------------------------------------------------------------------------------------
    def ping_search(self, req: flask.Request) -> flask.Response:  # pragma: no cover
        from flipdare.core.app_response import AppErrorResponse
        from flipdare.services import get_admin_mailer, get_search_manager

        # Internal function, no coverage.
        result = AppRequest.http(req, AppHttpRequestType.PING_SEARCH)
        mailer = get_admin_mailer()

        try:
            result.is_authenticated()
        except AppError as error:
            msg = f"Not authenticated:\n{error!s}\n"
            mailer.send_error(
                error_code=AppErrorCode.SERVER,
                category=AppLogCategory.COMMAND,
                message=msg,
                include_stack=True,
            )
            return AppErrorResponse.from_context(
                ctx=ErrorContext.unauthorized(req.url, message=msg),
            ).raw_response()

        try:
            manager = get_search_manager()
            self._check_categories(manager)
        except Exception as ex:
            msg = f"Search offline: {ex}"
            mailer.send_error(
                error_code=AppErrorCode.SERVER,
                category=AppLogCategory.COMMAND,
                message=msg,
                include_stack=True,
            )
            return AppErrorResponse.from_context(
                ctx=ErrorContext.server_error(req.url, message=msg),
            ).raw_response()

        return AppOkResponse.ok().raw_response()
    def _check_categories(self, manager: SearchManager) -> None:
        search = manager.general
        ok = search.client.operations.is_healthy()
        if not ok:
            msg = f"Search is unhealthy: {ok}"
            raise AppError(AppErrorCode.SERVER, msg)