Coverage for functions \ flipdare \ service \ core \ cron_processor.py: 84%

141 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13""" 

14Generic cron processor to eliminate code duplication across admin cron methods. 

15""" 

16 

17from __future__ import annotations 

18 

19from collections.abc import Callable 

20from typing import TYPE_CHECKING, Any 

21from collections.abc import Sequence 

22from dataclasses import dataclass 

23from flipdare.app_log import LOG 

24from flipdare.app_types import CronResult 

25from flipdare.constants import NO_DOC_ID 

26from flipdare.result.app_result import AppResult 

27from flipdare.core.cron_decorator import cron_decorator 

28from flipdare.result.output_result import OutputResult 

29from flipdare.result.job_result import JobResult 

30from flipdare.generated.model.backend.metric.count_metric import CountMetric 

31from flipdare.generated.shared.app_error_code import AppErrorCode 

32from flipdare.generated.shared.backend.app_job_type import AppJobType 

33from flipdare.result.outcome import Outcome 

34from flipdare.util.time_util import TimeUtil 

35from flipdare.wrapper import PersistedWrapper 

36 

37if TYPE_CHECKING: 

38 from flipdare.backend.app_logger import AppLogger 

39 

40__all__ = ["CronProcessor", "CronConfig"] 

41 

42 

@dataclass(frozen=True)
class CronResultEntry:
    """Immutable record of how one item fared during a cron run."""

    # Document id of the processed item (NO_DOC_ID when the item has none).
    doc_id: str
    # Final outcome recorded for the item.
    outcome: Outcome
    # Human-readable summary of what happened to the item.
    message: str

48 

49 

50class CronConfig[T: PersistedWrapper[Any]]: 

51 """Configuration for a generic cron processor.""" 

52 

53 def __init__( 

54 self, 

55 job_type: AppJobType, 

56 job_name: str, 

57 query_fn: Callable[[], list[T]], 

58 process_fn: Callable[[T], Outcome | AppResult[Any] | JobResult[Any]], 

59 error_type: AppErrorCode = AppErrorCode.DATABASE_EX, 

60 report_fn: ( 

61 Callable[[AppJobType, Sequence[CronResultEntry]], OutputResult | None] | None 

62 ) = None, 

63 skip_empty_check: bool = False, 

64 ) -> None: 

65 """ 

66 Initialize cron configuration. 

67 

68 Args: 

69 job_type: The AppJobType for the perf decorator 

70 job_name: Human-readable name for logging (e.g., "cron_invite_unprocessed") 

71 query_fn: Function that returns list of items to process 

72 process_fn: Function that processes each item and returns ResultValue or AppResult 

73 error_type: Default error type for exceptions 

74 report_fn: Optional function to call with (job_type, ok_ids, error_ids) for reporting 

75 skip_empty_check: If True, don't return early when query returns empty list 

76 

77 """ 

78 self.job_type = job_type 

79 self.job_name = job_name 

80 self.query_fn = query_fn 

81 self.process_fn = process_fn 

82 self.error_type = error_type 

83 self.report_fn = report_fn 

84 self.skip_empty_check = skip_empty_check 

85 

86 

87class CronProcessor[T: PersistedWrapper[Any]]: 

88 """ 

89 Generic processor for cron jobs that follow the pattern: 

90 1. Query for items 

91 2. Process each item 

92 3. Track success/failure/skip counts 

93 4. Handle exceptions 

94 5. Return performance results 

95 

96 Example usage: 

97 config = CronConfig( 

98 job_type=AppJobType.CR_INVITE_UNPROCESSED, 

99 job_name="cron_invite_unprocessed", 

100 query_fn=lambda: self.invite_db.get_unprocessed_invites_last_week(), 

101 process_fn=lambda invite: self._invite_processor.process_invite_signup(invite) 

102 ) 

103 return CronProcessor.process(config) 

104 """ 

105 

106 def __init__( 

107 self, 

108 config: CronConfig[T], 

109 log_creator: AppLogger | None = None, 

110 ) -> None: 

111 self._log_creator = log_creator 

112 self._config = config 

113 

114 @property 

115 def log_creator(self) -> AppLogger: 

116 from flipdare.services import get_app_logger 

117 

118 if self._log_creator is None: 

119 self._log_creator = get_app_logger() 

120 return self._log_creator 

121 

122 def process_result(self) -> CronResult: 

123 """ 

124 Execute a generic cron processing job. 

125 

126 Args: 

127 config: CronConfig containing all necessary functions and metadata 

128 

129 Returns: 

130 CronResult from the stat decorator (which wraps CronResult) 

131 

132 """ 

133 config = self._config 

134 

135 # Apply the perf decorator dynamically 

136 @cron_decorator(job_type=config.job_type) 

137 def _execute() -> CronResult: 

138 return self._process_result_items(config) 

139 

140 return _execute() 

141 

142 def _process_result_items( # noqa: PLR0912, PLR0915 

143 self, 

144 config: CronConfig[T], 

145 ) -> CronResult: 

146 """Internal implementation of the cron processing logic.""" 

147 main_result = AppResult(task_name=config.job_name) 

148 passed_ct = 0 

149 failed_ct = 0 

150 skipped_ct = 0 

