Coverage for functions \ flipdare \ job \ job_config.py: 85%

169 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13""" 

14Efficient job configuration loader using dataclasses and caching. 

15Loads job_config.yaml once and provides fast access to job metadata. 

16""" 

17 

18from collections.abc import Iterator 

19from dataclasses import dataclass 

20from functools import lru_cache 

21from pathlib import Path 

22from typing import IO, Any, Optional 

23 

24from flipdare.app_log import LOG 

25from flipdare.constants import JOB_CONFIG_PATH 

26from flipdare.generated.shared.backend.app_job_group import AppJobGroup 

27from flipdare.generated.shared.backend.app_job_type import AppJobType 

28from flipdare.job.app_job_schedule import AppJobSchedule 

29from flipdare.job.job_config_option import JobConfigOption 

30from flipdare.util.yaml_loader import YamlLoader 

31 

32__all__ = ["JobConfig"] 

33 

34 

@dataclass(frozen=True, slots=True)
class _SavedState:
    """
    Immutable snapshot of JobConfig's internal stores.

    Captured by ``JobConfig.try_reload`` before a reload attempt so the
    previous configuration can be fully restored if the reload fails.
    ``slots=True`` drops the per-instance ``__dict__`` — snapshots are
    pure data holders and never gain ad-hoc attributes.
    """

    jobs: dict[str, JobConfigOption]
    triggers: dict[str, JobConfigOption]
    tasks: dict[str, JobConfigOption]
    by_interval: dict[AppJobSchedule, list[JobConfigOption]]
    by_job_group: dict[AppJobGroup, list[JobConfigOption]]

42 

43 

