Coverage for functions \ flipdare \ job \ job_admin.py: 100%
0 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python3
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from __future__ import annotations
15from typing import TYPE_CHECKING, Any
16from flipdare.app_config import AppConfig
17from flipdare.app_log import LOG
18from flipdare.app_types import DatabaseDict
19from flipdare.constants import IS_DEBUG
20from flipdare.core.singleton import Singleton
21from flipdare.generated.shared.app_error_code import AppErrorCode
22from flipdare.generated.shared.backend.app_job_type import AppJobType
23from flipdare.job.app_job_schedule import AppJobSchedule
24from flipdare.job.cron_validator import CronValidator
25from flipdare.job.trigger_data import TriggerData, UpdateTriggerData
26from flipdare.service._error_mixin import ErrorMixin
28if TYPE_CHECKING:
29 from flipdare.backend.app_scheduler import AppScheduler
30 from flipdare.backend.app_logger import AppLogger
31 from flipdare.backend.job_logger import JobLogger
32 from flipdare.backend.runtime_config_admin import RuntimeConfigAdmin
35class JobAdmin(ErrorMixin, Singleton):
36 def __init__(
37 self,
38 runtime_config: RuntimeConfigAdmin | None = None,
39 app_scheduler: AppScheduler | None = None,
40 app_config: AppConfig | None = None,
41 job_logger: JobLogger | None = None,
42 app_logger: AppLogger | None = None,
43 ) -> None:
44 super().__init__()
45 self._runtime_config = runtime_config
46 self._app_scheduler = app_scheduler
47 self._app_config = app_config
48 self._job_logger = job_logger
49 self._app_logger = app_logger
51 @property
52 def app_scheduler(self) -> AppScheduler:
53 from flipdare.services import get_app_scheduler
55 if self._app_scheduler is None:
56 self._app_scheduler = get_app_scheduler()
57 return self._app_scheduler
59 @property
60 def runtime_config(self) -> RuntimeConfigAdmin:
61 from flipdare.services import get_runtime_config
63 if self._runtime_config is None:
64 self._runtime_config = get_runtime_config()
65 return self._runtime_config
67 @property
68 def app_config(self) -> AppConfig:
69 from flipdare.app_config import get_app_config
71 if self._app_config is None:
72 self._app_config = get_app_config()
73 return self._app_config
75 @property
76 def job_logger(self) -> JobLogger:
77 from flipdare.services import get_job_logger
79 if self._job_logger is None:
80 self._job_logger = get_job_logger()
82 return self._job_logger
84 @property
85 def app_logger(self) -> AppLogger:
86 from flipdare.services import get_app_logger
88 if self._app_logger is None:
89 self._app_logger = get_app_logger()
91 return self._app_logger
93 def cron_job(self, cron_name: str, validator: CronValidator, interval: AppJobSchedule) -> None:
94 if not validator.valid():
95 msg = f"Cron job validation failed for {cron_name}"
96 self.validator_error(message=msg, validator=validator)
97 return
99 try:
100 self.app_scheduler.run(interval=interval)
101 except Exception as e:
102 msg = f"Error running cron job {cron_name}: {e}"
103 self.job_error(
104 job_type=validator.job_type,
105 error_code=AppErrorCode.TRIGGER,
106 message=msg,
107 error=e,
108 notify_admin=False,
109 )
111 def trigger_job(self, job_type: AppJobType, validated: TriggerData[Any, Any]) -> None:
112 """
113 Creates a job in the database from trigger data.
115 Flow:
116 1. Firestore trigger fires (e.g., user document created/updated)
117 2. TriggerData validates and creates PersistedWrapper[TModel] from the event
118 3. This method stores the model as a dict in the jobs collection
119 4. AppScheduler later processes these jobs, fetching fresh data from Firestore
121 NOTE: This method checks if a job is running but does not preemptively cancel.
122 NOTE: That is the responsibility of the scheduler.
124 FIXME: We should add low/high prority jobs..
125 - then when the queue is empty, we can process jobs with fewer changes.
126 - but still ensure the data is fresh for the users.
128 Args:
129 job_type: The type of job (e.g., TR_USER, TR_DARE)
130 validated: TriggerData containing PersistedWrapper[TModel] from the trigger event
132 """
133 job_name = validated.job_type
134 valid_result = validated.valid()
135 if valid_result.is_error:
136 msg = f"Trigger validation failed for {job_name}: {valid_result.error}"
137 self.validator_error(message=msg, validator=validated)
138 return
140 # NOTE: if we arn't processing jobs, we still add it
141 # NOTE: and process when we are back online ..
142 # if not self.runtime_config.is_job_enabled(job_type):
144 doc_id = validated.doc_id
145 assert doc_id is not None # narrowing
147 debug_str = f"[{job_name}, doc_id={doc_id}, job_type={job_type.value}]"
148 # NOTE: we still need to add the job, even if the job is running
149 # NOTE: and process in the next cycle..
150 # if self.runtime_config.is_job_running(job_type):
152 try:
153 if isinstance(validated, UpdateTriggerData):
154 self._handle_update(doc_id, job_type, validated)
155 # elif isinstance(validator, TriggerData):
156 else:
157 # creation trigger, therefore no need for version check
158 self.info(f"Created trigger job ({debug_str}).")
159 model = validated.wrapper
160 assert model # narrowing, validated above
161 self.job_logger.create_new(
162 doc_id,
163 job_type,
164 model=model.to_dict(),
165 )
167 except Exception as e:
168 # these could be superflous, so we log, but dont notify admin
169 msg = f"Error creating trigger job for {debug_str}: {e}"
170 self.job_error(
171 job_type=job_type,
172 error_code=AppErrorCode.TRIGGER,
173 doc_id=doc_id,
174 message=msg,
175 error=e,
176 notify_admin=False,
177 )
179 def _handle_update(
180 self,
181 doc_id: str,
182 job_type: AppJobType,
183 validated: UpdateTriggerData[Any],
184 ) -> None:
185 job_name = validated.job_type
187 debug_str = f"[{job_name}, doc_id={doc_id}, job_type={job_type.value}]"
189 before = validated.before_wrapper
190 after = validated.wrapper
192 assert before # narrowing, validated above
193 assert after # narrowing, validated above
195 if not after.version_changed(before):
196 if IS_DEBUG:
197 msg = f"No version change detected. Skipping. ({debug_str})."
198 LOG().debug(msg)
199 return
201 score = before.calculate_change_score(after)
202 threshold = self.app_config.change_score_threshold
203 if IS_DEBUG:
204 msg = f"Calculated change score: {score:.4f} (threshold: {threshold}) for {debug_str}"
205 LOG().debug(msg)
207 if score < threshold:
208 if IS_DEBUG:
209 msg = f"Insignificant change. Skipping ({debug_str})."
210 LOG().debug(msg)
211 return
213 if IS_DEBUG:
214 msg = (
215 f"Update Trigger: {debug_str} - Change score={score:.4f} Threshold={threshold}\n"
216 f"-- BEFORE -- \n"
217 f"{before.to_dict()}\n"
218 f"-- AFTER -- \n"
219 f"{after.to_dict()}"
220 )
221 LOG().debug(msg)
223 self.job_logger.create_update(
224 doc_id,
225 job_type,
226 before_model=before.to_dict(),
227 after_model=after.to_dict(),
228 updated_data=validated.updates(),
229 )
231 def validator_error(
232 self,
233 message: str,
234 validator: CronValidator | TriggerData[Any, Any],
235 ) -> None: # pragma: no cover
236 LOG().error(message)
237 job_type = validator.job_type
238 self.app_logger.validation_error(
239 job_type=job_type,
240 error_code=AppErrorCode.VALIDATION,
241 validator=validator,
242 message=message,
243 )
245 def info(self, message: str, data: DatabaseDict | None = None) -> None:
246 LOG().info(message)
247 self.app_logger.info(message=message, data=data)