Coverage for functions \ flipdare \ backend \ runtime_config_admin.py: 97%

176 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14 

15from typing import Any, override 

16from abc import ABC, abstractmethod 

17from datetime import datetime 

18from flipdare.app_log import LOG 

19from flipdare.constants import IS_DEBUG, RUNTIME_DB_REFRESH_INTERVAL 

20from flipdare.core.singleton import Singleton 

21from flipdare.firestore.backend.run_config_group_db import RunConfigGroupDb 

22from flipdare.firestore.backend.run_config_job_db import RunConfigJobDb 

23from flipdare.generated.shared.backend.app_job_group import AppJobGroup 

24from flipdare.generated.shared.backend.app_job_type import AppJobType 

25from flipdare.job.job_config import JobConfig 

26from flipdare.util.time_util import TimeUtil 

27 

28__all__ = ["RuntimeConfigAdmin"] 

29 

30 

class RuntimeConfigAdmin(Singleton):
    """Single entry point for runtime job-group switches and job run records.

    Group-level calls are forwarded to an internal ``_RuntimeGroup`` and
    job-level calls to an internal ``_RuntimeJob``. Apart from the refresh
    interval, this class keeps no configuration state of its own.
    """

    def __init__(
        self,
        run_group_db: RunConfigGroupDb | None = None,
        run_job_db: RunConfigJobDb | None = None,
        job_config: JobConfig | None = None,
        default_value: bool = True,
        refresh_interval_seconds: int = RUNTIME_DB_REFRESH_INTERVAL,
    ) -> None:
        """Build the group and job helpers, filling in any missing dependency.

        Args:
            run_group_db: Group config store; ``get_run_group_db()`` is used
                when ``None``.
            run_job_db: Job config store; ``get_run_job_db()`` is used when
                ``None``.
            job_config: Job configuration; ``get_job_config()`` is used when
                ``None``.
            default_value: Handed to both helpers as their ``default_value``.
            refresh_interval_seconds: Handed to both helpers and exposed via
                ``refresh_interval``.
        """
        # Function-scope imports, kept where the original had them
        # (presumably to sidestep an import cycle -- TODO confirm).
        from flipdare.services import get_run_group_db, get_run_job_db
        from flipdare.app_config import get_job_config

        LOG().info("Initializing RuntimeConfigAdmin...")
        self._refresh_interval = refresh_interval_seconds

        resolved_config = get_job_config() if job_config is None else job_config

        group_db = get_run_group_db() if run_group_db is None else run_group_db
        self._run_group = _RuntimeGroup(
            job_config=resolved_config,
            db=group_db,
            default_value=default_value,
            refresh_interval=refresh_interval_seconds,
        )

        # The job store is looked up only after the group helper exists,
        # which keeps the original order of calls.
        job_db = get_run_job_db() if run_job_db is None else run_job_db
        self._run_job = _RuntimeJob(
            job_config=resolved_config,
            db=job_db,
            default_value=default_value,
            refresh_interval=refresh_interval_seconds,
        )

        LOG().info(
            f"RuntimeConfigAdmin initialized with default_value={default_value}, "
            f"refresh_interval_seconds={refresh_interval_seconds}",
        )

    @property
    def last_group_refresh(self) -> datetime | None:
        """Timestamp recorded by the group helper's last refresh, if any."""
        return self._run_group.last_refresh

    @property
    def last_job_refresh(self) -> datetime | None:
        """Timestamp recorded by the job helper's last refresh, if any."""
        return self._run_job.last_refresh

    @property
    def refresh_interval(self) -> int:
        """The refresh interval, in seconds, given at construction."""
        return self._refresh_interval

    # ---------------------------------------------------------------------------------------------
    # GROUP CONFIG
    # ---------------------------------------------------------------------------------------------

    def disable_all(self) -> None:
        """Disable every job group through the group helper, then log it."""
        self._run_group.disable_all()
        LOG().info("Disabled all RuntimeConfig types.")

    def enable_all(self) -> None:
        """Enable every job group through the group helper, then log it."""
        self._run_group.enable_all()
        LOG().info("Enabled all RuntimeConfig types.")

    def enable_group(self, job_group: AppJobGroup) -> None:
        """Enable a single job group."""
        self._run_group.enable(job_group)

    def disable_group(self, job_group: AppJobGroup) -> None:
        """Disable a single job group."""
        self._run_group.disable(job_group)

    def is_group_enabled(self, job_group: AppJobGroup) -> bool:
        """Return the cached enabled flag for *job_group*.

        The group helper refreshes its cache first when its interval has
        elapsed.
        """
        return self._run_group.is_enabled(job_group)

    def is_job_enabled(self, job_type: AppJobType) -> bool:
        """Return whether the group owning *job_type* is currently enabled."""
        return self._run_group.is_job_enabled(job_type)

    # ---------------------------------------------------------------------------------------------
    # JOB CONFIG
    # ---------------------------------------------------------------------------------------------

    def start_job(self, job_type: AppJobType) -> None:
        """Record *job_type* as started in the job store.

        Per the original author's note, the stored entry is what stops other
        instances from starting the same job.
        """
        self._run_job.start_job(job_type)

    def is_job_running(self, job_type: AppJobType) -> datetime | None:
        """Return the stored start time for *job_type*, or ``None`` if no record exists."""
        return self._run_job.is_job_running(job_type)

    def cancel_job(self, job_type: AppJobType) -> None:
        """Ask the job store to cancel *job_type*."""
        self._run_job.cancel_job(job_type)

