Coverage for functions \ flipdare \ service \ core \ step_processor.py: 81%
113 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13"""
14Generic step-based processing with automatic state tracking and resume capability.
15Works with PersistedWrapper instances that have processing state fields.
16"""
18from collections.abc import Callable
19from dataclasses import dataclass
20from enum import Enum
21from typing import Any, Final
23from flipdare.app_log import LOG
24from flipdare.constants import IS_DEBUG
25from flipdare.result.app_result import AppResult
26from flipdare.generated.shared.app_error_code import AppErrorCode
27from flipdare.wrapper import PersistedWrapper
29_PROCESSED_KEY: Final = "processed"
30_ERROR_CT_KEY: Final = "error_count"
32REQUIRED_FIELDS: Final = [_PROCESSED_KEY, _ERROR_CT_KEY, "_model"]
35@dataclass
36class ProcessingStep[K: Enum, T: PersistedWrapper[Any]]:
37 """
38 Defines a single processing step with its state key and handler.
40 Type Parameters:
41 K: String literal type for the state key field name
42 T: The specific PersistedWrapper type (e.g., ContentWrapper)
43 """
45 state_key: K
46 handler: Callable[[T], AppResult[Any]] # Handler receives the specific wrapper type
47 description: str = ""
48 required: bool = True
51class StepProcessor[T: PersistedWrapper[Any]]:
52 """
53 Generic processor for multi-step tasks with automatic state tracking.
55 Features:
56 - Automatically skips completed steps
57 - Saves state after each successful step
58 - Returns detailed AppResult for error handling
59 - Resumes from last successful checkpoint on retry
61 Type Parameters:
62 T: PersistedWrapper[Any] - The wrapper type being processed
64 Usage:
65 processor = StepProcessor(
66 wrapper=invite_model,
67 steps=[
68 ProcessingStep('user_created', lambda m: self._create_user(m), 'Create user'),
69 ProcessingStep('email_sent', lambda m: self._send_email(m), 'Send email'),
70 ],
71 save_handler=lambda m: self.invite_bridge.update(m),
72 process_name='process_invite'
73 )
74 result = processor.execute()
75 """
77 def __init__(
78 self,
79 wrapper: T,
80 steps: list[ProcessingStep[Any, T]],
81 save_handler: Callable[[T], AppResult[T]],
82 process_name: str = "step_processor",
83 save_after_each_step: bool = True,
84 mark_complete_when_done: bool = True,
85 ) -> None:
86 self.wrapper = wrapper
87 self.steps = steps
88 self.save_handler = save_handler
89 self.process_name = process_name
90 self.save_after_each_step = save_after_each_step
91 self.mark_complete = mark_complete_when_done
92 doc_id = wrapper.doc_id
93 self.result = AppResult[T](doc_id=doc_id, task_name=process_name)
95 # Verify wrapper has required state fields
96 name = type(wrapper).__name__
97 for field in REQUIRED_FIELDS:
98 if not hasattr(wrapper, field):
99 raise ValueError(f"Wrapper {name} must have '{field}' field")
101 def execute(self) -> AppResult[T]: # noqa: PLR0912
102 """
103 Execute all steps in sequence, skipping completed ones.
104 Automatically saves state and handles errors.
106 Returns:
107 AppResult with success/error status and the final PersistedWrapper model
109 """
110 if self._is_processing_complete():
111 msg = f"{self.process_name}: Already complete, skipping"
112 if IS_DEBUG:
113 LOG().debug(msg)
115 self.result.set_skipped(msg)
116 # T is already bound to PersistedWrapper[Any], so just assign directly
117 self.result.generated = self.wrapper
118 return self.result
120 for step in self.steps:
121 # Check if step already completed
122 if self._is_step_complete(step):
123 if IS_DEBUG:
124 msg = (
125 f"{self.process_name}: Step '{step.state_key}' already complete, skipping"
126 )
127 LOG().debug(msg)
128 continue
130 # Execute step
131 if IS_DEBUG:
132 msg = (
133 f"{self.process_name}: Executing step '{step.state_key}' - {step.description}"
134 )
135 LOG().debug(msg)
137 step_result = step.handler(self.wrapper) # Pass current wrapper to handler
139 # Handle step result
140 if step_result.is_error:
141 self._increment_error_count()
142 self.result.merge(step_result)
143 if step.required:
144 # Required step failed, save progress and stop
145 self._save_progress()
146 return self.result
147 # Optional step failed, log and continue
148 msg = f"{self.process_name}: Optional step '{step.state_key}' failed, continuing"
149 LOG().warning(msg)
150 continue
152 # Mark step complete
153 self._mark_step_complete(step.state_key, success=True)
155 # Save progress after each step
156 if self.save_after_each_step:
157 save_result = self._save_progress()
158 if save_result.is_error:
159 self.result.merge(save_result)
160 return self.result
162 # All steps completed successfully
163 if self.mark_complete and not self._is_processing_complete():
164 self._mark_processed()
165 final_save = self._save_progress()
166 if final_save.is_error:
167 self.result.merge(final_save)
168 return self.result
170 # T is already bound to PersistedWrapper[Any], so just assign directly
171 self.result.generated = self.wrapper
173 if IS_DEBUG:
174 LOG().debug(f"{self.process_name}: All steps completed successfully")
176 return self.result
178 def _is_processing_complete(self) -> bool:
179 """Check if processing is complete by checking the processed field."""
180 return getattr(self.wrapper, _PROCESSED_KEY, False)
182 def _mark_processed(self) -> None:
183 """Mark processing as complete."""
184 self.wrapper.update_field(_PROCESSED_KEY, True)
186 def _increment_error_count(self) -> None:
187 """Increment the error count."""
188 current = getattr(self.wrapper, _ERROR_CT_KEY, 0)
189 self.wrapper.update_field(_ERROR_CT_KEY, current + 1)
191 def _is_step_complete(self, step: ProcessingStep[Any, Any]) -> bool:
192 """Check if a step has already been completed."""
193 key = step.state_key.value
194 try:
195 return getattr(self.wrapper, key, False)
196 except AttributeError:
197 msg = f"{self.process_name}: State key '{key}' not found on wrapper"
198 LOG().warning(msg)
199 return False
201 def _mark_step_complete(self, state_key: Enum, success: bool = True) -> None:
202 """Mark a step as complete in the wrapper's state."""
203 try:
204 self.wrapper.update_field(state_key.value, success)
205 except Exception as e:
206 LOG().error(f"{self.process_name}: Failed to mark step complete: {e}")
208 def _save_progress(self) -> AppResult[T]:
209 """Persist current state using the save handler."""
210 try:
211 save_result = self.save_handler(self.wrapper)
212 if save_result.is_error:
213 msg = f"{self.process_name}: Failed to save progress\n{save_result.formatted}"
214 LOG().error(msg)
215 # Update wrapper with saved version if available
216 elif save_result.generated:
217 # save_result.generated is already PersistedWrapper[T], use it directly
218 self.wrapper = save_result.generated
219 return save_result
220 except Exception as e:
221 error_result = AppResult[T](task_name=f"{self.process_name}_save")
222 error_result.add_error(
223 error_code=AppErrorCode.DATABASE_EX,
224 message=f"Exception saving progress: {e}",
225 )
226 return error_result
229class SimpleStepProcessor[T: PersistedWrapper[Any]](StepProcessor[T]):
230 """
231 Simplified processor that doesn't auto-save after each step.
232 Use when you want manual control over when state is persisted.
233 """
235 def __init__(
236 self,
237 model: T,
238 steps: list[ProcessingStep[Any, T]],
239 save_handler: Callable[[T], AppResult[T]],
240 process_name: str = "simple_step_processor",
241 ) -> None:
242 super().__init__(
243 wrapper=model,
244 steps=steps,
245 save_handler=save_handler,
246 process_name=process_name,
247 save_after_each_step=False,
248 mark_complete_when_done=False,
249 )
251 def save(self) -> AppResult[T]:
252 """Manually save progress when needed."""
253 return self._save_progress()