Coverage for functions \ flipdare \ app_config.py: 100%

0 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14 

15import io 

16from pathlib import Path 

17from typing import IO, Any, TypeVar 

18from firebase_admin import remote_config 

19from firebase_admin.remote_config import ServerConfig 

20import flask 

21from firebase_functions.core import CloudEvent 

22from firebase_functions.remote_config_fn import ConfigUpdateData 

23from flipdare._cred import DEV_CRED, PROD_CRED 

24from flipdare.backend.avatar_loader import AvatarLoader 

25from flipdare.app_config_loader import AppConfigLoader 

26from flipdare.app_env import get_app_environment 

27from flipdare.app_globals import is_text_present 

28from flipdare.app_log import LOG 

29from flipdare.config_key import ConfigKey 

30from flipdare.constants import IS_TRACE 

31from flipdare.core.config_option import ConfigOption 

32from flipdare.core.singleton import Singleton 

33from flipdare.mailer.app_email_type import AppEmailType 

34from flipdare.error.app_error import AppError, ServerError 

35from flipdare.error.error_context import ErrorContext 

36from flipdare.generated.shared.app_error_code import AppErrorCode 

37from flipdare.generated.shared.app_log_category import AppLogCategory 

38from flipdare.job.job_config import JobConfig 

39from flipdare.request.app_request import AppRequest 

40from flipdare.request.request_types import AppHttpRequestType 

41from flipdare.core.app_response import AppOkResponse 

42 

43__all__ = [ 

44 "AppConfig", 

45 "get_app_config", 

46 "get_job_config", 

47 "get_avatar_config", 

48] 

49 

50 

51# NOTE: we need to keep this here to avoid circular imports 

52 

53T = TypeVar("T", int, str, bool, float) 

54 

55 

def get_app_config() -> AppConfig:
    """Return the shared ``AppConfig`` obtained from ``AppConfig.instance()``."""
    app_config = AppConfig.instance()
    return app_config

58 

59 

def get_job_config() -> JobConfig:
    """Return the ``job_config`` of the shared application config."""
    app_config = get_app_config()
    return app_config.job_config

62 

63 

def get_avatar_config() -> AvatarLoader:
    """Return the ``avatar_config`` of the shared application config."""
    app_config = get_app_config()
    return app_config.avatar_config

66 

67 

68class AppConfig(Singleton): 

69 __slots__ = ("_avatar_config", "_buffer", "_is_dev", "_job_config", "_overrides") 

70 

71 _is_dev: bool 

72 _buffer: dict[ConfigKey, Any] 

73 _overrides: dict[ConfigKey, Any] 

74 _avatar_config: AvatarLoader | None 

75 _job_config: JobConfig | None 

76 

77 def __init__( 

78 self, 

79 job_config: JobConfig | None = None, 

80 avatar_config: AvatarLoader | None = None, 

81 overrides: dict[ConfigKey, Any] | None = None, 

82 ) -> None: 

83 

84 super().__init__() 

85 in_cloud = get_app_environment().in_cloud 

86 if in_cloud: 

87 self._is_dev = get_app_environment().is_dev 

88 else: 

89 # manually load cloud env for local dev/testing 

90 AppConfigLoader.load(in_cloud) 

91 self._is_dev = True 

92 

93 self._buffer: dict[ConfigKey, Any] = { 

94 ConfigKey.CREDENTIAL: DEV_CRED if self._is_dev else PROD_CRED, 

95 } 

96 self._overrides = overrides or {} 

97 self._avatar_config = avatar_config 

98 self._job_config = job_config 

99 self.load(overrides=self._overrides) 

100 

101 @property 

102 def job_config(self) -> JobConfig: 

103 if self._job_config is None: 

104 self._job_config = JobConfig.instance() 

105 return self._job_config 

106 

107 @property 

108 def avatar_config(self) -> AvatarLoader: 

109 if self._avatar_config is None: 

110 self._avatar_config = AvatarLoader.instance() 

111 return self._avatar_config 

112 

113 def load(self, overrides: dict[ConfigKey, Any] | None = None) -> None: 

114 for key in ConfigKey: 

