Coverage for functions \ flipdare \ backend \ app_scheduler.py: 71%
228 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from __future__ import annotations
15from collections.abc import Callable
16from typing import TYPE_CHECKING
17from functools import partial
18from flipdare.manager.backend_manager import BackendManager
19from flipdare.manager.task_manager import TaskManager
20from flipdare.task.command_task_handler import CommandTaskHandler
21from flipdare.backend.app_stats import AppStats
22from flipdare.task.cron_task_handler import CronTaskHandler
23from flipdare.app_log import LOG
24from flipdare.app_types import CronResult, TriggerResult
25from flipdare.job_types import (
26 TaskJobType,
27 TriggerJobType,
28 is_command_job,
29 is_cron_job,
30 is_report_job,
31 is_trigger_job,
32)
33from flipdare.constants import IS_DEBUG, JOB_WAIT_SANITY_SECONDS
34from flipdare.core.singleton import Singleton
35from flipdare.generated.shared.app_error_code import AppErrorCode
36from flipdare.generated.shared.backend.app_job_type import AppJobType
37from flipdare.generated.shared.backend.system_log_type import SystemLogType
38from flipdare.job.app_job_schedule import AppJobSchedule
39from flipdare.result.outcome import Outcome
40from flipdare.generated.shared.firestore_collections import FirestoreCollections
41from flipdare.util.time_util import FirestoreTime, TimeUtil
43if TYPE_CHECKING:
44 from flipdare.job.job_config import JobConfig
45 from flipdare.task.trigger_task_handler import TriggerTaskHandler
46 from flipdare.manager.service_manager import ServiceManager
47 from flipdare.manager.search_manager import SearchManager
48 from flipdare.task.report_task_handler import ReportTaskHandler
49 from flipdare.backend.app_logger import AppLogger
50 from flipdare.backend.job_logger import JobLogger
51 from flipdare.backend.runtime_config_admin import RuntimeConfigAdmin
54__all__ = ["AppScheduler"]
class AppScheduler(Singleton):
    """Run the cron and trigger jobs configured for a schedule interval.

    The service, search, backend and task managers may be injected through the
    constructor. Any manager left as ``None`` is resolved from
    ``flipdare.services`` the first time its property is read and then cached
    on the instance.
    """

    def __init__(
        self,
        service_manager: ServiceManager | None = None,
        search_manager: SearchManager | None = None,
        backend_manager: BackendManager | None = None,
        task_manager: TaskManager | None = None,
        wait_seconds: int = JOB_WAIT_SANITY_SECONDS,
    ) -> None:
        """Store the optional collaborators and the stale-job threshold.

        Args:
            service_manager: Optional pre-built service manager.
            search_manager: Optional pre-built search manager.
            backend_manager: Optional pre-built backend manager.
            task_manager: Optional pre-built task manager.
            wait_seconds: Age in seconds below which a job flagged as running
                is still treated as running (see ``still_running``).
        """
        super().__init__()

        self._service_manager = service_manager
        self._search_manager = search_manager
        self._backend_manager = backend_manager
        self._task_manager = task_manager
        self._wait_seconds = wait_seconds

    # ------------------------------------------------------------------
    # Lazily resolved collaborators
    # ------------------------------------------------------------------

    @property
    def wait_seconds(self) -> int:
        """Seconds a running job is given before it is considered stale."""
        return self._wait_seconds

    @property
    def search_manager(self) -> SearchManager:
        """Return the search manager, resolving the default one on first use."""
        if self._search_manager is None:
            # Local import: only needed when no manager was injected.
            from flipdare.services import get_search_manager

            self._search_manager = get_search_manager()
        return self._search_manager

    @property
    def service_manager(self) -> ServiceManager:
        """Return the service manager, resolving the default one on first use."""
        if self._service_manager is None:
            # Local import: only needed when no manager was injected.
            from flipdare.services import get_service_manager

            self._service_manager = get_service_manager()
        return self._service_manager

    @property
    def backend_manager(self) -> BackendManager:
        """Return the backend manager, resolving the default one on first use."""
        if self._backend_manager is None:
            # Local import: only needed when no manager was injected.
            from flipdare.services import get_backend_manager

            self._backend_manager = get_backend_manager()
        return self._backend_manager

    @property
    def task_manager(self) -> TaskManager:
        """Return the task manager, resolving the default one on first use."""
        if self._task_manager is None:
            # Local import: only needed when no manager was injected.
            from flipdare.services import get_task_manager

            self._task_manager = get_task_manager()
        return self._task_manager

    @property
    def job_config(self) -> JobConfig:
        """Return the job configuration from ``flipdare.app_config``."""
        from flipdare.app_config import get_job_config

        return get_job_config()

    @property
    def runtime_config(self) -> RuntimeConfigAdmin:
        """Runtime configuration admin exposed by the backend manager."""
        return self.backend_manager.runtime_config

    @property
    def job_logger(self) -> JobLogger:
        """Job logger exposed by the backend manager."""
        return self.backend_manager.job_logger

    @property
    def app_logger(self) -> AppLogger:
        """Application logger exposed by the backend manager."""
        return self.backend_manager.app_logger

    @property
    def app_stats(self) -> AppStats:
        """Statistics recorder exposed by the backend manager."""
        return self.backend_manager.app_stats

    @property
    def report_handler(self) -> ReportTaskHandler:
        """Report task handler exposed by the task manager."""
        return self.task_manager.report_handler

    @property
    def cron_handler(self) -> CronTaskHandler:
        """Cron task handler exposed by the task manager."""
        return self.task_manager.cron_handler

    @property
    def command_handler(self) -> CommandTaskHandler:
        """Command task handler exposed by the task manager."""
        return self.task_manager.command_handler

    @property
    def trigger_handler(self) -> TriggerTaskHandler:
        """Trigger task handler exposed by the task manager."""
        return self.task_manager.trigger_handler

    # ------------------------------------------------------------------
    # Scheduling entry point
    # ------------------------------------------------------------------

    def run(self, interval: AppJobSchedule) -> None:
        """Run every job configured for ``interval``.

        Jobs that are neither trigger nor cron jobs are reported as errors and
        skipped. Jobs for which ``_can_run`` returns False are skipped with a
        warning. All remaining jobs are handed to ``_process`` one by one.

        Args:
            interval: The schedule interval whose jobs should be executed.
        """
        jobs_config = self.job_config.by_interval(interval)
        if not jobs_config:
            LOG().warning(f"No jobs found for interval: {interval.value}.")
            return

        for cfg in jobs_config:
            job_type = cfg.job_type

            if not (job_type.is_trigger or job_type.is_cron):
                cause = f"Invalid job type {job_type.label}.."
                self._log_error(job_type=job_type, cause=cause)
                continue

            if not self._can_run(job_type):
                # _can_run has already logged the precise reason; keep this
                # message neutral because the job may be disabled rather than
                # still running.
                LOG().warning(
                    f"Skipping job_type: {job_type.label} "
                    "(disabled or previous run still in progress)."
                )
                continue

            self._process(job_type)

    def _process(self, job_type: AppJobType) -> None:
        """Mark ``job_type`` as started, dispatch it, then clear the running flag.

        Any exception raised while starting or running the job is logged
        through ``_log_error`` and not re-raised. ``cancel_job`` is always
        called afterwards, whether the job succeeded or failed.

        Args:
            job_type: The job to run; must be a cron or trigger job.
        """
        if not (job_type.is_cron or job_type.is_trigger):
            msg = f"Unsupported job type: {job_type.label}"
            self._log_error(job_type=job_type, cause=msg)
            return

        debug_str = f"{job_type.label}:{FirestoreTime.internal_str(TimeUtil.get_current_utc_dt())}"
        live_config = self.runtime_config

        try:
            live_config.start_job(job_type)
            LOG().info(f"Processing job_type: {debug_str}")

            if is_command_job(job_type) or is_report_job(job_type) or is_cron_job(job_type):
                self._run_task(job_type)
            elif is_trigger_job(job_type):
                self._run_trigger(job_type)
            else:
                self._log_error(job_type=job_type, cause="Unsupported job type.")
        except Exception as e:
            cause = f"Error processing {debug_str}: {e}"
            self._log_error(job_type=job_type, cause=cause)
        finally:
            # Always clear the running flag so the next schedule tick is not blocked.
            live_config.cancel_job(job_type)

        # NOTE: all logging should be completed
        # NOTE: by called methods..

    # ------------------------------------------------------------------
    # Job runners
    # ------------------------------------------------------------------

    def _run_trigger(self, job_type: TriggerJobType) -> None:
        """
        Process trigger jobs from the database.

        Flow:
        1. Fetches AppJobModel instances (PersistedWrapper[AppJobModel]) from jobs collection
        2. Each job contains obj_id pointing to the actual document
        3. Trigger handler (e.g., trigger_user) receives the job
        4. Handler fetches fresh PersistedWrapper[TModel] from Firestore using obj_id
        5. Handler calls processor with the fresh model data

        This ensures handlers always work with current data, not stale trigger snapshots.

        After the loop every fetched job is passed to ``resolve_jobs`` and the
        success/failure counts plus the elapsed time are recorded in
        ``app_stats``.
        """
        LOG().info(f"Processing jobs for trigger {job_type.label}..")
        job_logger = self.job_logger
        jobs = job_logger.get_jobs([job_type])
        if jobs is None:
            # Routed through _log_error so this entry carries the same
            # job_type/collection metadata as every other scheduler error.
            cause = f"Failed to get {job_type.label} jobs.."
            self._log_error(job_type=job_type, cause=cause)
            return
        if not jobs:
            LOG().info(f"No {job_type.label} jobs to process.")
            return

        started_at = TimeUtil.get_current_utc_dt()
        success_ct = 0
        failed_ct = 0

        trigger_handler = self.trigger_handler
        for job in jobs:
            j_type = job.job_type
            if not is_trigger_job(j_type):
                cause = f"Expected trigger job type but got {j_type.label} for job {job.doc_id}"
                self._log_error(job_type=job_type, cause=cause)
                failed_ct += 1
                continue

            # One failing job must not prevent the remaining jobs from running.
            try:
                result = trigger_handler.run_trigger(j_type, job)
            except Exception as e:
                cause = f"Error processing job {job.doc_id} of type {j_type.label}: {e}"
                self._log_error(job_type=j_type, cause=cause)
                failed_ct += 1
                continue

            if result == Outcome.OK:
                success_ct += 1
            else:
                failed_ct += 1

        ended_at = TimeUtil.get_current_utc_dt()
        duration = TimeUtil.duration_in_seconds(started_at, ended_at)

        if IS_DEBUG:
            msg = (
                f"Processed {len(jobs)} {job_type.label} jobs in {duration} secs"
                f" : Success={success_ct}, Failed={failed_ct}"
            )
            LOG().debug(msg)

        # NOTE(review): failed jobs are resolved together with successful ones,
        # as in the original code - confirm this is intended.
        job_logger.resolve_jobs(jobs)
        self._log_info(
            job_type,
            f"Completed processing {len(jobs)} {job_type.label} jobs: "
            f"Success={success_ct}, Failed={failed_ct}",
        )

        self.app_stats.add_count(
            job_type=job_type,
            success_ct=success_ct,
            failed_ct=failed_ct,
            skipped_ct=0,
            duration=duration,
        )

    def _run_task(self, job_type: TaskJobType) -> None:
        """Run a command, report or cron job and record its result.

        The matching handler is chosen with the ``is_*_job`` predicates. If the
        handler raises, the exception is logged once and nothing is recorded.
        If the handler returns ``None``, that is logged as an error. Otherwise
        the result and the elapsed time are added to ``app_stats``.

        Args:
            job_type: The command, report or cron job to execute.
        """
        call_fn: Callable[..., CronResult | TriggerResult] | None = None
        if is_command_job(job_type):
            call_fn = partial(self.command_handler.run_command, job_type=job_type)
        elif is_report_job(job_type):
            call_fn = partial(self.report_handler.run_report, job_type=job_type)
        elif is_cron_job(job_type):
            call_fn = partial(self.cron_handler.run_cron, job_type=job_type)
        else:
            self._log_error(job_type=job_type, cause="Unsupported job type.")
            return

        started_at = TimeUtil.get_current_utc_dt()
        debug_str = f"{job_type.label}:{TimeUtil.formatted_dt(started_at)}"
        if IS_DEBUG:
            LOG().debug(f"Processing cron: {debug_str}..")

        result: CronResult | TriggerResult | None = None
        try:
            result = call_fn()
        except Exception as e:
            cause = f"Error processing cron {job_type.label}: {e}"
            self._log_error(job_type=job_type, cause=cause)
            # The failure is already recorded; returning here avoids a second,
            # misleading "did not return a result" error for the same run.
            return

        ended_at = TimeUtil.get_current_utc_dt()
        duration = TimeUtil.duration_in_seconds(started_at, ended_at)
        if IS_DEBUG:
            LOG().debug(f"Completed cron: {debug_str} in {duration} secs.")

        if result is None:
            LOG().warning(f"Cron {job_type.label} did not return a stat/output result.")
            self._log_error(job_type=job_type, cause="Cron did not return a stat/output result.")
            return

        # completed ok..
        LOG().info(f"Cron {job_type.label} result: {result}")
        self._log_info(job_type=job_type, cause=f"Cron result: {result}")

        self.app_stats.add(
            job_type=job_type,
            result=result,
            duration=duration,
        )

    # ------------------------------------------------------------------
    # Run-state checks
    # ------------------------------------------------------------------

    def still_running(self, job_type: AppJobType) -> bool:
        """Report whether a previous run of ``job_type`` is still in progress.

        The runtime config is asked for the start time of the running job:

        * no start time -> not running, returns False;
        * started less than ``wait_seconds`` ago -> returns True;
        * started ``wait_seconds`` or more ago -> the job is treated as stale,
          a warning is logged, the job is cancelled and False is returned.

        Args:
            job_type: The job to check.

        Returns:
            True if the job should be considered running, False otherwise.
        """
        LOG().debug(f"Checking if job_type: {job_type.label} is still running...")

        live_config = self.runtime_config
        started_at = live_config.is_job_running(job_type)
        if started_at is None:
            return False

        now = TimeUtil.get_current_utc_dt()
        elapsed_seconds = (now - started_at).total_seconds()
        if elapsed_seconds < self.wait_seconds:
            self._log_info(
                job_type,
                f"Found running job {job_type.label} started at "
                f"{FirestoreTime.internal_str(started_at)}, skipping schedule job.",
            )
            return True

        # stale job, cancel it
        cause = (
            f"Found running job {job_type.label} started at "
            f"{FirestoreTime.internal_str(started_at)} "
            f"({TimeUtil.formatted_seconds(int(elapsed_seconds))} ago), "
            "cancelling it."
        )
        self._log_error(job_type=job_type, cause=cause, log_type=SystemLogType.WARNING)
        live_config.cancel_job(job_type)
        return False

    def _can_run(self, job_type: AppJobType) -> bool:
        """Return True if ``job_type`` is enabled and no previous run is active.

        The reason for a refusal is logged through ``_log_info``.
        """
        LOG().debug(f"Checking if job_type: {job_type.label} can run...")
        live_config = self.runtime_config

        if not live_config.is_job_enabled(job_type):
            msg = f"Skipping job_type: {job_type.label} disabled in runtime config."
            self._log_info(job_type, msg)
            return False
        if self.still_running(job_type):
            msg = f"Skipping job_type: {job_type.label}, previous job still running."
            self._log_info(job_type, msg)
            return False
        return True

    # ------------------------------------------------------------------
    # Logging helpers
    # ------------------------------------------------------------------

    def _log_error(
        self,
        job_type: AppJobType,
        cause: str,
        log_type: SystemLogType = SystemLogType.ERROR,
    ) -> None:
        """Record a scheduler problem for ``job_type`` via ``app_logger.system_error``.

        Args:
            job_type: The job the message relates to.
            cause: Human-readable description of the problem.
            log_type: Log type passed through; defaults to ``SystemLogType.ERROR``.
        """
        self.app_logger.system_error(
            log_type=log_type,
            message=cause,
            error_code=AppErrorCode.SCHEDULER,
            job_type=job_type,
            collection=FirestoreCollections.RUNTIME_JOB,
        )

    def _log_info(self, job_type: AppJobType, cause: str) -> None:
        """Record an informational scheduler message for ``job_type``."""
        self.app_logger.info(
            message=cause,
            job_type=job_type,
            collection=FirestoreCollections.RUNTIME_JOB,
        )