Coverage for functions \ flipdare \ service \ core \ step_processor.py: 81%

113 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13""" 

14Generic step-based processing with automatic state tracking and resume capability. 

15Works with PersistedWrapper instances that have processing state fields. 

16""" 

17 

18from collections.abc import Callable 

19from dataclasses import dataclass 

20from enum import Enum 

21from typing import Any, Final 

22 

23from flipdare.app_log import LOG 

24from flipdare.constants import IS_DEBUG 

25from flipdare.result.app_result import AppResult 

26from flipdare.generated.shared.app_error_code import AppErrorCode 

27from flipdare.wrapper import PersistedWrapper 

28 

29_PROCESSED_KEY: Final = "processed" 

30_ERROR_CT_KEY: Final = "error_count" 

31 

32REQUIRED_FIELDS: Final = [_PROCESSED_KEY, _ERROR_CT_KEY, "_model"] 

33 

34 

35@dataclass 

36class ProcessingStep[K: Enum, T: PersistedWrapper[Any]]: 

37 """ 

38 Defines a single processing step with its state key and handler. 

39 

40 Type Parameters: 

41 K: String literal type for the state key field name 

42 T: The specific PersistedWrapper type (e.g., ContentWrapper) 

43 """ 

44 

45 state_key: K 

46 handler: Callable[[T], AppResult[Any]] # Handler receives the specific wrapper type 

47 description: str = "" 

48 required: bool = True 

49 

50 

51class StepProcessor[T: PersistedWrapper[Any]]: 

52 """ 

53 Generic processor for multi-step tasks with automatic state tracking. 

54 

55 Features: 

56 - Automatically skips completed steps 

57 - Saves state after each successful step 

58 - Returns detailed AppResult for error handling 

59 - Resumes from last successful checkpoint on retry 

60 

61 Type Parameters: 

62 T: PersistedWrapper[Any] - The wrapper type being processed 

63 

64 Usage: 

65 processor = StepProcessor( 

66 wrapper=invite_model, 

67 steps=[ 

68 ProcessingStep('user_created', lambda m: self._create_user(m), 'Create user'), 

69 ProcessingStep('email_sent', lambda m: self._send_email(m), 'Send email'), 

70 ], 

71 save_handler=lambda m: self.invite_bridge.update(m), 

72 process_name='process_invite' 

73 ) 

74 result = processor.execute() 

75 """ 

76 

77 def __init__( 

78 self, 

79 wrapper: T, 

80 steps: list[ProcessingStep[Any, T]], 

81 save_handler: Callable[[T], AppResult[T]], 

82 process_name: str = "step_processor", 

83 save_after_each_step: bool = True, 

84 mark_complete_when_done: bool = True, 

85 ) -> None: 

86 self.wrapper = wrapper 

87 self.steps = steps 

88 self.save_handler = save_handler 

89 self.process_name = process_name 

90 self.save_after_each_step = save_after_each_step 

91 self.mark_complete = mark_complete_when_done 

92 doc_id = wrapper.doc_id 

93 self.result = AppResult[T](doc_id=doc_id, task_name=process_name) 

94 

95 # Verify wrapper has required state fields 

96 name = type(wrapper).__name__ 

97 for field in REQUIRED_FIELDS: 

98 if not hasattr(wrapper, field): 

99 raise ValueError(f"Wrapper {name} must have '{field}' field") 

100 

101 def execute(self) -> AppResult[T]: # noqa: PLR0912 

102 """ 

103 Execute all steps in sequence, skipping completed ones. 

104 Automatically saves state and handles errors. 

105 

106 Returns: 

107 AppResult with success/error status and the final PersistedWrapper model 

108 

109 """ 

110 if self._is_processing_complete(): 

111 msg = f"{self.process_name}: Already complete, skipping" 

112 if IS_DEBUG: 

113 LOG().debug(msg) 

114 

115 self.result.set_skipped(msg) 

116 # T is already bound to PersistedWrapper[Any], so just assign directly 

117 self.result.generated = self.wrapper 

118 return self.result 

119 

120 for step in self.steps: 

121 # Check if step already completed 

122 if self._is_step_complete(step): 

123 if IS_DEBUG: 

124 msg = ( 

125 f"{self.process_name}: Step '{step.state_key}' already complete, skipping" 

126 ) 

127 LOG().debug(msg) 

128 continue 

129 

130 # Execute step 

131 if IS_DEBUG: 

132 msg = ( 

133 f"{self.process_name}: Executing step '{step.state_key}' - {step.description}" 

134 ) 