115 if not key.is_config: 

116 if IS_TRACE: 

117 LOG().trace(f"Skipping trigger config key during load: {key.value}") 

118 continue 

119 

120 if overrides and key in overrides: 

121 value = overrides[key] 

122 self._buffer[key] = value 

123 continue 

124 

125 opt = ConfigOption(key) 

126 self._buffer[key] = opt.value 

127 

128 LOG().info("AppConfig initialized...") 

129 

130 def should_initialize_firebase(self) -> bool: # pragma: no cover 

131 if get_app_environment().in_cloud: 

132 return True 

133 

134 # if you run integration tests, you need to initialize the app yourself 

135 return not self._is_dev 

136 

137 def validate(self) -> None: 

138 validator = AppConfigValidator(self) 

139 validator.validate() 

140 

141 async def remote_update(self, _: CloudEvent[ConfigUpdateData]) -> None: 

142 try: 

143 template = await remote_config.get_server_template() # type: ignore[no-untyped-call] 

144 config = template.evaluate() 

145 except Exception as e: 

146 LOG().error(f"Failed to fetch remote config template: {e}") 

147 return 

148 

149 for key in ConfigKey: 

150 raw_value = self._get_param(config, key.value) 

151 if raw_value is not None: 

152 self._apply_config(key, raw_value) 

153 

154 # ------------------------------------------------------------------------- 

155 # Remote 

156 # ------------------------------------------------------------------------- 

157 

158 def _get_param(self, config: ServerConfig, param_key: str) -> str | None: 

159 """Fetch a string parameter from the Remote Config ServerConfig.""" 

160 try: 

161 value: str = config.get_string(param_key) 

162 if not value: 

163 LOG().warning(f"Remote config key [{param_key}] is empty or not set.") 

164 return None 

165 return value 

166 except Exception as e: 

167 LOG().error(f"Error fetching remote config key [{param_key}]: {e}") 

168 return None 

169 

170 def _apply_config(self, key: ConfigKey, raw_json: str) -> None: 

171 """ 

172 Apply a remote config value to the appropriate loader. 

173 

174 Remote Config values are stored as JSON. Since JSON is valid YAML, the 

175 raw string can be passed directly to each loader's ``try_reload()`` 

176 without any intermediate conversion. 

177 

178 ``try_reload()`` parses into a temporary state first and only swaps it 

179 in on full success, so a bad config payload never corrupts the running 

180 state. 

181 """ 

182 yaml_io: IO[str] = io.StringIO(raw_json) 

183 success: bool = False 

184 match key: 

185 case ConfigKey.JOB_CONFIG: 

186 LOG().warning(f"Applying remote config for {key.value}...") 

187 success = self.job_config.try_reload(yaml_io) 

188 case ConfigKey.AVATAR_CONFIG: 

189 LOG().warning(f"Applying remote config for {key.value}...") 

190 success = self.avatar_config.try_reload(yaml_io) 

191 case _: 

192 LOG().warning( 

193 f"Applying remote config for {key.value} with raw value: {raw_json}...", 

194 ) 

195 self._set(key, raw_json) 

196 success = True 

197 

198 status = "applied successfully" if success else "failed — existing config preserved" 

199 LOG().info(f"Remote config [{key.value}]: {status}.") 

200 

201 # ------------------------------------------------------------------------------------------- 

202 # helpers 

203 # ------------------------------------------------------------------------------------------- 

204 

205 def _set(self, key: ConfigKey, value: Any) -> None: 

206 if value is None: 

207 LOG().warning(f"Attempted to set config key [{key.value}] to None, ignoring.") 

208 return 

209 

210 existing = self._buffer.get(key, None) 

211 if existing is not None: 

212 msg = f"Overriding existing config key [{key.value}] with new value: {value} (was: {existing})" 

213 LOG().warning(msg) 

214 

215 # check the existing type matches value 

216 value_type = key.value_type 

217 if not isinstance(value, value_type): 

218 msg = ( 

219 f"Type mismatch for config key [{key.value}]: " 

220 f"expected {value_type.__name__}, got {type(value).__name__}. " 

221 "Existing value will be preserved." 

222 ) 