127 

128 

129# --------------------------------------------------------------------------------------------- 

130# INTERNAL CLASSES 

131# --------------------------------------------------------------------------------------------- 

132 

133 

134class _RuntimeMixin[T](ABC): 

135 __slots__ = ( 

136 "_data", 

137 "_job_config", 

138 "_label", 

139 "_last_refresh", 

140 "_refresh_interval", 

141 ) 

142 

143 def __init__( 

144 self, 

145 label: str, 

146 job_config: JobConfig, 

147 refresh_interval: int, 

148 last_refresh: datetime | None = None, 

149 ) -> None: 

150 self._label = label 

151 self._refresh_interval = refresh_interval 

152 self._last_refresh = last_refresh 

153 self._job_config = job_config 

154 self._data = dict[str, T]() 

155 

156 @abstractmethod 

157 def _refresh(self) -> int: ... 

158 

159 def check_refresh_required(self) -> None: 

160 """Check if we should refresh the config from the database, and refresh if so.""" 

161 # note: we have to check None here otherwise we get a recursion loop. 

162 if not self._should_refresh: 

163 if IS_DEBUG: 

164 msg = ( 

165 f"No refresh needed for '{self._label}' " 

166 f"(last refresh at {self._last_refresh}, interval={self._refresh_interval} sec)" 

167 ) 

168 LOG().debug(msg) 

169 return 

170 

171 changed_ct = self._refresh() 

172 self._last_refresh = TimeUtil.get_current_utc_dt() 

173 msg = ( 

174 f"Refreshed {self._label} (interval={self._refresh_interval} sec) " 

175 f"at {self._last_refresh}, changed {changed_ct} items" 

176 ) 

177 LOG().info(msg) 

178 

179 @property 

180 def job_config(self) -> JobConfig: 

181 return self._job_config 

182 

183 @property 

184 def last_refresh(self) -> datetime | None: 

185 return self._last_refresh 

186 

187 def set_refreshed(self) -> None: 

188 self._last_refresh = TimeUtil.get_current_utc_dt() 

189 

190 @property 

191 def _should_refresh(self) -> bool: 

192 last_refresh = self._last_refresh 

193 if last_refresh is None: 

194 return True 

195 

196 current_time = TimeUtil.get_current_utc_dt() 

197 elapsed = (current_time - last_refresh).total_seconds() 

198 

199 refresh_interval = self._refresh_interval 

200 return elapsed >= refresh_interval 

201 

202 def set_data(self, key: str, value: T) -> None: 

203 self._data[key] = value 

204 

205 def get_data(self, key: str, default_value: T) -> T: 

206 self.check_refresh_required() 

207 return self._data.get(key, default_value) 

208 

209 

