Coverage for functions/flipdare/backend/app_scheduler.py: 71%

228 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14 

15from collections.abc import Callable 

16from typing import TYPE_CHECKING 

17from functools import partial 

18from flipdare.manager.backend_manager import BackendManager 

19from flipdare.manager.task_manager import TaskManager 

20from flipdare.task.command_task_handler import CommandTaskHandler 

21from flipdare.backend.app_stats import AppStats 

22from flipdare.task.cron_task_handler import CronTaskHandler 

23from flipdare.app_log import LOG 

24from flipdare.app_types import CronResult, TriggerResult 

25from flipdare.job_types import ( 

26 TaskJobType, 

27 TriggerJobType, 

28 is_command_job, 

29 is_cron_job, 

30 is_report_job, 

31 is_trigger_job, 

32) 

33from flipdare.constants import IS_DEBUG, JOB_WAIT_SANITY_SECONDS 

34from flipdare.core.singleton import Singleton 

35from flipdare.generated.shared.app_error_code import AppErrorCode 

36from flipdare.generated.shared.backend.app_job_type import AppJobType 

37from flipdare.generated.shared.backend.system_log_type import SystemLogType 

38from flipdare.job.app_job_schedule import AppJobSchedule 

39from flipdare.result.outcome import Outcome 

40from flipdare.generated.shared.firestore_collections import FirestoreCollections 

41from flipdare.util.time_util import FirestoreTime, TimeUtil 

42 

43if TYPE_CHECKING: 

44 from flipdare.job.job_config import JobConfig 

45 from flipdare.task.trigger_task_handler import TriggerTaskHandler 

46 from flipdare.manager.service_manager import ServiceManager 

47 from flipdare.manager.search_manager import SearchManager 

48 from flipdare.task.report_task_handler import ReportTaskHandler 

49 from flipdare.backend.app_logger import AppLogger 

50 from flipdare.backend.job_logger import JobLogger 

51 from flipdare.backend.runtime_config_admin import RuntimeConfigAdmin 

52 

53 

54__all__ = ["AppScheduler"] 

55 

56 

57class AppScheduler(Singleton): 

58 

59 def __init__( 

60 self, 

61 service_manager: ServiceManager | None = None, 

62 search_manager: SearchManager | None = None, 

63 backend_manager: BackendManager | None = None, 

64 task_manager: TaskManager | None = None, 

65 wait_seconds: int = JOB_WAIT_SANITY_SECONDS, 

66 ) -> None: 

67 super().__init__() 

68 

69 self._service_manager = service_manager 

70 self._search_manager = search_manager 

71 self._backend_manager = backend_manager 

72 self._task_manager = task_manager 

73 self._wait_seconds = wait_seconds 

74 

75 @property 

76 def wait_seconds(self) -> int: 

77 return self._wait_seconds 

78 

79 @property 

80 def search_manager(self) -> SearchManager: 

81 from flipdare.services import get_search_manager 

82 

83 if self._search_manager is None: 

84 self._search_manager = get_search_manager() 

85 return self._search_manager 

86 

87 @property 

88 def service_manager(self) -> ServiceManager: 

89 from flipdare.services import get_service_manager 

90 

91 if self._service_manager is None: 

92 self._service_manager = get_service_manager() 

93 return self._service_manager 

94 

95 @property 

96 def backend_manager(self) -> BackendManager: 

97 from flipdare.services import get_backend_manager 

98 

99 if self._backend_manager is None: 

100 self._backend_manager = get_backend_manager() 

101 return self._backend_manager 

102 

103 @property 

104 def task_manager(self) -> TaskManager: 

105 from flipdare.services import get_task_manager 

106 

107 if self._task_manager is None: 

108 self._task_manager = get_task_manager() 

109 return self._task_manager 

110 

111 @property 

112 def job_config(self) -> JobConfig: 

113 from flipdare.app_config import get_job_config 

114 

115 return get_job_config() 

116 

117 @property 

118 def runtime_config(self) -> RuntimeConfigAdmin: 

119 return self.backend_manager.runtime_config 

120 

121 @property 

122 def job_logger(self) -> JobLogger: 

123 return self.backend_manager.job_logger 

124 

125 @property 

126 def app_logger(self) -> AppLogger: 

127 return self.backend_manager.app_logger 

128 

129 @property 

130 def app_stats(self) -> AppStats: 

131 return self.backend_manager.app_stats 

132 

133 @property 

134 def report_handler(self) -> ReportTaskHandler: 

135 return self.task_manager.report_handler 

136 

137 @property 

138 def cron_handler(self) -> CronTaskHandler: 

139 return self.task_manager.cron_handler 

140 

141 @property 

142 def command_handler(self) -> CommandTaskHandler: 

143 return self.task_manager.command_handler 

144 

145 @property 