223 LOG().error(msg) 

224 return 

225 

226 LOG().info(f"Setting config key [{key.value}] to value: {value}") 

227 self._buffer[key] = value 

228 

229 def _get[T](self, key: ConfigKey, k_type: type[T]) -> T: 

230 """Retrieve and cast the value.""" 

231 buffer = self._buffer 

232 overrides = self._overrides 

233 

234 if key not in buffer and key not in overrides: 

235 msg = f"CONFIG ERROR: Missing required environment variable: {key.value}\n" 

236 raise ServerError(message=msg, error_code=AppErrorCode.SERVER_CONFIG) 

237 

238 value = overrides[key] if overrides and key in overrides else buffer[key] 

239 if value is None: 

240 raise ServerError(f"Missing config: {key.value}") 

241 

242 # Optional: verify the type matches what the caller expects 

243 if not isinstance(value, k_type): 

244 try: 

245 # Attempt recovery cast 

246 return (k_type)(value) # type: ignore 

247 except (ValueError, TypeError) as ex: 

248 msg = ( 

249 f"CONFIG ERROR: Invalid type for environment variable: {key}, " 

250 f" expected {k_type.__name__}, " 

251 f" got {type(buffer[key]).__name__}\nError: {ex}" 

252 ) 

253 raise ServerError(message=msg, error_code=AppErrorCode.SERVER_CONFIG) from ex 

254 

255 return value 

256 

257 # ------------------------------------------------------------------------- 

258 # Parameters 

259 # ------------------------------------------------------------------------- 

260 

261 # 

262 # jobs 

263 # 

264 @property 

265 def change_score_threshold(self) -> float: 

266 return self._get(ConfigKey.CHANGE_SCORE_THRESHOLD, float) 

267 

268 # 

269 # firebase 

270 # 

271 @property 

272 def credential(self) -> dict[str, str]: 

273 overrides = self._overrides 

274 buffer = self._buffer 

275 key = ConfigKey.CREDENTIAL 

276 if key not in buffer and key not in overrides: 

277 msg = f"CONFIG ERROR: Missing required environment variable: {key.value}\n" 

278 raise ServerError(message=msg, error_code=AppErrorCode.SERVER_CONFIG) 

279 

280 value = overrides[key] if overrides and key in overrides else buffer[key] 

281 if not isinstance(value, dict): 

282 msg = f"CONFIG ERROR: Invalid type for firebase credential, expected dict, got {type(value).__name__}\n" 

283 raise ServerError(message=msg, error_code=AppErrorCode.SERVER_CONFIG) 

284 

285 return value 

286 

287 @property 

288 def storage_bucket_name(self) -> str: 

289 return self._get(ConfigKey.STORAGE_BUCKET, str) 

290 

291 # 

292 # app keys 

293 # 

294 def api_key(self, is_backend: bool) -> str: 

295 if is_backend: 

296 return self._get(ConfigKey.BACKEND_API, str) 

297 return self._get(ConfigKey.API_KEY, str) 

298 

299 @property 

300 def search_api_key(self) -> str: 

301 return self._get(ConfigKey.SEARCH_API, str) 

302 

303 # 

304 # misc 

305 # 

306 

307 @property 

308 def exchange_rate_api_key(self) -> str: 

309 return self._get(ConfigKey.EXCHANGE_RATE_API, str) 

310 

311 # 

312 # stripe 

313 # 

314 

315 # RELEASE: FLP-633 - admin account for access to the Stripe FX api. 

316 @property 

317 def currency_conversion_account_id(self) -> str: 

318 return self._get(ConfigKey.STRIPE_CURRENCY_CONVERSION_ACCOUNT_ID, str) 

319 

320 @property 

321 def stripe_platform_account_id(self) -> str: 

322 return self._get(ConfigKey.STRIPE_PLATFORM_ACCOUNT_ID, str) 

323 

324 @property 

325 def stripe_secret_key(self) -> str: 

326 return self._get(ConfigKey.STRIPE_SECRET, str) 

327 

328 @property 

329 def stripe_webhook_key(self) -> str: 

330 return self._get(ConfigKey.STRIPE_WEBHOOK_KEY, str) 