class _RuntimeGroup(_RuntimeMixin[bool]):
    """Cache of the enabled/disabled flag for every ``AppJobGroup`` member."""

    def __init__(
        self,
        job_config: JobConfig,
        db: RunConfigGroupDb,
        default_value: bool,
        refresh_interval: int,
    ) -> None:
        """Seed every group with *default_value*.

        Args:
            job_config: Forwarded to the base class.
            db: Store queried by ``_refresh`` and written by ``enable`` and
                ``disable``.
            default_value: Flag used when the store has no entry for a group
                and after a failed write.
            refresh_interval: Seconds between refreshes, forwarded to the
                base class.
        """
        super().__init__(
            label="RuntimeConfigGroup",
            job_config=job_config,
            refresh_interval=refresh_interval,
        )
        self.db = db
        self.default_value = default_value
        for group in AppJobGroup:
            self.set_data(group, default_value)

    @override
    def _refresh(self) -> int:
        """Reload every group's flag from the store; return how many changed."""
        if IS_DEBUG:
            LOG().debug("Refreshing RuntimeConfigGroup from database...")

        changed = 0
        for group in AppJobGroup:
            record = self.db.get_group_config(group)
            fresh = self.default_value if record is None else record.enabled
            # Read the dict directly: get_data() calls check_refresh_required(),
            # which calls _refresh() again and would recurse.
            stale = self._data.get(group, self.default_value)
            if stale != fresh:
                changed += 1
                self.set_data(group, fresh)
                if IS_DEBUG:
                    msg = f"RuntimeConfigGroup change detected for '{group.value}': {stale} -> {fresh}"
                    LOG().debug(msg)

        return changed

    def disable_all(self) -> None:
        """Disable each group in turn, then mark the cache as refreshed."""
        for group in AppJobGroup:
            self.disable(group)

        self.set_refreshed()
        LOG().info("Disabled all RuntimeConfigGroup types.")

    def enable_all(self) -> None:
        """Enable each group in turn, then mark the cache as refreshed."""
        for group in AppJobGroup:
            self.enable(group)

        self.set_refreshed()
        LOG().info("Enabled all RuntimeConfigGroup types.")

    def is_enabled(self, job_group: AppJobGroup) -> bool:
        """Return the cached flag for *job_group*, refreshing first if due."""
        return self.get_data(job_group, self.default_value)

    def is_job_enabled(self, job_type: AppJobType) -> bool:
        """Return the flag of the group named by ``job_type.job_group``."""
        return self.is_enabled(job_type.job_group)

    def enable(self, job_group: AppJobGroup) -> None:
        """Enable *job_group* in the store, then cache ``True``.

        Any exception is logged, not raised, and the cached flag is reset to
        ``default_value``.
        """
        try:
            if IS_DEBUG:
                LOG().debug(f"Enabling '{job_group.value}'...")

            self.db.enable(job_group)
            self.set_data(job_group, True)
        except Exception as exc:
            LOG().error(f"Failed to enable '{job_group.value}': {exc}")
            self.set_data(job_group, self.default_value)

    def disable(self, job_group: AppJobGroup) -> None:
        """Disable *job_group* in the store, then cache ``False``.

        Any exception is logged, not raised, and the cached flag is reset to
        ``default_value``.
        """
        try:
            if IS_DEBUG:
                LOG().debug(f"Disabling '{job_group.value}'...")

            self.db.disable(job_group)
            self.set_data(job_group, False)
        except Exception as exc:
            LOG().error(f"Failed to disable '{job_group.value}': {exc}")
            self.set_data(job_group, self.default_value)

290 

291 

class _RuntimeJob(_RuntimeMixin[Any]):
    """Pass-through to the job store for start, cancel and status queries.

    Every call goes straight to the store, so the cache inherited from the
    base class is not populated here.
    """

    def __init__(
        self,
        job_config: JobConfig,
        db: RunConfigJobDb,
        default_value: bool,
        refresh_interval: int,
    ) -> None:
        """Keep the store handle and the default flag.

        Args:
            job_config: Forwarded to the base class.
            db: Store that records job runs.
            default_value: Stored on the instance; not read by any method of
                this class.
            refresh_interval: Forwarded to the base class.
        """
        super().__init__(
            label="RuntimeConfigJob",
            job_config=job_config,
            refresh_interval=refresh_interval,
        )
        self.db = db
        self.default_value = default_value

    @override
    def _refresh(self) -> int:
        """Do nothing and report zero changes; job status is read live from the store."""
        return 0

    def start_job(self, job_type: AppJobType) -> None:
        """Ask the store to record *job_type* as started."""
        self.db.start_job(job_type=job_type)

    def is_job_running(self, job_type: AppJobType) -> datetime | None:
        """Return the stored ``started_at`` for *job_type*.

        Returns ``None`` when the store has no record for the job.
        """
        record = self.db.get_job(job_type=job_type)
        if record is not None:
            if IS_DEBUG:
                msg = f"Checked job status for '{job_type.job_group.value}'-'{job_type.value}': started at {record.started_at}"
                LOG().debug(msg)

            return record.started_at

        # No record in the store for this job.
        if IS_DEBUG:
            msg = (
                f"Failed to check if job '{job_type}' is running, no config found in database"
            )
            LOG().debug(msg)

        return None

    def cancel_job(self, job_type: AppJobType) -> None:
        """Ask the store to cancel *job_type*."""
        self.db.cancel_job(job_type=job_type)