151 processing_error = False 

152 processed: list[CronResultEntry] = [] 

153 start = TimeUtil.get_current_utc_dt() 

154 

155 try: 

156 items = config.query_fn() 

157 # Check for empty results unless skip_empty_check is True 

158 if not config.skip_empty_check and not items: 

159 msg = f"No items found for {config.job_name} processing." 

160 LOG().info(msg) 

161 return CountMetric.empty() 

162 

163 item_type = config.job_name.replace("cron_", "").replace("_unprocessed", "") 

164 msg = f"Found {len(items)} {item_type}(s) for {config.job_name} processing." 

165 LOG().info(msg) 

166 

167 for item in items: 

168 try: 

169 result = config.process_fn(item) 

170 doc_id = item.doc_id or NO_DOC_ID 

171 

172 if result.is_ok: 

173 passed_ct += 1 

174 processed.append( 

175 CronResultEntry(doc_id, Outcome.OK, f"Processed {item_type} {doc_id}") 

176 ) 

177 continue 

178 if result.is_skipped: 

179 skipped_ct += 1 

180 continue 

181 

182 # Handles, Outcome, AppResult and OutputAppResult 

183 error: str | None = None 

184 match result: 

185 case AppResult(): 

186 match result.outcome: 

187 case Outcome.ERROR: 

188 error = f"Error processing {item_type} {doc_id}" 

189 main_result.merge(result) 

190 case Outcome.SKIPPED | Outcome.WARNING: 

191 skipped_ct += 1 

192 case Outcome.OK: 

193 processed.append( 

194 CronResultEntry( 

195 doc_id, Outcome.OK, f"Processed {item_type} {doc_id}" 

196 ) 

197 ) 

198 passed_ct += 1 

199 case JobResult(): 

200 match result.app_result.outcome: 

201 case Outcome.ERROR: 

202 error = f"Error processing {item_type} {doc_id}" 

203 main_result.merge(result.app_result) 

204 case Outcome.SKIPPED | Outcome.WARNING: 

205 skipped_ct += 1 

206 case Outcome.OK: 

207 processed.append( 

208 CronResultEntry( 

209 doc_id, Outcome.OK, f"Processed {item_type} {doc_id}" 

210 ) 

211 ) 

212 passed_ct += 1 

213 case Outcome(): 

214 match result: 

215 case Outcome.ERROR: 

216 error = f"Error processing {item_type} {doc_id}" 

217 main_result.add_error(config.error_type, error) 

218 case Outcome.SKIPPED | Outcome.WARNING: 

219 skipped_ct += 1 

220 case Outcome.OK: 

221 processed.append( 

222 CronResultEntry( 

223 doc_id, Outcome.OK, f"Processed {item_type} {doc_id}" 

224 ) 

225 ) 

226 passed_ct += 1 

227 

228 if error is not None: 

229 failed_ct += 1 

230 LOG().error(error) 

231 processed.append(CronResultEntry(doc_id, Outcome.ERROR, error)) 

232 

233 except Exception as item_ex: 

234 doc_id = getattr(item, "doc_id", NO_DOC_ID) or NO_DOC_ID 

235 msg = f"Exception processing {item_type} {doc_id}: {item_ex}" 

236 LOG().error(msg) 

237 main_result.add_error(config.error_type, msg) 

238 processed.append(CronResultEntry(doc_id, Outcome.ERROR, msg)) 

239 failed_ct += 1 

240 processing_error = True 

241 

242 except Exception as e: 

243 msg = f"Exception during {config.job_name} processing: {e}" 

244 LOG().error(msg) 

245 main_result.add_error(config.error_type, msg) 

246 processing_error = True 

247 

248 # Call report function if provided 

249 if config.report_fn is not None: 

250 try: 

251 config.report_fn(config.job_type, processed) 

252 except Exception as report_ex: 

253 LOG().warning(f"Failed to generate report for {config.job_name}: {report_ex}") 

254 

255 end = TimeUtil.get_current_utc_dt() 

256 duration = TimeUtil.duration_in_seconds(start, end) 

257 

258 msg = ( 

259 f"{config.job_name} completed: {passed_ct} passed, " 

260 f"{failed_ct} failed, {skipped_ct} skipped." 

261 ) 

262 LOG().info(msg) 

263 

264 if not processing_error and not main_result.is_error: 

265 LOG().info(f"Cron job {config.job_name} completed successfully.") 

266 # we have a least one result so return counts 

267 return CountMetric( 

268 success_ct=passed_ct, 

269 failed_ct=failed_ct, 

270 skipped_ct=skipped_ct, 

271 duration=duration, 

272 ) 

273 

274 # error occurred during processing items 

275 LOG().error(f"Cron job {config.job_name} completed with errors\n{main_result.formatted}") 

276 self.log_creator.system_error( 

277 job_type=config.job_type, 

278 message=f"Cron job {config.job_name} encountered errors.", 

279 error_code=config.error_type, 

280 result=main_result, 

281 ) 

282 return JobResult.from_result( 

283 main_result, 

284 duration=duration, 

285 data={ 

286 "success_ct": passed_ct, 

287 "failed_ct": failed_ct, 

288 "skipped_ct": skipped_ct, 

289 }, 

290 )