146 def trigger_handler(self) -> TriggerTaskHandler: 

147 return self.task_manager.trigger_handler 

148 

149 def run(self, interval: AppJobSchedule) -> None: 

150 job_config = self.job_config 

151 

152 jobs_config = job_config.by_interval(interval) 

153 if len(jobs_config) == 0: 

154 LOG().warning(f"No jobs found for interval: {interval.value}.") 

155 

156 for cfg in jobs_config: 

157 job_type = cfg.job_type 

158 if not job_type.is_trigger and not job_type.is_cron: 

159 cause = f"Invalid job type {job_type.label}.." 

160 self._log_error(job_type=job_type, cause=cause) 

161 continue 

162 if not self._can_run(job_type): 

163 msg = ( 

164 f"Skipping job_type: {job_type.label} since another instance is still running." 

165 ) 

166 LOG().warning(msg) 

167 continue 

168 self._process(job_type) 

169 

170 def _process(self, job_type: AppJobType) -> None: 

171 if not job_type.is_cron and not job_type.is_trigger: 

172 msg = f"Unsupported job type: {job_type.label}" 

173 self._log_error(job_type=job_type, cause=msg) 

174 return 

175 

176 debug_str = f"{job_type.label}:{FirestoreTime.internal_str(TimeUtil.get_current_utc_dt())}" 

177 live_config = self.runtime_config 

178 

179 try: 

180 live_config.start_job(job_type) 

181 LOG().info(f"Processing job_type: {debug_str}") 

182 

183 match job_type: 

184 case jt if is_command_job(jt) or is_report_job(jt) or is_cron_job(jt): 

185 self._run_task(jt) 

186 case jt if is_trigger_job(jt): 

187 self._run_trigger(jt) 

188 case _: 

189 self._log_error(job_type=job_type, cause="Unsupported job type.") 

190 except Exception as e: 

191 cause = f"Error processing {debug_str}: {e}" 

192 self._log_error(job_type=job_type, cause=cause) 

193 finally: 

194 live_config.cancel_job(job_type) 

195 

196 # NOTE: all logging should be completed 

197 # NOTE: by called methods.. 

198 

199 def _run_trigger(self, job_type: TriggerJobType) -> None: 

200 """ 

201 Process trigger jobs from the database. 

202 

203 Flow: 

204 1. Fetches AppJobModel instances (PersistedWrapper[AppJobModel]) from jobs collection 

205 2. Each job contains obj_id pointing to the actual document 

206 3. Trigger handler (e.g., trigger_user) receives the job 

207 4. Handler fetches fresh PersistedWrapper[TModel] from Firestore using obj_id 

208 5. Handler calls processor with the fresh model data 

209 

210 This ensures handlers always work with current data, not stale trigger snapshots. 

211 """ 

212 LOG().info(f"Processing jobs for trigger {job_type.label}..") 

213 job_creator = self.job_logger 

214 jobs = job_creator.get_jobs([job_type]) 

215 if jobs is None: 

216 cause = f"Failed to get {job_type.label} jobs.." 

217 self.app_logger.system_error(message=cause, error_code=AppErrorCode.SCHEDULER) 

218 return 

219 if len(jobs) == 0: 

220 LOG().info(f"No {job_type.label} jobs to process.") 

221 return 

222 

223 started_at = TimeUtil.get_current_utc_dt() 

224 success_ct = 0 

225 failed_ct = 0 

226 

227 trigger_handler = self.trigger_handler 

228 for job in jobs: 

229 j_type = job.job_type 

230 if not is_trigger_job(j_type): 

231 cause = f"Expected trigger job type but got {j_type.label} for job {job.doc_id}" 

232 self._log_error(job_type=job_type, cause=cause) 

233 failed_ct += 1 

234 continue 

235 

236 try: 

237 result = trigger_handler.run_trigger(j_type, job) 

238 if result == Outcome.OK: 

239 success_ct += 1 

240 else: 

241 failed_ct += 1 

242 except Exception as e: 

243 cause = f"Error processing job {job.doc_id} of type {j_type.label}: {e}" 

244 self._log_error(job_type=j_type, cause=cause) 

245 failed_ct += 1 

246 

247 ended_at = TimeUtil.get_current_utc_dt() 

248 duration = TimeUtil.duration_in_seconds(started_at, ended_at) 

249 

250 if IS_DEBUG: 

251 msg = ( 

252 f"Processed {len(jobs)} {job_type.label} jobs in {duration} secs" 

253 f" : Success={success_ct}, Failed={failed_ct}" 

254 ) 

255 LOG().debug(msg) 

256 

257 self.job_logger.resolve_jobs(jobs) 

258 self._log_info(job_type, f"Completed processing {len(jobs)} {job_type.label} jobs: ") 

259 

