Cron processing refactor
The CronProcessor is a generic utility that eliminates code duplication across cron job methods in admin and safety controller classes. It provides a standardized approach to processing collections of database items with consistent error handling, logging, and performance tracking.
Before this refactoring, cron methods across the codebase followed a repetitive pattern:
@perf_decorator(job_type=AppJobType.CR_INVITE_UNPROCESSED)
def cron_invite_unprocessed(self) -> LogPerfResult:
    main_result = AppResult('cron_invite_unprocessed')
    passed_ct = 0
    failed_ct = 0
    skipped_ct = 0
    try:
        invites = self.invite_db.get_unprocessed_invites_last_week()
        LOG().info(f"Found {len(invites)} unprocessed invites...")
        for invite in invites:
            value = self._invite_processor.process_invite_signup(invite)
            if value.is_error:
                doc_id = invite.doc_id or NO_DOC_ID
                msg = f"Error processing invite {doc_id}"
                main_result.add_error(AppErrorType.DATABASE_EX, msg)
                failed_ct += 1
            elif value.is_skipped:
                skipped_ct += 1
            else:
                passed_ct += 1
    except Exception as e:
        msg = f"Exception during cron_invite processing: {e}"
        LOG().error(msg)
        main_result.add_error(AppErrorType.DATABASE_EX, msg)
    LOG().info(f"cron_invite completed: {passed_ct} passed, {failed_ct} failed...")
    if main_result.is_error:
        return LogPerfResult(BinaryPerfResult.error()).error(main_result)
    else:
        return LogPerfResult(BinaryPerfResult(passed_ct=passed_ct, ...))
This pattern was repeated across:
- friend_admin.py: cron_invite_unprocessed, cron_friend_unprocessed
- group_admin.py: cron_group, cron_group_member_unprocessed, cron_group_member_status_unprocessed
- user_admin.py: cron_user_unprocessed
- restriction_controller.py: cron_inactive_restrictions, cron_expired_restrictions
Issues:
- ~200 lines of duplicate boilerplate code
- Inconsistent error messages and logging
- Testing requires duplicating tests for each method
- Bug fixes/improvements need changes in multiple locations
- Hard to maintain consistency across all cron jobs
The CronProcessor class extracts the common pattern into a reusable utility:
config = CronConfig(
    job_type=AppJobType.CR_INVITE_UNPROCESSED,
    job_name="cron_invite_unprocessed",
    query_fn=lambda: self.invite_db.get_unprocessed_invites_last_week(),
    process_fn=lambda invite: self._invite_processor.process_invite_signup(invite)
)
return CronProcessor.process(config)
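The loop that CronProcessor.process factors out can be sketched as below. This is a minimal, self-contained approximation, not the project's implementation: ResultValue, CronSummary and the trimmed-down CronConfig are hypothetical stand-ins, and logging, AppResult error objects and LogPerfResult are left out.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class ResultValue(Enum):
    """Hypothetical stand-in for the project's ResultValue."""
    OK = "ok"
    SKIPPED = "skipped"
    ERROR = "error"

    @property
    def is_error(self) -> bool:
        return self is ResultValue.ERROR

    @property
    def is_skipped(self) -> bool:
        return self is ResultValue.SKIPPED


@dataclass
class CronSummary:
    """Hypothetical stand-in for the counters carried by LogPerfResult."""
    passed_ct: int = 0
    failed_ct: int = 0
    skipped_ct: int = 0
    errors: list[str] = field(default_factory=list)


@dataclass
class CronConfig(Generic[T]):
    """Trimmed to the three fields the loop itself needs."""
    job_name: str
    query_fn: Callable[[], list[T]]
    process_fn: Callable[[T], ResultValue]


class CronProcessor:
    @staticmethod
    def process(config: CronConfig[T]) -> CronSummary:
        summary = CronSummary()
        try:
            for item in config.query_fn():
                value = config.process_fn(item)
                if value.is_error:
                    summary.failed_ct += 1
                    summary.errors.append(f"Error processing item in {config.job_name}")
                elif value.is_skipped:
                    summary.skipped_ct += 1
                else:
                    summary.passed_ct += 1
        except Exception as e:
            # As in the original pattern, one exception ends the whole run.
            summary.errors.append(f"Exception during {config.job_name} processing: {e}")
        return summary
```

For a query returning four items whose outcomes are OK, SKIPPED, OK and ERROR, the summary holds 2 passed, 1 skipped and 1 failed. Keeping the try/except around the whole loop mirrors the original methods; wrapping each item in its own try/except would be an alternative design that lets later items run after one fails.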
CronConfig takes the following parameters; the last three are optional:
CronConfig(
    job_type: AppJobType,                 # For perf decorator and reporting
    job_name: str,                        # For logging
    query_fn: Callable[[], list[T]],      # Fetches items to process
    process_fn: Callable[[T], Union[ResultValue, AppResult]],  # Processes each item
    error_type: AppErrorType = AppErrorType.DATABASE_EX,       # Default error type
    report_fn: Optional[Callable] = None, # Optional reporting callback
    skip_empty_check: bool = False        # If True, don't return early when the query is empty
)
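Written out as a dataclass, the configuration might look like the sketch below. Treating CronConfig as a frozen dataclass is an assumption, as is the reading of skip_empty_check (an empty query result ends the job early unless the flag is set; one possible motivation is letting a report still run on days with nothing to process). AppErrorType is a stand-in holding only the two members this document mentions, job_type is typed as a plain string, and returns_early is a hypothetical helper.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class AppErrorType(Enum):
    """Hypothetical stand-in: only the members named in this document."""
    DATABASE_EX = "database_ex"
    MODERATION = "moderation"


@dataclass(frozen=True)
class CronConfig(Generic[T]):
    job_type: str                        # stand-in for AppJobType
    job_name: str
    query_fn: Callable[[], list[T]]
    process_fn: Callable[[T], Any]       # ResultValue or AppResult in the real code
    error_type: AppErrorType = AppErrorType.DATABASE_EX
    report_fn: Optional[Callable[..., None]] = None
    skip_empty_check: bool = False


def returns_early(config: CronConfig[Any], items: list[Any]) -> bool:
    """Assumed semantics: an empty query ends the job unless skip_empty_check is set."""
    return not items and not config.skip_empty_check
```

Because only the first four fields lack defaults, a typical job passes just those four and overrides the rest by keyword when needed.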
Code reduction across the 8 cron methods:
Before: ~40 lines per method × 8 methods = ~320 lines
After: ~8 lines per method × 8 methods = ~64 lines
Savings: ~256 lines (80% reduction)
All cron jobs now have:
- Standardized exception handling
- Consistent logging format
- Uniform performance tracking
- Predictable error reporting
Before: each cron method required its own test suite:
- Test success cases
- Test failure cases
- Test skip cases
- Test exception handling
- Test counter tracking
- Test logging
After: Test CronProcessor once, then only test business logic:
# One comprehensive test suite for CronProcessor: test/unit/test_cron_processor.py
# Individual admin tests only need to verify:
# - Correct query function
# - Correct process function
# No need to test error handling, counting, logging, etc.
Changes to the cron pattern (e.g., adding metrics, improving logging) only require modifying one class.
A generic type parameter lets a static type checker confirm that process_fn accepts the item type that query_fn returns:
T = TypeVar("T", bound=DatabaseModel)

class CronConfig(Generic[T]):
    ...
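To illustrate what the bound buys, here is a small sketch with hypothetical stand-ins for DatabaseModel, Invite and User, and a CronConfig trimmed to its two callables. The check happens only under a static checker such as mypy; Python itself does not enforce it at runtime.

```python
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar


class DatabaseModel:
    """Hypothetical stand-in for the project's DatabaseModel base class."""

    def __init__(self, doc_id: str) -> None:
        self.doc_id = doc_id


class Invite(DatabaseModel):
    pass


class User(DatabaseModel):
    pass


T = TypeVar("T", bound=DatabaseModel)


@dataclass
class CronConfig(Generic[T]):
    query_fn: Callable[[], list[T]]
    process_fn: Callable[[T], bool]


def process_invite(invite: Invite) -> bool:
    return invite.doc_id.startswith("inv-")


invite_config: CronConfig[Invite] = CronConfig(
    query_fn=lambda: [Invite("inv-1"), Invite("inv-2")],
    process_fn=process_invite,
)

# A static checker should flag the following line: process_invite accepts
# Invite, so it cannot process the User items this query returns.
# user_config: CronConfig[User] = CronConfig(query_fn=lambda: [User("u-1")], process_fn=process_invite)
```

The bound also rules out configs over non-model types, such as CronConfig[int], at type-check time.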
Example: friend_admin.py (cron_invite_unprocessed)
Before (~40 lines):
@perf_decorator(job_type=AppJobType.CR_INVITE_UNPROCESSED)
def cron_invite_unprocessed(self) -> LogPerfResult:
    main_result = AppResult('cron_invite_unprocessed')
    passed_ct = 0
    failed_ct = 0
    skipped_ct = 0
    # ... 30+ more lines
After (~8 lines):
def cron_invite_unprocessed(self) -> LogPerfResult:
    config = CronConfig(
        job_type=AppJobType.CR_INVITE_UNPROCESSED,
        job_name="cron_invite_unprocessed",
        query_fn=lambda: self.invite_db.get_unprocessed_invites_last_week(),
        process_fn=lambda invite: self._invite_processor.process_invite_signup(invite)
    )
    return CronProcessor.process(config)
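The After version no longer carries @perf_decorator. Since job_type is documented as feeding the perf decorator, presumably CronProcessor.process applies it itself. A minimal sketch of applying a decorator factory at call time, assuming a hypothetical timing-only perf_decorator (the project's real decorator and LogPerfResult are not reproduced; timings and run_with_perf are illustrative names):

```python
import functools
import time
from typing import Any, Callable, Dict

# Hypothetical sink for timings; a real perf decorator would report elsewhere.
timings: Dict[str, float] = {}


def perf_decorator(job_type: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical stand-in that only records wall-clock duration per job type."""
    def decorate(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                timings[job_type] = time.perf_counter() - start
        return wrapper
    return decorate


def run_with_perf(job_type: str, body: Callable[[], Any]) -> Any:
    # Same effect as decorating `body` with @perf_decorator(job_type=...) and
    # calling it, except the job type is chosen at runtime from the config.
    return perf_decorator(job_type=job_type)(body)()
```

Applying the decorator inside the processor means each cron method no longer has to repeat its own job type in a decorator line; the value lives only in the config.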
Example: group_admin.py (cron_group, cron_group_member_unprocessed, cron_group_member_status_unprocessed)
Before: ~38 lines × 3 methods = ~114 lines
After: ~8 lines × 3 methods = ~24 lines. All three methods use the same pattern and differ only in their configuration.
Example: user_admin.py (cron_user_unprocessed)
Before (~36 lines, with custom filtering inside the loop):
users = self.user_db.get_recent_unprocessed()
for user in users:
    if user.processing_complete:
        skipped_ct += 1
        continue
    # ... process
After (~8 lines, with the filtering moved into the query). Because completed users are filtered out before processing, they no longer appear in the skipped count:
config = CronConfig(
    job_type=AppJobType.CR_USER_UNPROCESSED,
    job_name="cron_user_unprocessed",
    query_fn=lambda: [
        u for u in self.user_db.get_recent_unprocessed() if not u.processing_complete
    ],
    process_fn=lambda user: self.user_processor.process_user(user, is_update=False)
)
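If already-complete users should still show up as skipped, one alternative (an assumption, not part of this refactor) is to keep the check in process_fn through a small wrapper. ResultValue and User below are hypothetical stand-ins, and skip_when is an illustrative helper name.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, TypeVar

T = TypeVar("T")


class ResultValue(Enum):
    """Hypothetical stand-in for the project's ResultValue."""
    OK = "ok"
    SKIPPED = "skipped"


@dataclass
class User:
    """Hypothetical stand-in for the user model."""
    doc_id: str
    processing_complete: bool


def skip_when(
    predicate: Callable[[T], bool],
    process_fn: Callable[[T], ResultValue],
) -> Callable[[T], ResultValue]:
    """Wrap process_fn so items matching predicate are reported as SKIPPED."""
    def wrapped(item: T) -> ResultValue:
        if predicate(item):
            return ResultValue.SKIPPED
        return process_fn(item)
    return wrapped


# In the cron config this would wrap the real processor, e.g.:
# process_fn=skip_when(lambda u: u.processing_complete,
#                      lambda u: self.user_processor.process_user(u, is_update=False))
process_user = skip_when(lambda u: u.processing_complete, lambda u: ResultValue.OK)
outcomes = [process_user(u) for u in [User("a", False), User("b", True), User("c", False)]]
```

The trade-off: filtering in query_fn keeps process_fn focused on real work, while the wrapper keeps the skipped counter meaningful in reports.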
Jobs can override the default error type:
config = CronConfig(
    # ...
    error_type=AppErrorType.MODERATION  # Override default
)
Jobs can also supply a reporting callback, which receives the job type plus ok_ids and error_ids:
config = CronConfig(
    # ...
    report_fn=lambda job_type, ok_ids, error_ids:
        self.system_report.run_report(job_type, ok_ids, error_ids)
)
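The contract of report_fn is not spelled out beyond its three arguments. A minimal sketch of how a processor might collect the IDs it passes along, assuming IDs come from doc_id (falling back to a placeholder, as the original loop does with NO_DOC_ID) and that skipped items land in neither list. Outcome, Item, run_and_report and the placeholder value are hypothetical.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, List, Optional, Tuple

NO_DOC_ID = "<no-doc-id>"  # hypothetical value; the real constant's value is not shown


class Outcome(Enum):
    OK = "ok"
    SKIPPED = "skipped"
    ERROR = "error"


@dataclass
class Item:
    doc_id: Optional[str]


ReportFn = Callable[[str, List[str], List[str]], None]


def run_and_report(
    job_type: str,
    items: List[Item],
    process_fn: Callable[[Item], Outcome],
    report_fn: Optional[ReportFn] = None,
) -> Tuple[List[str], List[str]]:
    ok_ids: List[str] = []
    error_ids: List[str] = []
    for item in items:
        outcome = process_fn(item)
        doc_id = item.doc_id or NO_DOC_ID
        if outcome is Outcome.OK:
            ok_ids.append(doc_id)
        elif outcome is Outcome.ERROR:
            error_ids.append(doc_id)
        # Skipped items are reported in neither list (an assumption).
    if report_fn is not None:
        report_fn(job_type, ok_ids, error_ids)
    return ok_ids, error_ids
```

Calling report_fn once after the loop, rather than per item, keeps the reporting side to a single call per cron run.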
The processor handles both ResultValue and AppResult returns:
def process_item(item):
    result = AppResult("process")
    # ... complex processing with detailed error tracking
    return result  # CronProcessor handles this automatically
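One way to handle both return types is to normalize each result onto the passed/skipped/failed counters. The sketch below uses hypothetical minimal stand-ins for ResultValue and AppResult, and assumes an AppResult has no "skipped" state, so it counts as either passed or failed.

```python
from enum import Enum
from typing import List, Union


class ResultValue(Enum):
    """Hypothetical stand-in for the project's ResultValue."""
    OK = "ok"
    SKIPPED = "skipped"
    ERROR = "error"


class AppResult:
    """Hypothetical minimal stand-in: collects errors and exposes is_error."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.errors: List[str] = []

    def add_error(self, error_type: str, msg: str) -> None:
        self.errors.append(f"{error_type}: {msg}")

    @property
    def is_error(self) -> bool:
        return bool(self.errors)


def classify(result: Union[ResultValue, AppResult]) -> str:
    """Map either return type onto the passed/skipped/failed counters."""
    if isinstance(result, AppResult):
        # Assumption: AppResult has no skipped state.
        return "failed" if result.is_error else "passed"
    if result is ResultValue.ERROR:
        return "failed"
    if result is ResultValue.SKIPPED:
        return "skipped"
    return "passed"
```

Normalizing in one place lets the counting loop stay identical regardless of which type a given process_fn returns.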
Tests for CronProcessor itself (test/unit/test_cron_processor.py):
- Test all success scenarios
- Test all failure scenarios
- Test all skip scenarios
- Test exception handling
- Test counter tracking
- Test reporting integration
- Test empty list handling
- Test query exceptions
Tests for the individual admin classes focus only on business logic:
from unittest.mock import Mock
# FriendAdmin and ResultValue come from the project's own modules (imports omitted).

def test_friend_admin_cron_invite():
    """Test that cron_invite_unprocessed uses correct query and processor."""
    admin = FriendAdmin()
    # Mock the database query
    mock_invites = [Mock(), Mock()]
    admin.invite_db.get_unprocessed_invites_last_week = Mock(return_value=mock_invites)
    # Mock the processor
    admin._invite_processor.process_invite_signup = Mock(return_value=ResultValue.OK)
    # Execute
    admin.cron_invite_unprocessed()
    # Verify correct wiring (no need to test error handling, counting, etc.)
    admin.invite_db.get_unprocessed_invites_last_week.assert_called_once()
    assert admin._invite_processor.process_invite_signup.call_count == 2
Potential improvements to consider:
- Batch Processing: Add support for processing items in batches
- Progress Callbacks: Add progress reporting for long-running jobs
- Retry Logic: Add configurable retry for transient failures
- Rate Limiting: Add throttling for external API calls
- Parallel Processing: Add optional parallel processing for independent items
- Dry Run Mode: Add mode to simulate processing without side effects
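Of these ideas, retry logic can be prototyped without touching CronProcessor by wrapping process_fn before it goes into the config. A minimal sketch, assuming transient failures surface as exceptions (ConnectionError and TimeoutError here, an arbitrary choice) rather than as error results; with_retry is a hypothetical helper.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def with_retry(
    fn: Callable[[T], R],
    attempts: int = 3,
    delay_s: float = 0.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> Callable[[T], R]:
    """Return a wrapper that retries fn when it raises one of retry_on."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")

    def wrapped(item: T) -> R:
        for attempt in range(1, attempts + 1):
            try:
                return fn(item)
            except retry_on:
                if attempt == attempts:
                    raise  # out of attempts: let the caller's handler see it
                time.sleep(delay_s * attempt)  # linear backoff between attempts
        raise RuntimeError("unreachable")

    return wrapped
```

Used as process_fn=with_retry(...), the final failure is re-raised, so it still reaches the processor's standard exception handling; exceptions outside retry_on are never retried.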
The CronProcessor refactoring:
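- Reduces cron method code by ~80% (~320 to ~64 lines)
- Improves consistency across all cron jobs
- Simplifies testing: error handling, counting, and logging are tested once in the CronProcessor suite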
- Enhances maintainability
- Maintains flexibility for special cases
This is a significant improvement in code quality and maintainability without sacrificing any functionality.