331 

332 @property 

333 def stripe_return_webhook_url(self) -> str: 

334 return self._get(ConfigKey.STRIPE_RETURN_WEBHOOK_URL, str) 

335 

336 @property 

337 def stripe_refresh_webhook_url(self) -> str: 

338 return self._get(ConfigKey.STRIPE_REFRESH_WEBHOOK_URL, str) 

339 

340 @property 

341 def stripe_pay_webhook_url(self) -> str: 

342 return self._get(ConfigKey.STRIPE_PAY_WEBHOOK_URL, str) 

343 

344 # 

345 # search 

346 # 

347 @property 

348 def search_ip(self) -> str: 

349 return self._get(ConfigKey.SEARCH_IP, str) 

350 

351 @property 

352 def search_port(self) -> int: 

353 return self._get(ConfigKey.SEARCH_PORT, int) 

354 

355 @property 

356 def search_timeout(self) -> int: 

357 return self._get(ConfigKey.SEARCH_TIMEOUT, int) 

358 

359 @property 

360 def search_enable_nat_lang(self) -> bool: 

361 return self._get(ConfigKey.SEARCH_ENABLE_NAT_LANG, bool) 

362 

363 # 

364 # AI 

365 # 

366 @property 

367 def gemini_api_key(self) -> str: 

368 return self._get(ConfigKey.GEMINI_API, str) 

369 

370 # 

371 # email 

372 # 

373 @property 

374 def smtp_timeout(self) -> int: 

375 return self._get(ConfigKey.SMTP_TIMEOUT, int) 

376 

377 @property 

378 def smtp_gateway(self) -> str: 

379 return self._get(ConfigKey.SMTP_GATEWAY, str) 

380 

381 @property 

382 def smtp_port(self) -> int: 

383 return self._get(ConfigKey.SMTP_PORT, int) 

384 

385 @property 

386 def smtp_username(self) -> str: 

387 return self._get(ConfigKey.SMTP_USERNAME, str) 

388 

389 @property 

390 def smtp_password(self) -> str: 

391 return self._get(ConfigKey.SMTP_PASSWORD, str) 

392 

393 # ------------------------------------------------------------------------------------------- 

394 # ADMIN utils 

395 # ------------------------------------------------------------------------------------------- 

396 

397 def ping_app(self, req: flask.Request) -> flask.Response: # pragma: no cover 

398 from flipdare.app_config import AppConfig 

399 from flipdare.services import get_db_manager 

400 from flipdare.services import get_admin_mailer 

401 from flipdare.core.app_response import AppErrorResponse 

402 

403 # internal function, no coverage 

404 result = AppRequest.http(req, AppHttpRequestType.PING_BACKEND) 

405 mailer = get_admin_mailer() 

406 

407 try: 

408 result.is_authenticated() 

409 except AppError as error: 

410 msg = f"Not Authenticated?:\n{error!s}\n" 

411 mailer.send_error( 

412 error_code=AppErrorCode.SERVER, 

413 category=AppLogCategory.COMMAND, 

414 message=msg, 

415 include_stack=True, 

416 ) 

417 return AppErrorResponse.from_context( 

418 ctx=ErrorContext.unauthorized(req.url, message=msg), 

419 ).raw_response() 

420 

421 try: 

422 config = AppConfig.instance() 

423 config.validate() 

424 except Exception as ex: 

425 msg = f"AppService not configured correctly..: {ex}" 

426 mailer.send_error( 

427 error_code=AppErrorCode.SERVER, 

428 category=AppLogCategory.COMMAND, 

429 message=msg, 

430 include_stack=True, 

431 ) 

432 return AppErrorResponse.from_context( 

433 ctx=ErrorContext.server_error(req.url, message=msg), 

434 ).raw_response() 

435 

436 try: 

437 client = get_db_manager().database_client 

438 client.collections() 

439 except Exception as e: 

440 msg = f"AppService not configured correctly..: {e}" 

441 mailer.send_error( 

442 error_code=AppErrorCode.SERVER, 

443 category=AppLogCategory.COMMAND, 

444 message=msg, 

445 include_stack=True, 

446 ) 