class JobConfig:
    """
    Singleton loader for job configuration with efficient caching.

    Features:
    - Loads YAML once and caches in memory
    - Uses dataclasses for minimal memory overhead
    - Supports both flat and hierarchical YAML formats
    - Fast lookups with dict-based indexing
    """

    _instance: Optional["JobConfig"] = None
    _config_path: Path

    def __init__(self, config_path: Path = JOB_CONFIG_PATH) -> None:
        """Load and index the job configuration from *config_path*."""
        LOG().info(f"Loading job configuration from {config_path}...")

        self._config_path = Path(config_path)
        # Primary store (keyed by job name) plus secondary indexes; all of
        # these are populated by _load_config() -> _build_indexes().
        self._jobs: dict[str, JobConfigOption] = {}
        self._triggers: dict[str, JobConfigOption] = {}
        self._tasks: dict[str, JobConfigOption] = {}
        self._by_interval: dict[AppJobSchedule, list[JobConfigOption]] = {}
        self._by_job_group: dict[AppJobGroup, list[JobConfigOption]] = {}

        self._load_config()

    @classmethod
    def instance(cls, config_path: Path = JOB_CONFIG_PATH) -> "JobConfig":
        """
        Get singleton instance.

        NOTE: ``config_path`` is only honoured on the first call; subsequent
        calls return the already-constructed instance unchanged.
        """
        if cls._instance is None:
            cls._instance = cls(config_path)
        return cls._instance

    def try_reload(self, cfg: IO[str]) -> bool:
        """
        Reload job configuration from a YAML stream.

        Snapshots the current state before attempting the reload. On success the
        LRU caches for ``is_trigger`` / ``is_task`` are invalidated. On failure
        the previous state is fully restored.

        Returns:
            True if the reload succeeded, False otherwise.

        """
        # Snapshot current state so we can restore on failure.  Dicts and the
        # index lists are copied shallowly; JobConfigOption entries themselves
        # are shared (they are never mutated in place).
        saved = _SavedState(
            self._jobs.copy(),
            self._triggers.copy(),
            self._tasks.copy(),
            {k: list(v) for k, v in self._by_interval.items()},
            {k: list(v) for k, v in self._by_job_group.items()},
        )
        LOG().info("Attempting to reload JobConfig from provided YAML stream...")
        try:
            self._jobs.clear()
            self._triggers.clear()
            self._tasks.clear()
            self._by_interval.clear()
            self._by_job_group.clear()
            self._load_config(cfg=cfg)
            if not self._jobs:
                # An empty job table is treated as a failed reload, not a
                # valid "no jobs" configuration.
                msg = "Reloaded config contains no job entries, can't reload."
                LOG().error(msg)
                self._reload_from_saved(saved)
                return False

            # Invalidate stale caches
            self.is_trigger.cache_clear()
            self.is_task.cache_clear()
        except Exception as e:
            LOG().error(f"JobConfigLoader reload failed, restoring previous config: {e}")
            self._reload_from_saved(saved)
            return False
        else:
            # Runs ONLY if the try block succeeded.  This is the single
            # success log (it was previously duplicated inside the try body).
            LOG().info(f"JobConfigLoader reloaded: {len(self._jobs)} jobs.")
            return True

    def _reload_from_saved(self, saved: _SavedState) -> None:
        """Restore all stores and indexes from a pre-reload snapshot."""
        self._jobs = saved.jobs
        self._triggers = saved.triggers
        self._tasks = saved.tasks
        self._by_interval = saved.by_interval
        self._by_job_group = saved.by_job_group

    # === Query Methods ===

    def get(self, job_name: str) -> JobConfigOption | None:
        """Get job config by name, or None if unknown."""
        return self._jobs.get(job_name)

    def all(self) -> list[JobConfigOption]:
        """Get all job configs"""
        return list(self._jobs.values())

    def triggers(self) -> list[JobConfigOption]:
        """Get all trigger jobs"""
        return list(self._triggers.values())

    def tasks(self) -> list[JobConfigOption]:
        """Get all scheduled tasks"""
        return list(self._tasks.values())

    def by_interval(self, interval: AppJobSchedule) -> list[JobConfigOption]:
        """Get all jobs for a specific interval"""
        return self._by_interval.get(interval, [])

    def by_job_group(self, app_job_group: AppJobGroup) -> list[JobConfigOption]:
        """Get all jobs for a specific AppJobGroup"""
        return self._by_job_group.get(app_job_group, [])

    def intervals(self) -> list[AppJobSchedule]:
        """Get all unique intervals used in config"""
        return list(self._by_interval.keys())

    def job_groups(self) -> list[AppJobGroup]:
        """Get all unique job groups referenced in config"""
        return list(self._by_job_group.keys())

    # === Convenience Methods ===

    def get_by_job_type(self, app_job_type: AppJobType) -> JobConfigOption | None:
        """Get job config by AppJobType enum"""
        return self.get(app_job_type.value)

    def get_by_job_group(self, app_job_group: AppJobGroup) -> list[JobConfigOption]:
        """Get all jobs belonging to a specific AppJobGroup."""
        result = []
        for job in self._jobs.values():
            try:
                # NOTE(review): the guard presumably covers job.job_group
                # raising ValueError when a job has no valid group mapping —
                # confirm against JobConfigOption.job_group.
                if job.job_group == app_job_group:
                    result.append(job)
            except ValueError:
                pass  # job has no valid group mapping
        return result

    @lru_cache(maxsize=128)  # noqa: B019 - global singleton cache, not per-instance
    def is_trigger(self, job_type: AppJobType) -> bool:
        """Check if a job is a trigger (cached)"""
        job_name = job_type.value
        job = self.get(job_name)
        return job.is_trigger if job else False

    @lru_cache(maxsize=128)  # noqa: B019 - global singleton cache, not per-instance
    def is_task(self, job_type: AppJobType) -> bool:
        """Check if a job is a task (cached)"""
        job_name = job_type.value
        job = self.get(job_name)
        return job.is_task if job else False

    def validate_enums(self) -> list[str]:
        """
        Validate that all jobs have valid enum mappings.

        Returns:
            A single list of names: jobs loaded from YAML that lack an
            AppJobType mapping, followed by AppJobType values that have no
            config entry.
        """
        jobs = self._jobs.values()

        # 1. Jobs present in the config that could not be mapped to an enum
        missing_job_types = [j.name for j in jobs if j._job_type is None]

        # 2. Enum members with no config entry.
        # NOTE: scheduled jobs are excluded, because they have no config
        # NOTE: and are scheduled by firebase..
        missing_job_types.extend(
            jt.value for jt in AppJobType if not jt.is_scheduled and jt.value not in self._jobs
        )

        return missing_job_types

    def __len__(self) -> int:
        """Total number of jobs"""
        return len(self._jobs)

    def __contains__(self, job_name: str) -> bool:
        """Check if job exists"""
        return job_name in self._jobs

    def __iter__(self) -> Iterator[JobConfigOption]:
        """Iterate over all jobs"""
        return iter(self._jobs.values())

    #
    # internal
    #

    def _load_config(self, cfg: IO[str] | None = None) -> None:
        """Load and parse YAML configuration from a stream or the config path."""
        # Explicit None check: an IO stream must win even if it is "falsy".
        # _config_path is already a Path (normalised in __init__).
        source = cfg if cfg is not None else self._config_path
        yaml_loader = YamlLoader(source=source)
        data = yaml_loader.load()

        # Handle hierarchical format (triggers/tasks sections)
        if "triggers" in data or "tasks" in data:
            self._load_hierarchical(data)
        else:
            # Handle flat format (original structure)
            self._load_flat(data)

        # Build indexes for fast lookups
        self._build_indexes()

    def _create_job_config(self, name: str, config_dict: dict[str, Any]) -> JobConfigOption:
        """Build a JobConfigOption from one YAML entry.

        Raises:
            ValueError: if the name has no AppJobType mapping, or the
                'schedule' field is missing/unknown.
        """
        # --- AppJobType (derived from the job name) ---
        try:
            app_job_type = AppJobType(name)
        except ValueError as ex:  # enum lookup failure; narrowed from bare Exception
            valid = [e.value for e in AppJobType]
            msg = (
                f"Job '{name}' has no AppJobType mapping. "
                f"Add '{name}' to AppJobType enum. Valid values: {valid}"
            )
            raise ValueError(msg) from ex

        # --- JobInterval (required YAML field 'schedule') ---
        schedule_str: str | None = config_dict.get("schedule")
        if not schedule_str:
            msg = f"Job '{name}' is missing required 'schedule' field in job_config.yaml."
            raise ValueError(msg)
        try:
            job_schedule = AppJobSchedule.from_string(schedule_str)
        except (ValueError, AttributeError) as ex:
            valid_intervals = [e.value for e in AppJobSchedule]
            msg = (
                f"Job '{name}' has unknown schedule '{schedule_str}'. "
                f"Valid values: {valid_intervals}"
            )
            raise ValueError(msg) from ex

        # Strip loader-managed keys; remaining fields pass through to JobConfig
        config_copy = {
            k: v for k, v in config_dict.items() if k not in ("app_job_group", "schedule")
        }

        return JobConfigOption(
            name=name,
            _job_type=app_job_type,
            _schedule=job_schedule,
            **config_copy,
        )

    def _load_hierarchical(self, data: dict[str, Any]) -> None:
        """Load hierarchical YAML format with triggers/tasks sections."""
        # Load triggers (section membership decides trigger vs task here)
        for trigger_name, trigger_config in data.get("triggers", {}).items():
            job = self._create_job_config(trigger_name, trigger_config)
            self._jobs[trigger_name] = job
            self._triggers[trigger_name] = job

        # Load tasks
        for task_name, task_config in data.get("tasks", {}).items():
            job = self._create_job_config(task_name, task_config)
            self._jobs[task_name] = job
            self._tasks[task_name] = job

    def _load_flat(self, data: dict[str, Any]) -> None:
        """Load flat YAML format (one entry per job)"""
        for job_name, job_config in data.items():
            if job_name.startswith("_"):  # Skip template definitions
                continue

            job = self._create_job_config(job_name, job_config)
            self._jobs[job_name] = job

            # In the flat format the option itself decides trigger vs task
            if job.is_trigger:
                self._triggers[job_name] = job
            else:
                self._tasks[job_name] = job

    def _build_indexes(self) -> None:
        """Build lookup indexes for efficient queries"""
        for job in self._jobs.values():
            # Index by interval and by job group (setdefault avoids the
            # membership-test-then-insert double lookup)
            self._by_interval.setdefault(job.schedule, []).append(job)
            self._by_job_group.setdefault(job.job_group, []).append(job)