260 self.app_stats.add_count( 

261 job_type=job_type, 

262 success_ct=success_ct, 

263 failed_ct=failed_ct, 

264 skipped_ct=0, 

265 duration=duration, 

266 ) 

267 

268 def _run_task(self, job_type: TaskJobType) -> None: 

269 report_handler = self.report_handler 

270 command_handler = self.command_handler 

271 cron_handler = self.cron_handler 

272 

273 call_fn: Callable[..., CronResult | TriggerResult] | None = None 

274 

275 match job_type: 

276 case jt if is_command_job(jt): 

277 call_fn = partial(command_handler.run_command, job_type=jt) 

278 case jt if is_report_job(jt): 

279 call_fn = partial(report_handler.run_report, job_type=jt) 

280 case jt if is_cron_job(jt): 

281 call_fn = partial(cron_handler.run_cron, job_type=jt) 

282 case _: 

283 self._log_error(job_type=job_type, cause="Unsupported job type.") 

284 return 

285 

286 started_at = TimeUtil.get_current_utc_dt() 

287 debug_str = f"{job_type.label}:{TimeUtil.formatted_dt(started_at)}" 

288 result: CronResult | TriggerResult | None = None 

289 if IS_DEBUG: 

290 LOG().debug(f"Processing cron: {debug_str}..") 

291 

292 try: 

293 result = call_fn() 

294 except Exception as e: 

295 cause = f"Error processing cron {job_type.label}: {e}" 

296 self._log_error(job_type=job_type, cause=cause) 

297 

298 ended_at = TimeUtil.get_current_utc_dt() 

299 duration = TimeUtil.duration_in_seconds(started_at, ended_at) 

300 if IS_DEBUG: 

301 LOG().debug(f"Completed cron: {debug_str} in {duration} secs.") 

302 

303 if result is None: 

304 LOG().warning(f"Cron {job_type.label} did not return a stat/output result.") 

305 self._log_error(job_type=job_type, cause="Cron did not return a stat/output result.") 

306 return 

307 

308 # completed ok.. 

309 LOG().info(f"Cron {job_type.label} result: {result}") 

310 self._log_info(job_type=job_type, cause=f"Cron result: {result}") 

311 

312 self.app_stats.add( 

313 job_type=job_type, 

314 result=result, 

315 duration=duration, 

316 ) 

317 

318 def still_running(self, job_type: AppJobType) -> bool: 

319 """Check if enough time has passed to safely cancel a job.""" 

320 LOG().debug(f"Checking if job_type: {job_type.label} is still running...") 

321 

322 live_config = self.runtime_config 

323 started_at = live_config.is_job_running(job_type) 

324 if started_at is None: 

325 return False 

326 

327 now = TimeUtil.get_current_utc_dt() 

328 elapsed_seconds = (now - started_at).total_seconds() 

329 if elapsed_seconds < self.wait_seconds: 

330 self._log_info( 

331 job_type, 

332 f"Found running job {job_type.label} started at " 

333 f"{FirestoreTime.internal_str(started_at)}, skipping schedule job.", 

334 ) 

335 return True 

336 # stale job, cancel it 

337 cause = ( 

338 f"Found running job {job_type.label} started at " 

339 f"{FirestoreTime.internal_str(started_at)} " 

340 f"({TimeUtil.formatted_seconds(int(elapsed_seconds))} ago), " 

341 "cancelling it." 

342 ) 

343 self._log_error(job_type=job_type, cause=cause, log_type=SystemLogType.WARNING) 

344 live_config.cancel_job(job_type) 

345 return False 

346 

347 def _can_run(self, job_type: AppJobType) -> bool: 

348 LOG().debug(f"Checking if job_type: {job_type.label} can run...") 

349 live_config = self.runtime_config 

350 

351 if not live_config.is_job_enabled(job_type): 

352 msg = f"Skipping job_type: {job_type.label} disabled in runtime config." 

353 self._log_info(job_type, msg) 

354 return False 

355 if self.still_running(job_type): 

356 msg = f"Skipping job_type: {job_type.label}, previous job still running." 

357 self._log_info(job_type, msg) 

358 return False 

359 return True 

360 

361 def _log_error( 

362 self, 

363 job_type: AppJobType, 

364 cause: str, 

365 log_type: SystemLogType = SystemLogType.ERROR, 

366 ) -> None: 

367 self.app_logger.system_error( 

368 log_type=log_type, 

369 message=cause, 

370 error_code=AppErrorCode.SCHEDULER, 

371 job_type=job_type, 

372 collection=FirestoreCollections.RUNTIME_JOB, 

373 ) 

374 

375 def _log_info(self, job_type: AppJobType, cause: str) -> None: 

376 self.app_logger.info( 

377 message=cause, 

378 job_type=job_type, 

379 collection=FirestoreCollections.RUNTIME_JOB, 

380 )