447 return AppErrorResponse.from_context( 

448 ctx=ErrorContext.server_error(req.url, message=msg), 

449 ).raw_response() 

450 

451 return AppOkResponse.ok().raw_response() 

452 

453 

454class AppConfigValidator: 

455 def __init__(self, config: AppConfig) -> None: 

456 self._config = config 

457 

458 def validate(self) -> None: 

459 """Validate that required settings are present.""" 

460 from flipdare.error.message_format import ConfigErrorMsgFormat 

461 

462 has_error: bool = False 

463 

464 LOG().info("Validating AppConfig settings...") 

465 key_errors = self.validate_config() 

466 if key_errors is not None: 

467 has_error = True 

468 

469 config_error = self.validate_job_config() 

470 if config_error is not None: 

471 has_error = True 

472 

473 template_errors = self.validate_templates() 

474 if template_errors is not None: 

475 has_error = True 

476 

477 if not has_error: 

478 LOG().info("AppConfig validation successful. All settings are valid.") 

479 return 

480 

481 formatted = ConfigErrorMsgFormat( 

482 missing_keys=key_errors or None, 

483 template_errors=template_errors, 

484 config_error=config_error, 

485 ) 

486 LOG().error(f"AppConfig validation failed:\n{formatted}") 

487 raise ServerError(message=str(formatted), error_code=AppErrorCode.SERVER_CONFIG) 

488 

489 def validate_config(self) -> list[str] | None: 

490 errors: list[str] = [] 

491 

492 LOG().info("Validating AppConfig settings...") 

493 for key in ConfigKey: 

494 if not key.is_config: 

495 LOG().debug(f"Skipping trigger config key during validation: {key.value}") 

496 continue 

497 

498 value: Any | None = None 

499 try: 

500 value = self._config._get(key, key.value_type) 

501 except KeyError: 

502 errors.append(f"\tMissing config key: {key.value}\n") 

503 

504 if value is None: 

505 errors.append(f"\tMissing config key: {key.value}\n") 

506 elif not is_text_present(str(value)): 

507 # this should valid any (int, bool, float, str) empty fields.. 

508 errors.append(f"\tEmpty value for config key: {key.value}\n") 

509 

510 if len(errors) == 0: 

511 LOG().info("All configuration keys validated successfully.") 

512 else: 

513 msg = f"Configuration validation failed with the following missing keys: {'\n'.join(errors)}" 

514 LOG().error(msg) 

515 

516 return errors if len(errors) > 0 else None 

517 

518 def validate_job_config(self) -> Exception | None: 

519 LOG().info("Validating AppConfig settings...") 

520 config_error: Exception | None = None 

521 

522 try: 

523 config_loader = self._config.job_config 

524 config_loader.validate_enums() 

525 except Exception as ex: 

526 config_error = ex 

527 

528 if config_error is None: 

529 LOG().info("Configuration enums validated successfully.") 

530 else: 

531 LOG().error(f"Configuration enum validation failed: {config_error}") 

532 

533 return config_error 

534 

535 def validate_templates(self) -> list[str] | None: 

536 LOG().info("Validating email templates...") 

537 missing: list[str] = [] 

538 

539 # Validate all email type classes 

540 for user_email_type in AppEmailType: 

541 missing.extend(self._validate_paths(user_email_type)) 

542 

543 ok = len(missing) == 0 

544 if ok: 

545 LOG().info("All email templates validated successfully.") 

546 else: 

547 LOG().error("Email template validation failed with the following issues:") 

548 for issue in missing: 

549 LOG().error(issue) 

550 

551 return missing if len(missing) > 0 else None 

552 

553 def _validate_paths( 

554 self, 

555 email_type: AppEmailType, 

556 ) -> list[str]: 

557 missing: list[str] = [] 

558 

559 html_path = email_type.html_path 

560 text_path = email_type.text_path 

561 

562 if not Path(html_path).exists(): 

563 missing.append(f"\tHTML: {html_path}\n") 

564 

565 if not Path(text_path).exists(): 

566 missing.append(f"\tText: {text_path}\n") 

567 

568 return missing