135 LOG().debug(msg) 

136 

137 step_result = step.handler(self.wrapper) # Pass current wrapper to handler 

138 

139 # Handle step result 

140 if step_result.is_error: 

141 self._increment_error_count() 

142 self.result.merge(step_result) 

143 if step.required: 

144 # Required step failed, save progress and stop 

145 self._save_progress() 

146 return self.result 

147 # Optional step failed, log and continue 

148 msg = f"{self.process_name}: Optional step '{step.state_key}' failed, continuing" 

149 LOG().warning(msg) 

150 continue 

151 

152 # Mark step complete 

153 self._mark_step_complete(step.state_key, success=True) 

154 

155 # Save progress after each step 

156 if self.save_after_each_step: 

157 save_result = self._save_progress() 

158 if save_result.is_error: 

159 self.result.merge(save_result) 

160 return self.result 

161 

162 # All steps completed successfully 

163 if self.mark_complete and not self._is_processing_complete(): 

164 self._mark_processed() 

165 final_save = self._save_progress() 

166 if final_save.is_error: 

167 self.result.merge(final_save) 

168 return self.result 

169 

170 # T is already bound to PersistedWrapper[Any], so just assign directly 

171 self.result.generated = self.wrapper 

172 

173 if IS_DEBUG: 

174 LOG().debug(f"{self.process_name}: All steps completed successfully") 

175 

176 return self.result 

177 

178 def _is_processing_complete(self) -> bool: 

179 """Check if processing is complete by checking the processed field.""" 

180 return getattr(self.wrapper, _PROCESSED_KEY, False) 

181 

182 def _mark_processed(self) -> None: 

183 """Mark processing as complete.""" 

184 self.wrapper.update_field(_PROCESSED_KEY, True) 

185 

186 def _increment_error_count(self) -> None: 

187 """Increment the error count.""" 

188 current = getattr(self.wrapper, _ERROR_CT_KEY, 0) 

189 self.wrapper.update_field(_ERROR_CT_KEY, current + 1) 

190 

191 def _is_step_complete(self, step: ProcessingStep[Any, Any]) -> bool: 

192 """Check if a step has already been completed.""" 

193 key = step.state_key.value 

194 try: 

195 return getattr(self.wrapper, key, False) 

196 except AttributeError: 

197 msg = f"{self.process_name}: State key '{key}' not found on wrapper" 

198 LOG().warning(msg) 

199 return False 

200 

201 def _mark_step_complete(self, state_key: Enum, success: bool = True) -> None: 

202 """Mark a step as complete in the wrapper's state.""" 

203 try: 

204 self.wrapper.update_field(state_key.value, success) 

205 except Exception as e: 

206 LOG().error(f"{self.process_name}: Failed to mark step complete: {e}") 

207 

208 def _save_progress(self) -> AppResult[T]: 

209 """Persist current state using the save handler.""" 

210 try: 

211 save_result = self.save_handler(self.wrapper) 

212 if save_result.is_error: 

213 msg = f"{self.process_name}: Failed to save progress\n{save_result.formatted}" 

214 LOG().error(msg) 

215 # Update wrapper with saved version if available 

216 elif save_result.generated: 

217 # save_result.generated is already PersistedWrapper[T], use it directly 

218 self.wrapper = save_result.generated 

219 return save_result 

220 except Exception as e: 

221 error_result = AppResult[T](task_name=f"{self.process_name}_save") 

222 error_result.add_error( 

223 error_code=AppErrorCode.DATABASE_EX, 

224 message=f"Exception saving progress: {e}", 

225 ) 

226 return error_result 

227 

228 

229class SimpleStepProcessor[T: PersistedWrapper[Any]](StepProcessor[T]): 

230 """ 

231 Simplified processor that doesn't auto-save after each step. 

232 Use when you want manual control over when state is persisted. 

233 """ 

234 

235 def __init__( 

236 self, 

237 model: T, 

238 steps: list[ProcessingStep[Any, T]], 

239 save_handler: Callable[[T], AppResult[T]], 

240 process_name: str = "simple_step_processor", 

241 ) -> None: 

242 super().__init__( 

243 wrapper=model, 

244 steps=steps, 

245 save_handler=save_handler, 

246 process_name=process_name, 

247 save_after_each_step=False, 

248 mark_complete_when_done=False, 

249 ) 

250 

251 def save(self) -> AppResult[T]: 

252 """Manually save progress when needed.""" 

253 return self._save_progress()