Coverage for functions \ flipdare \ backend \ runtime_config_admin.py: 97%
176 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from __future__ import annotations
15from typing import Any, override
16from abc import ABC, abstractmethod
17from datetime import datetime
18from flipdare.app_log import LOG
19from flipdare.constants import IS_DEBUG, RUNTIME_DB_REFRESH_INTERVAL
20from flipdare.core.singleton import Singleton
21from flipdare.firestore.backend.run_config_group_db import RunConfigGroupDb
22from flipdare.firestore.backend.run_config_job_db import RunConfigJobDb
23from flipdare.generated.shared.backend.app_job_group import AppJobGroup
24from flipdare.generated.shared.backend.app_job_type import AppJobType
25from flipdare.job.job_config import JobConfig
26from flipdare.util.time_util import TimeUtil
28__all__ = ["RuntimeConfigAdmin"]
31class RuntimeConfigAdmin(Singleton):
33 def __init__(
34 self,
35 run_group_db: RunConfigGroupDb | None = None,
36 run_job_db: RunConfigJobDb | None = None,
37 job_config: JobConfig | None = None,
38 default_value: bool = True,
39 refresh_interval_seconds: int = RUNTIME_DB_REFRESH_INTERVAL,
40 ) -> None:
42 from flipdare.services import get_run_group_db, get_run_job_db
43 from flipdare.app_config import get_job_config
45 LOG().info("Initializing RuntimeConfigAdmin...")
46 self._refresh_interval = refresh_interval_seconds
48 if job_config is None:
49 job_config = get_job_config()
51 if run_group_db is None:
52 run_group_db = get_run_group_db()
54 self._run_group = _RuntimeGroup(
55 job_config=job_config,
56 db=run_group_db,
57 default_value=default_value,
58 refresh_interval=refresh_interval_seconds,
59 )
61 if run_job_db is None:
62 run_job_db = get_run_job_db()
64 self._run_job = _RuntimeJob(
65 job_config=job_config,
66 db=run_job_db,
67 default_value=default_value,
68 refresh_interval=refresh_interval_seconds,
69 )
71 LOG().info(
72 f"RuntimeConfigAdmin initialized with default_value={default_value}, "
73 f"refresh_interval_seconds={refresh_interval_seconds}",
74 )
76 @property
77 def last_group_refresh(self) -> datetime | None:
78 return self._run_group.last_refresh
80 @property
81 def last_job_refresh(self) -> datetime | None:
82 return self._run_job.last_refresh
84 @property
85 def refresh_interval(self) -> int:
86 return self._refresh_interval
88 # ---------------------------------------------------------------------------------------------
89 # GROUP CONFIG
90 # ---------------------------------------------------------------------------------------------
92 def disable_all(self) -> None:
93 self._run_group.disable_all()
94 LOG().info("Disabled all RuntimeConfig types.")
96 def enable_all(self) -> None:
97 self._run_group.enable_all()
98 LOG().info("Enabled all RuntimeConfig types.")
100 def enable_group(self, job_group: AppJobGroup) -> None:
101 self._run_group.enable(job_group)
103 def disable_group(self, job_group: AppJobGroup) -> None:
104 self._run_group.disable(job_group)
106 def is_group_enabled(self, job_group: AppJobGroup) -> bool:
107 """Return the cached enabled state for a job group (refreshes periodically)."""
108 return self._run_group.is_enabled(job_group)
110 def is_job_enabled(self, job_type: AppJobType) -> bool:
111 """Return whether the group that owns *job_type* is currently enabled."""
112 return self._run_group.is_job_enabled(job_type)
114 # ---------------------------------------------------------------------------------------------
115 # JOB CONFIG
116 # ---------------------------------------------------------------------------------------------
118 def start_job(self, job_type: AppJobType) -> None:
119 # we add an entry in the db to prevent other instances from starting the same job
120 self._run_job.start_job(job_type)
122 def is_job_running(self, job_type: AppJobType) -> datetime | None:
123 return self._run_job.is_job_running(job_type)
125 def cancel_job(self, job_type: AppJobType) -> None:
126 self._run_job.cancel_job(job_type)
129# ---------------------------------------------------------------------------------------------
130# INTERNAL CLASSES
131# ---------------------------------------------------------------------------------------------
134class _RuntimeMixin[T](ABC):
135 __slots__ = (
136 "_data",
137 "_job_config",
138 "_label",
139 "_last_refresh",
140 "_refresh_interval",
141 )
143 def __init__(
144 self,
145 label: str,
146 job_config: JobConfig,
147 refresh_interval: int,
148 last_refresh: datetime | None = None,
149 ) -> None:
150 self._label = label
151 self._refresh_interval = refresh_interval
152 self._last_refresh = last_refresh
153 self._job_config = job_config
154 self._data = dict[str, T]()
156 @abstractmethod
157 def _refresh(self) -> int: ...
159 def check_refresh_required(self) -> None:
160 """Check if we should refresh the config from the database, and refresh if so."""
161 # note: we have to check None here otherwise we get a recursion loop.
162 if not self._should_refresh:
163 if IS_DEBUG:
164 msg = (
165 f"No refresh needed for '{self._label}' "
166 f"(last refresh at {self._last_refresh}, interval={self._refresh_interval} sec)"
167 )
168 LOG().debug(msg)
169 return
171 changed_ct = self._refresh()
172 self._last_refresh = TimeUtil.get_current_utc_dt()
173 msg = (
174 f"Refreshed {self._label} (interval={self._refresh_interval} sec) "
175 f"at {self._last_refresh}, changed {changed_ct} items"
176 )
177 LOG().info(msg)
179 @property
180 def job_config(self) -> JobConfig:
181 return self._job_config
183 @property
184 def last_refresh(self) -> datetime | None:
185 return self._last_refresh
187 def set_refreshed(self) -> None:
188 self._last_refresh = TimeUtil.get_current_utc_dt()
190 @property
191 def _should_refresh(self) -> bool:
192 last_refresh = self._last_refresh
193 if last_refresh is None:
194 return True
196 current_time = TimeUtil.get_current_utc_dt()
197 elapsed = (current_time - last_refresh).total_seconds()
199 refresh_interval = self._refresh_interval
200 return elapsed >= refresh_interval
202 def set_data(self, key: str, value: T) -> None:
203 self._data[key] = value
205 def get_data(self, key: str, default_value: T) -> T:
206 self.check_refresh_required()
207 return self._data.get(key, default_value)
210class _RuntimeGroup(_RuntimeMixin[bool]):
211 def __init__(
212 self,
213 job_config: JobConfig,
214 db: RunConfigGroupDb,
215 default_value: bool,
216 refresh_interval: int,
217 ) -> None:
218 super().__init__(
219 label="RuntimeConfigGroup",
220 job_config=job_config,
221 refresh_interval=refresh_interval,
222 )
223 self.db = db
224 self.default_value = default_value
225 for job_group in AppJobGroup:
226 self.set_data(job_group, default_value)
228 @override
229 def _refresh(self) -> int:
230 if IS_DEBUG:
231 LOG().debug("Refreshing RuntimeConfigGroup from database...")
233 changed_ct = 0
234 for job_group in AppJobGroup:
235 cfg = self.db.get_group_config(job_group)
236 value = cfg.enabled if cfg is not None else self.default_value
237 # access data directy to avoid recursion in get_data which
238 # calls check_refresh_required which calls _refresh
239 old_value = self._data.get(job_group, self.default_value)
240 if old_value != value:
241 changed_ct += 1
242 self.set_data(job_group, value)
243 if IS_DEBUG:
244 msg = f"RuntimeConfigGroup change detected for '{job_group.value}': {old_value} -> {value}"
245 LOG().debug(msg)
247 return changed_ct
249 def disable_all(self) -> None:
250 for job_group in AppJobGroup:
251 self.disable(job_group)
253 self.set_refreshed()
254 LOG().info("Disabled all RuntimeConfigGroup types.")
256 def enable_all(self) -> None:
257 for job_group in AppJobGroup:
258 self.enable(job_group)
260 self.set_refreshed()
261 LOG().info("Enabled all RuntimeConfigGroup types.")
263 def is_enabled(self, job_group: AppJobGroup) -> bool:
264 return self.get_data(job_group, self.default_value)
266 def is_job_enabled(self, job_type: AppJobType) -> bool:
267 return self.is_enabled(job_type.job_group)
269 def enable(self, job_group: AppJobGroup) -> None:
270 try:
271 if IS_DEBUG:
272 LOG().debug(f"Enabling '{job_group.value}'...")
274 self.db.enable(job_group)
275 self.set_data(job_group, True)
276 except Exception as e:
277 LOG().error(f"Failed to enable '{job_group.value}': {e}")
278 self.set_data(job_group, self.default_value)
280 def disable(self, job_group: AppJobGroup) -> None:
281 try:
282 if IS_DEBUG:
283 LOG().debug(f"Disabling '{job_group.value}'...")
285 self.db.disable(job_group)
286 self.set_data(job_group, False)
287 except Exception as e:
288 LOG().error(f"Failed to disable '{job_group.value}': {e}")
289 self.set_data(job_group, self.default_value)
292class _RuntimeJob(_RuntimeMixin[Any]):
293 def __init__(
294 self,
295 job_config: JobConfig,
296 db: RunConfigJobDb,
297 default_value: bool,
298 refresh_interval: int,
299 ) -> None:
300 super().__init__(
301 label="RuntimeConfigJob",
302 job_config=job_config,
303 refresh_interval=refresh_interval,
304 )
305 self.db = db
306 self.default_value = default_value
308 @override
309 def _refresh(self) -> int:
310 # refreshing not required for job status, since we check in realtime
311 return 0
313 def start_job(self, job_type: AppJobType) -> None:
314 self.db.start_job(job_type=job_type)
316 def is_job_running(self, job_type: AppJobType) -> datetime | None:
317 cfg = self.db.get_job(job_type=job_type)
318 if cfg is None:
319 if IS_DEBUG:
320 msg = (
321 f"Failed to check if job '{job_type}' is running, no config found in database"
322 )
323 LOG().debug(msg)
325 return None
327 if IS_DEBUG:
328 msg = f"Checked job status for '{job_type.job_group.value}'-'{job_type.value}': started at {cfg.started_at}"
329 LOG().debug(msg)
331 return cfg.started_at
333 def cancel_job(self, job_type: AppJobType) -> None:
334 self.db.cancel_job(job_type=job_type)