Coverage for functions \ flipdare \ service \ flag_service.py: 26%
602 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from __future__ import annotations
15from typing import TYPE_CHECKING, Any
16from flipdare.core.flag_code import FlagCode
17from flipdare.app_log import LOG
18from flipdare.app_types import DatabaseDict
19from flipdare.constants import (
20 ADMIN_DOC_ID,
21 IS_DEBUG,
22 RESTRICTION_MAX_WAIT_ADMIN_ACK_FLAG,
23 RESTRICTION_MAX_WAIT_USER_RESPONSE_FLAG,
24 RESTRICTION_REMINDER_USER_RESPONSE_FLAG,
25)
26from flipdare.result.app_result import AppResult
27from flipdare.core.cron_decorator import cron_decorator
28from flipdare.core.job_type_decorator import job_type_decorator
29from flipdare.result.job_result import JobResult
30from flipdare.core.trigger_decorator import trigger_decorator
31from flipdare.service._error_mixin import ErrorMixin
32from flipdare.service._email_mixin import EmailMixin
33from flipdare.service._service_provider import ServiceProvider
34from flipdare.service.core.step_processor import ProcessingStep, StepProcessor
35from flipdare.mailer._jinja_email_template import JinjaEmailTemplate
36from flipdare.mailer.user.flag_email import FlagEmail
37from flipdare.mailer.user.flag_removed_email import FlagRemovedEmail
38from flipdare.firestore.context.flag_context import FlagContext
39from flipdare.generated import (
40 AppErrorCode,
41 AppJobType,
42 DisputedProgress,
43 FlagKeys,
44 FlagType,
45 IssueProgress,
46)
47from flipdare.generated.model.issue.issue_comment_model import IssueCommentModel
48from flipdare.generated.shared.firestore_collections import FirestoreCollections
49from flipdare.result.outcome import Outcome
50from flipdare.util import FirestoreTime, TimeUtil
51from flipdare.util.debug_util import stringify_debug
52from flipdare.wrapper import (
53 AppJobWrapper,
54 FlagWrapper,
55 RestrictionWrapper,
56 UserWrapper,
57)
58from flipdare.generated.model.backend.metric.count_metric import CountMetric
59from flipdare.app_types import CronResult
61if TYPE_CHECKING:
62 from flipdare.manager.search_manager import SearchManager
63 from flipdare.manager.service_manager import ServiceManager
64 from flipdare.manager.db_manager import DbManager
65 from flipdare.manager.backend_manager import BackendManager
67__all__ = ["FlagService"]
class _FlagProcessCode:
    """
    Log-message templates, one per step of the flag processing algorithm.

    Every attribute is a ``str.format`` template with a single ``{label}``
    placeholder. The attribute name mirrors the step code embedded in the
    text (``_3_d_2_a`` holds the message for step ``3.d.2.a``).
    """

    #
    # 1. STATUS: 'closed'
    #
    _1_a = "{label}: 1.a - CLOSED_FINALIZED - Validate restriction was applied (if not, apply it)"
    # Fixed: this template used "{label} :" while every other one uses "{label}:".
    _1_b = "{label}: 1.b - CLOSED_NOTIFY - If restriction applied, notify user."
    _1_c = "{label}: 1.c - CLOSED_COMMENT - Add flag_comment system entry (code: '1.a', '1.b')"
    #
    # 2. STATUS: 'withdrawn'
    #
    _2_a = "{label}: 2.a - WITHDRAWN-FINALIZED - Check if restriction exists, if so remove it"
    _2_b = "{label}: 2.b - WITHDRAWN-NOTIFY - If restriction applied, notify user of withdrawal"
    _2_c = "{label}: 2.c - WITHDRAWN-COMMENT - Add flag_comment system entry (code: '2.a', '2.b')"
    #
    # 3. STATUS: 'disputed'
    #
    # 3.a - DISPUTED_PROGRESS: 'withdrawn'
    _3_a_1 = "{label}: 3.a.1 - DISP_WITHDRAWN_FINALIZED - Remove restriction if applied"
    _3_a_2 = "{label}: 3.a.2 - DISP_WITHDRAWN_NOTIFY - Notify user of withdrawal"
    _3_a_3 = "{label}: 3.a.3 - DISP_WITHDRAWN_COMMENT - Add flag_comment system entry (code: '3.a.1', '3.a.2')"
    # 3.b - DISPUTED_PROGRESS: 'approved'
    _3_b_1 = "{label}: 3.b.1 - DISP_APPROVED_FINALIZED - Remove restriction if any"
    _3_b_2 = "{label}: 3.b.2 - DISP_APPROVED_NOTIFY - Notify user of approval"
    _3_b_3 = "{label}: 3.b.3 - DISP_APPROVED_COMMENT - Add flag_comment system entry (code: '3.b.1', '3.b.2')"
    # 3.c - DISPUTED_PROGRESS: 'rejected'
    _3_c_1 = "{label}: 3.c.1 - DISP_REJECTED_FINALIZED - Re-apply/verify restriction"
    _3_c_2 = "{label}: 3.c.2 - DISP_REJECTED_NOTIFY - Notify user of rejection"
    _3_c_3 = "{label}: 3.c.3 - DISP_REJECTED_COMMENT - Add flag_comment system entry (code: '3.c.1', '3.c.2')"
    # 3.d - DISPUTED_PROGRESS: 'waitingForAdmin'
    # 3.d.1 - If most recent comment < 3 days old:
    _3_d_1_a = "{label}: 3.d.1.a - DISP_WAITING_ADMIN_DAILY - Include in DAILY admin report (no user action)"
    _3_d_1_b = "{label}: 3.d.1.b - DISP_WAITING_ADMIN_COMMENT - Add flag_comment system entry (code: '3.d.1.a')"
    # 3.d.2 - If most recent comment > 7 days old:
    _3_d_2_a = (
        "{label}: 3.d.2.a - DISP_WAITING_ADMIN_URGENT_DAILY - Include in URGENT admin report"
    )
    _3_d_2_b = "{label}: 3.d.2.b - DISP_WAITING_ADMIN_URGENT_REMOVE - Temporarily remove restriction until admin review"
    _3_d_2_c = "{label}: 3.d.2.c - DISP_WAITING_ADMIN_URGENT_NOTIFY - Notify user of temporary restriction removal"
    _3_d_2_d = "{label}: 3.d.2.d - DISP_WAITING_ADMIN_URGENT_COMMENT - Add flag_comment system entry (code: '3.d.2.a', '3.d.2.b')"
    # 3.e - DISPUTED_PROGRESS: 'waitingForUser'
    # 3.e.1 - If most recent comment from USER and > 3 days old:
    _3_e_1_a = "{label}: 3.e.1.a - DISP_WAITING_USER_NOTIFY - Sent notify reminder to user to respond to dispute"
    _3_e_1_b = "{label}: 3.e.1.b - DISP_WAITING_USER_COMMENT - Add flag_comment system entry (code: '3.e.1.a')"
    # 3.e.2 - If most recent comment from USER and > 7 days old:
    # (there is no 3.e.2.b step; the codes go a, c, d)
    _3_e_2_a = "{label}: 3.e.2.a - DISP_WAITING_USER_EXPIRE_RESTRICTION - Re-apply the restriction"
    _3_e_2_c = (
        "{label}: 3.e.2.c - DISP_WAITING_USER_EXPIRE_NOTIFY - Notify user restriction re-applied."
    )
    # Fixed: this entry cited the non-existent step '3.e.2.b'; the notify step is '3.e.2.c'.
    _3_e_2_d = "{label}: 3.e.2.d - DISP_WAITING_USER_EXPIRE_COMMENT - Add flag_comment system entry (code: '3.e.2.a', '3.e.2.c')"
    #
    # 4. STATUS: 'adminAck'
    #
    # 4.a - If FlagType is 'minor' or 'moderate':
    _4_a_1 = "{label}: 4.a.1 - ACK_MINOR_RESTRICTION - Apply warning restriction"
    _4_a_2 = "{label}: 4.a.2 - ACK_MINOR_REPUTATION - Decrease user reputation"
    _4_a_3 = "{label}: 4.a.3 - ACK_MINOR_NOTIFY - Notify user of actions taken"
    _4_a_4 = "{label}: 4.a.4 - ACK_MINOR_COMMENT - Add flag_comment system entry (code: '4.a.1', '4.a.2', '4.a.3')"
    # 4.b - If FlagType is 'major' or higher:
    _4_b_1 = "{label}: 4.b.1 - ACK_MAJOR_RESTRICTION - Use RestrictionCalculate to determine appropriate restriction"
    _4_b_2 = "{label}: 4.b.2 - ACK_MAJOR_REPUTATION - Decrease user reputation"
    _4_b_3 = "{label}: 4.b.3 - ACK_MAJOR_NOTIFY - Notify user of actions taken"
    _4_b_4 = "{label}: 4.b.4 - ACK_MAJOR_COMMENT - Add flag_comment system entry (code: '4.b.1', '4.b.2')"
    #
    # 5. STATUS: 'open'
    #
    # 5.a - If FlagType is 'minor' or 'moderate':
    # (Note: Don't auto-adminAck for legal/audit reasons)
    _5_a_1_ = "{label}: 5.a.1 - OPEN_MINOR_RESTRICTION - Apply warning restriction and decrease reputation"
    _5_a_2_ = "{label}: 5.a.2 - OPEN_MINOR_NOTIFY - Notify user of flag and actions"
    _5_a_3_ = (
        "{label}: 5.a.3 - OPEN_MINOR_STATUS - Mark flag as 'autoAck' to prevent re-processing"
    )
    _5_a_4_ = "{label}: 5.a.4 - OPEN_MINOR_COMMENT - Add flag_comment system entry (code: '5.a.1', '5.a.2', '5.a.3')"
    # 5.b - If FlagType is 'major' or higher:
    _5_b_1_ = "{label}: 5.b.1 - OPEN_MAJOR_HOURLY - Include in HOURLY admin report"
    _5_b_2_ = "{label}: 5.b.2 - OPEN_MAJOR_RESTRICTION - Use RestrictionCalculate to determine restriction"
    _5_b_3_ = "{label}: 5.b.3 - OPEN_MAJOR_NOTIFY - Notify user of flag and actions"
    _5_b_4_ = (
        "{label}: 5.b.4 - OPEN_MAJOR_STATUS - Mark flag as 'adminAck' to prevent re-processing"
    )
    _5_b_5_ = "{label}: 5.b.5 - OPEN_MAJOR_COMMENT - Add flag_comment system entry (code: '5.b.1', '5.b.2')"
# Short module-level aliases for names that are referenced repeatedly below.
_CODE = _FlagProcessCode  # step-code log templates defined above
_COL = FirestoreCollections.FLAG  # collection passed to loggers and decorators in this module
_JT = AppJobType
_K = FlagKeys
159class FlagService(EmailMixin, ErrorMixin, ServiceProvider):
160 """
161 Flag Processing Algorithm - based on 'status' and 'disputedProgress'.
163 Important:
164 - System messages track flow to prevent duplicate notifications
165 - All restriction changes must be logged with reason
166 - FlagContext validation required before any action
168 Reports:
169 DAILY - 3.d.1.a - DISP_WAITING_ADMIN_DAILY
170 URGENT - 3.d.2.a - DISP_WAITING_ADMIN_URGENT_DAILY
171 HOURLY - 5.b.1 - OPEN_MAJOR_HOURLY
173 Edge Cases:
174 - Verify flagged content still exists before processing
175 - Handle concurrent flag processing with optimistic locking
176 - Check if restriction was manually modified outside flag system
177 - Validate users still exist (handle deleted accounts)
178 - Skip processing if flag already marked as processed in internal_state
180 """
182 def __init__(
183 self,
184 db_manager: DbManager | None = None,
185 backend_manager: BackendManager | None = None,
186 service_manager: ServiceManager | None = None,
187 search_manager: SearchManager | None = None,
188 ) -> None:
189 super().__init__(
190 backend_manager=backend_manager,
191 db_manager=db_manager,
192 service_manager=service_manager,
193 search_manager=search_manager,
194 )
196 @cron_decorator(job_type=_JT.CR_FLAG_UNPROCESSED)
197 def cron_flag_unprocessed(self) -> CronResult:
198 started_at = TimeUtil.get_current_utc_dt()
200 flags: list[FlagWrapper] | None = None
201 try:
202 flags = self.flag_db.get_recent_major_unprocessed()
203 except Exception as e:
204 msg = f"Error retrieving unprocessed flags: {e}"
205 LOG().error(msg)
206 self.app_logger.db_error(
207 error_code=AppErrorCode.DATABASE_EX,
208 job_type=_JT.CR_FLAG_UNPROCESSED,
209 collection=_COL,
210 ex_error=e,
211 notify_admin=True,
212 )
214 ended_at = TimeUtil.get_current_utc_dt()
215 duration = TimeUtil.duration_in_seconds(started_at, ended_at)
216 return CountMetric.error(duration=duration)
218 success_ct = 0
219 failed_ct = 0
221 main_result = AppResult[FlagWrapper]()
223 for flag in flags:
224 try:
225 flag_result = AppResult[FlagWrapper](doc_id=flag.doc_id)
226 self._process_flag(flag, flag_result, is_new=False)
227 if not flag_result.is_error:
228 success_ct += 1
229 continue
231 # handle the error
232 failed_ct += 1
233 msg = f"Error processing flag {flag.doc_id}: {flag_result.formatted}"
234 LOG().error(msg)
235 main_result.merge(flag_result)
237 except Exception as e:
238 msg = f"Error during flag cron processing: {e}"
239 LOG().error(msg)
240 main_result.add_error(AppErrorCode.UNEXPECTED_CODE_PATH, msg)
242 ended_at = TimeUtil.get_current_utc_dt()
243 duration = TimeUtil.duration_in_seconds(started_at, ended_at)
245 if not main_result.is_error:
246 msg = f"Flag cron processed completed in {duration} secs. Success={success_ct}, Failed={failed_ct} failed."
247 LOG().info(msg)
248 return CountMetric(
249 success_ct=success_ct,
250 failed_ct=failed_ct,
251 skipped_ct=0,
252 duration=duration,
253 )
255 # handle the error
256 msg = f"Errors occurred during flag cron processing ({duration} secs). Please investigate."
257 self.cron_result_error(
258 job_type=_JT.CR_FLAG_UNPROCESSED,
259 result=main_result,
260 collection=_COL,
261 notify_admin=True,
262 error_code=AppErrorCode.FLAG,
263 message=msg,
264 )
266 return CountMetric.error(duration=duration)
268 # ========================================================================
269 # TRIGGERS - Delegate to processors
270 # ========================================================================
272 @job_type_decorator(_JT.TR_FLAG)
273 @trigger_decorator(job_type=_JT.TR_FLAG, collection=_COL, wrapper_class=FlagWrapper)
274 def trigger_flag(
275 self,
276 job: AppJobWrapper,
277 *,
278 wrapper: FlagWrapper,
279 ) -> JobResult[FlagWrapper]:
280 flag_id = wrapper.doc_id
281 main_result = AppResult[FlagWrapper](
282 task_name=f"Flag Trigger for {flag_id}", doc_id=job.doc_id
283 )
285 # Use unified processor for both new and updated flags
286 is_new = not job.has_changes
287 changes = job.get_updates() if job.has_changes else None
288 return self._process_flag(wrapper, main_result, is_new=is_new, changes=changes)
290 # ========================================================================
291 # Processing
292 # ========================================================================
294 def _process_flag(
295 self,
296 flag_model: FlagWrapper,
297 main_result: AppResult[FlagWrapper],
298 is_new: bool = True,
299 changes: DatabaseDict | None = None,
300 ) -> JobResult[FlagWrapper]:
301 """
302 Unified flag processing using StepProcessor.
303 Handles both new flags and updates consistently.
305 Args:
306 flag_model: The flag to process
307 is_new: True if this is a new flag, False if it's an update
308 changes: Dictionary of changes (for updates only)
310 """
311 flag_id = flag_model.doc_id
313 # Apply skip conditions based on flag type
314 if is_new:
315 # Skip minor flags - they don't require immediate processing
316 if flag_model.flag_type.is_minor:
317 msg = f"Flag {flag_id} is minor, skipping immediate processing."
318 LOG().debug(msg)
319 return JobResult.skip_doc(doc_id=flag_id, message=msg)
321 # Skip if already acknowledged
322 if flag_model.progress.is_admin_ack:
323 msg = f"Flag {flag_id} already acknowledged. Skipping."
324 LOG().debug(msg)
325 return JobResult.skip_doc(doc_id=flag_id, message=msg)
326 else:
327 # For updates, check if there are relevant changes
328 if changes is None:
329 msg = "No changes in flag update. Skipping processing."
330 LOG().debug(msg)
331 return JobResult.skip_doc(message=msg, doc_id=flag_id)
333 disputed_progress = FlagKeys.DISPUTED_PROGRESS
334 progress = FlagKeys.PROGRESS
336 if progress not in changes and disputed_progress not in changes:
337 msg = f"No relevant changes in flag update. Changes: {stringify_debug(changes)}"
338 LOG().debug(msg)
339 return JobResult.skip_doc(message=msg, doc_id=flag_id)
341 try:
342 action = "new" if is_new else "update"
343 disputed_str = (
344 flag_model.disputed_progress.value if flag_model.disputed_progress else "None"
345 )
346 LOG().info(
347 f"Processing {action} flag {flag_id}: progress={flag_model.progress.value}, "
348 f"disputed={disputed_str}, type={flag_model.flag_type.value}",
349 )
351 # Use StepProcessor for consistent processing
352 processor = self._build_step_processor(flag_model)
353 process_result = processor.execute()
355 if process_result.is_error:
356 return JobResult.from_result(result=process_result, doc_id=flag_id)
358 return JobResult.ok(
359 message=f"Flag {action} processed successfully.",
360 doc_id=flag_id,
361 )
363 except Exception as e:
364 msg = f"Error processing flag {flag_id}: {e}\n\t{flag_model!s}"
365 main_result.add_error(AppErrorCode.UNEXPECTED_CODE_PATH, msg)
366 return JobResult.from_result(result=main_result, doc_id=flag_id)
368 def _build_step_processor(self, flag: FlagWrapper) -> StepProcessor[FlagWrapper]:
369 """
370 Build a StepProcessor for flag processing.
371 All steps are chained with proper context passing.
372 """
373 flag_id = flag.doc_id
375 return StepProcessor(
376 wrapper=flag,
377 steps=[
378 ProcessingStep[_K, FlagWrapper](
379 state_key=_K.FLAG_EVALUATED,
380 handler=lambda m: self._evaluate_flag(m),
381 description="Evaluate Flag",
382 required=True,
383 ),
384 ProcessingStep[_K, FlagWrapper](
385 state_key=_K.FLAG_RESTRICTION_APPLIED,
386 handler=lambda m: self._apply_restriction_step(m),
387 description="Apply Restriction",
388 required=True,
389 ),
390 ProcessingStep[_K, FlagWrapper](
391 state_key=_K.FLAG_USER_NOTIFIED,
392 handler=lambda m: self._user_notified_step(m),
393 description="Notify User",
394 required=True,
395 ),
396 ProcessingStep[_K, FlagWrapper](
397 state_key=_K.FLAG_REMOVED_FROM_SEARCH,
398 handler=lambda m: self._remove_from_search_step(m),
399 description="Remove from search index",
400 required=False, # Optional step
401 ),
402 ],
403 save_handler=lambda m: self.flag_bridge.update(m),
404 process_name=f"process_flag_{flag_id}",
405 )
407 def _apply_restriction_step(self, flag: FlagWrapper) -> AppResult[FlagWrapper]:
408 """Wrapper for _apply_restriction that returns flag model instead of context."""
409 context_result = self._evaluate_flag(flag)
410 if context_result.is_error:
411 result = AppResult[FlagWrapper](task_name="apply_restriction_wrapper")
412 result.merge(context_result)
413 return result
415 context = context_result.generated
416 assert context # narrowing
418 restriction_result = self._apply_restriction(context)
419 result = AppResult[FlagWrapper](task_name="apply_restriction_wrapper")
420 if restriction_result.is_error:
421 result.merge(restriction_result)
422 else:
423 result.generated = flag
424 return result
426 def _user_notified_step(self, flag: FlagWrapper) -> AppResult[FlagWrapper]:
427 """Wrapper for _user_notified that returns flag model instead of context."""
428 context_result = self._evaluate_flag(flag)
429 if context_result.is_error:
430 result = AppResult[FlagWrapper](task_name="user_notified_wrapper")
431 result.merge(context_result)
432 return result
434 context = context_result.generated
435 assert context # narrowing
437 notify_result = self._user_notified(context)
438 result = AppResult[FlagWrapper](task_name="user_notified_wrapper")
439 if notify_result.is_error:
440 result.merge(notify_result)
441 else:
442 result.generated = flag
443 return result
445 def _remove_from_search_step(self, flag: FlagWrapper) -> AppResult[FlagWrapper]:
446 """Wrapper for _remove_from_search_index that returns flag model instead of context."""
447 context_result = self._evaluate_flag(flag)
448 if context_result.is_error:
449 result = AppResult[FlagWrapper](task_name="remove_search_wrapper")
450 result.merge(context_result)
451 return result
453 context = context_result.generated
454 assert context # narrowing
456 remove_result = self._remove_from_search_index(context)
457 result = AppResult[FlagWrapper](task_name="remove_search_wrapper")
458 if remove_result.is_error:
459 result.merge(remove_result)
460 else:
461 result.generated = flag
462 return result
464 def _evaluate_flag(self, flag: FlagWrapper) -> AppResult[FlagContext]:
465 """
466 Step 1: Evaluate flag status and determine required actions.
467 Creates FlagContext with all necessary user information.
468 """
469 flag_id = flag.doc_id
470 assert flag_id # narrowing
472 result = AppResult[FlagContext](task_name=f"evaluate_flag_{flag_id}")
474 # Build context with user information
475 from_user_result = self.user_bridge.get(flag.from_uid)
476 if from_user_result.is_error:
477 msg = f"Reported user {flag.from_uid} not found for flag {flag_id}"
478 LOG().error(msg)
479 result.add_error(AppErrorCode.NOT_FOUND, msg)
481 return result
483 to_user_result = self.user_bridge.get(flag.to_uid)
484 if to_user_result.is_error:
485 msg = f"Flagged user {flag.to_uid} not found for flag {flag_id}"
486 LOG().error(msg)
487 result.add_error(AppErrorCode.NOT_FOUND, msg)
488 return result
490 from_user = from_user_result.generated
491 to_user = to_user_result.generated
492 # assert from_user and to_user # narrowing
494 # do some sanity checking
495 disputed_progress = flag.disputed_progress
496 progress = flag.progress
498 if progress.is_disputed and not disputed_progress:
499 # this is a problem.
500 msg = f"Flag {flag_id} is in disputed progress but has no disputedProgress. Resetting to WAITING_FOR_ADMIN."
501 LOG().warning(msg)
502 updated_result = self._set_flag_disputed_progress(
503 flag,
504 DisputedProgress.WAITING_ADMIN,
505 )
506 if updated_result.is_error:
507 result.merge(updated_result)
508 return result
510 # if not is_error , has generated result..
511 assert updated_result.generated is not None # hinting
512 flag = updated_result.generated
514 # Create context
515 context = FlagContext(flag_model=flag, from_user=from_user, to_user=to_user)
517 if not context.validate():
518 msg = f"Invalid FlagContext: {context.validation_error}"
519 LOG().error(msg)
520 result.add_error(AppErrorCode.INVALID_DATA, msg)
521 return result
523 # Determine processing path based on progress and disputedProgress
524 flag_progress = flag.progress
525 disputed_status = flag.disputed_progress
526 flag_type = flag.flag_type
528 if IS_DEBUG:
529 LOG().debug(
530 f"Evaluating flag {flag_id}: progress={flag_progress.value}, "
531 f"disputed={disputed_status.value if disputed_status else 'None'}, "
532 f"type={flag_type.value}",
533 )
535 result.generated = context
536 return result
538 # fmt: off
539 def _apply_restriction( # noqa: PLR0912, PLR0915
540 self,
541 context: FlagContext,
542 ) -> AppResult[FlagContext]:
543 """
544 Step 2: Apply, remove, or verify restrictions based on flag status.
545 Handles all restriction logic from the algorithm.
546 """
547 if not context.validate():
548 result = AppResult[FlagContext](task_name="apply_restriction")
549 result.add_error(
550 AppErrorCode.INVALID_DATA,
551 f"Invalid context: {context.validation_error}",
552 )
553 return result
555 flag = context.flag
556 flag_id = flag.doc_id
557 assert flag_id # narrowing
559 result = AppResult[FlagContext](task_name=f"apply_restriction_{flag_id}")
560 flagged_user = context.to_user
561 restriction_service = self.restriction_service
563 progress = flag.progress
564 flag_type = flag.flag_type
566 match progress:
567 # 1. PROGRESS: 'resolved' - Ensure restriction is applied
568 case IssueProgress.RESOLVED:
569 restrict_result = restriction_service.apply_temporary_restriction(flag)
570 msg = _CODE._1_a.format(label=f"Flag {flag_id}")
571 if not restrict_result.is_error:
572 # Add system comment (code: '1.a')
573 LOG().info(msg)
574 self._add_system_comment(flag, FlagCode.CLOSED_FINALIZED)
575 else:
576 msg += f'\n\tError applying restriction for resolved flag {flag_id}: {restrict_result.formatted}'
577 LOG().error(msg)
578 result.merge(restrict_result)
579 return result
582 # 2. PROGRESS: 'withdrawn' - Remove restriction
583 case IssueProgress.WITHDRAWN:
584 msg = _FlagProcessCode._2_a.format(label=f"Flag {flag_id}")
585 if flag.restriction_id:
586 remove_result = restriction_service.remove_restriction(flagged_user, flag_id)
587 if not remove_result.is_error:
588 # Add system comment (code: '2.a')
589 LOG().info(msg)
590 self._add_system_comment(flag, FlagCode.WITHDRAWN_FINALIZED)
591 else:
592 msg += f'\n\tError removing restriction for withdrawn flag {flag_id}: {remove_result.formatted}'
593 LOG().error(msg)
594 result.merge(remove_result)
595 return result
597 # 3. PROGRESS: 'disputed'
598 case IssueProgress.DISPUTED:
599 # 3.a - DISPUTED_PROGRESS: 'withdrawn'
600 # 3.b - DISPUTED_PROGRESS: 'approved'
601 # 3.c - DISPUTED_PROGRESS: 'rejected'
602 disputed_result = self._handle_disputed(flag, flagged_user)
603 if disputed_result.is_error:
604 msg = _CODE._3_a_1.format(label=f"Flag {flag_id}")
605 msg += f'\n\tError handling disputed flag {flag_id}: {disputed_result.formatted}'
606 LOG().error(msg)
607 result.merge(disputed_result)
608 return result
609 # 4. PROGRESS: 'adminAck'
610 case IssueProgress.ADMIN_ACK:
611 if flag_type.is_minor or flag_type.is_moderate:
612 # 4.1 - Apply temporary restriction (warning-level)
613 msg = _CODE._4_a_1.format(label=f"Flag {flag_id}")
614 restrict_result = restriction_service.apply_temporary_restriction(flag)
615 if restrict_result.is_error:
616 msg += f'\n\tError applying restriction for adminAck flag {flag_id}: {restrict_result.formatted}'
617 LOG().error(msg)
618 result.merge(restrict_result)
619 return result
621 # Decrease reputation (code: '4.1.1', '4.1.2')
622 LOG().info(msg)
623 self._decrease_reputation(flagged_user, flag_type)
624 self._add_system_comment(flag, FlagCode.ACK_MINOR_RESTRICTION)
625 elif flag_type.is_major:
626 # 4.2 - Apply restriction (calculation happens internally)
627 msg = _CODE._4_b_1.format(label=f"Flag {flag_id}")
628 restrict_result = restriction_service.apply_temporary_restriction(flag)
629 if not restrict_result.is_error:
630 LOG().info(msg)
631 self._add_system_comment(flag, FlagCode.ACK_MAJOR_RESTRICTION)
632 else:
633 msg += f'\n\tError applying restriction for adminAck major flag {flag_id}: {restrict_result.formatted}'
634 LOG().error(msg)
635 result.merge(restrict_result)
636 return result
639 # 5. STATUS: 'open'
640 case IssueProgress.OPEN:
641 if flag_type.is_minor or flag_type.is_moderate:
642 # 5.a - Apply temporary restriction and decrease reputation
643 msg = _CODE._5_a_1_.format(label=f"Flag {flag_id}")
644 restrict_result = restriction_service.apply_temporary_restriction(flag)
645 if not restrict_result.is_error:
646 LOG().info(msg)
647 self._decrease_reputation(flagged_user, flag_type)
648 self._add_system_comment(flag, FlagCode.OPEN_MINOR_NOTIFY)
649 if restrict_result.is_error:
650 msg += f'\n\tError applying restriction for open flag {flag_id}: {restrict_result.formatted}'
651 LOG().error(msg)
652 result.merge(restrict_result)
653 return result
655 elif flag_type.is_major:
656 # 5.b - Apply restriction (calculation happens internally)
657 msg = _CODE._5_b_2_.format(label=f"Flag {flag_id}")
658 restrict_result = restriction_service.apply_temporary_restriction(flag)
659 if not restrict_result.is_error:
660 LOG().info(msg)
661 self._add_system_comment(flag, FlagCode.OPEN_MAJOR_NOTIFY)
662 else:
663 msg += f'\n\tError applying restriction for open major flag {flag_id}: {restrict_result.formatted}'
664 LOG().error(msg)
665 result.merge(restrict_result)
666 return result
668 case IssueProgress.WAITING_ADMIN:
669 # this means the restriction has already been applied/removed and we are waiting for
670 # admin review. No action to take on the restriction at this time, but we may need
671 # to add a system comment or send a notification depending on timing
672 # (handled in _handle_disputed).
673 if IS_DEBUG:
674 LOG().debug(f"Flag {flag_id} is waiting for admin review. No restriction action taken at this time.")
676 result.generated = context
677 return result
678 # fmt: on
680 # fmt: off
681 def _handle_disputed( # noqa: PLR0912, PLR0915
682 self,
683 flag: FlagWrapper,
684 flagged_user: UserWrapper,
685 ) -> AppResult[FlagWrapper]:
686 restriction_service = self.restriction_service
688 flag_id = flag.doc_id
689 result = AppResult[FlagWrapper](task_name=f"handle_disputed_{flag_id}")
691 disputed_progress = flag.disputed_progress
692 # 3.a - DISPUTED_PROGRESS: 'withdrawn'
693 msg = _CODE._3_a_1.format(label=f"Flag {flag_id}")
694 if not disputed_progress:
695 # this should be impossible since we checked this in _evaluate_flag, but just in case:
696 msg += f"Flag {flag_id} is in disputed progress but has no disputedProgress."
697 LOG().error(msg)
698 result.add_error(AppErrorCode.INVALID_DATA,msg)
699 return result
701 match disputed_progress:
702 case DisputedProgress.WITHDRAWN:
703 if flag.restriction_id:
704 remove_result = restriction_service.remove_restriction(flagged_user, flag_id)
705 if remove_result.is_error:
706 msg += f'\n\tError removing restriction for withdrawn disputed flag {flag_id}: {remove_result.formatted}'
707 LOG().error(msg)
708 result.merge(remove_result)
709 return result
710 self._add_system_comment(flag, FlagCode.DISP_WITHDRAWN_FINALIZED)
712 # 3.b - DISPUTED_PROGRESS: 'accepted'
713 case DisputedProgress.ACCEPTED:
714 if flag.restriction_id:
715 remove_result = restriction_service.remove_restriction(flagged_user, flag_id)
716 if not remove_result.is_error:
717 # Add system comment (code: '3.b.1')
718 LOG().info(msg)
719 self._add_system_comment(flag, FlagCode.DISP_APPROVED_FINALIZED)
720 else:
721 msg = f'\n\tError removing restriction for accepted disputed flag {flag_id}: {remove_result.formatted}'
722 LOG().error(msg)
723 result.merge(remove_result)
724 return result
726 # 3.c - DISPUTED_PROGRESS: 'rejected'
727 case DisputedProgress.REJECTED:
728 restrict_result = restriction_service.apply_temporary_restriction(flag)
729 if not restrict_result.is_error:
730 # Add system comment (code: '3.c.1')
731 LOG().info(msg)
732 self._add_system_comment(flag, FlagCode.DISP_REJECTED_FINALIZED)
733 else:
734 msg = f'\n\tError applying restriction for rejected disputed flag {flag_id}: {restrict_result.formatted}'
735 LOG().error(msg)
736 result.merge(restrict_result)
737 return result
739 # 3.d - DISPUTED_PROGRESS: 'waitingForAdmin'
740 case DisputedProgress.WAITING_ADMIN:
741 # Check timing for temporary restriction removal
742 if not self._should_remove_temp_restrict(flag_id):
743 # 3.d.1 - Include in daily admin report (no user action)
744 self._add_system_comment(flag, FlagCode.DISP_WAITING_ADMIN_DAILY)
745 else:
746 # 3.d.2 - Remove restriction temporarily and notify user
747 if flag.restriction_id:
748 remove_result = restriction_service.remove_restriction(flagged_user, flag_id)
749 if remove_result.is_error:
750 msg = f'\n\tError removing restriction for waiting admin flag {flag_id}: {remove_result.formatted}'
751 LOG().error(msg)
752 result.merge(remove_result)
753 return result
754 # Notify user of temporary removal
755 # this will get picked up in the daily report for admin.
756 try:
757 removal_email = FlagRemovedEmail(flag=flag, flagged_user=flagged_user)
758 email_result = self.send_user_email(
759 user=flagged_user,
760 template=removal_email,
761 doc_id=flag_id,
762 collection=FirestoreCollections.FLAG,
763 )
764 if not email_result.is_error:
765 # Add system comment (code: '3.d.2.a', '3.d.2.b')
766 LOG().info(f"Flag {flag_id} has been waiting for admin review for over 7 days. Temporary restriction removed and user notified.")
767 self._add_system_comment(flag, FlagCode.DISP_WAITING_ADMIN_URGENT_DAILY)
768 else:
769 cause = f"\n\tFailed to send urgent admin review email for flag {flag_id}"
770 LOG().error(cause)
771 result.add_error(AppErrorCode.INVALID_EMAIL, cause)
772 return result
773 except Exception as e:
774 msg = f"\n\tException sending urgent admin review email for flag {flag_id}: {e}"
775 LOG().error(msg)
776 result.add_error(AppErrorCode.INVALID_EMAIL, msg)
777 return result
780 case DisputedProgress.WAITING_USER:
781 # Check if most recent comment from USER is > 3 days old
782 try:
783 last_comment = self.flag_db.get_most_recent_comment_for_flag(flag_id)
784 if last_comment and last_comment.from_uid == flagged_user.doc_id:
785 now = TimeUtil.get_current_utc_dt()
786 comment_utc_time = FirestoreTime.from_firestore(last_comment.created_at_db)
787 if comment_utc_time:
788 days_diff = (now - comment_utc_time).days
789 if days_diff > RESTRICTION_REMINDER_USER_RESPONSE_FLAG:
790 # 3.e.1.a - Send reminder to user to respond to dispute
791 msg = _CODE._3_e_1_a.format(label=f"Flag {flag_id}")
792 flag_email = FlagEmail(
793 flagged_user=flagged_user,
794 flag=flag,
795 restriction=None,
796 )
797 email_result = self.send_user_email(
798 user=flagged_user,
799 template=flag_email,
800 doc_id=flag_id,
801 collection=FirestoreCollections.FLAG,
802 )
803 if not email_result.is_error:
804 # Add system comment (code: '3.e.1.a')
805 self._add_system_comment(flag, FlagCode.DISP_WAITING_USER_NOTIFY)
806 LOG().info(msg)
807 else:
808 msg += f"\n\tFailed to send waiting user reminder email for flag {flag_id}"
809 LOG().error(msg)
810 result.add_error(AppErrorCode.INVALID_EMAIL, msg)
811 return result
812 if days_diff > RESTRICTION_MAX_WAIT_USER_RESPONSE_FLAG:
813 # 3.e.2.a - Re-apply restriction
814 msg = _CODE._3_e_2_a.format(label=f"Flag {flag_id}")
815 restrict_result = restriction_service.apply_temporary_restriction(flag)
816 if restrict_result.is_error:
817 msg += f'\n\tError re-applying restriction for waiting user flag {flag_id}: {restrict_result.formatted}'
818 LOG().error(msg)
819 result.merge(restrict_result)
820 return result
821 # Notify user of restriction re-application
822 flag_email = FlagEmail(
823 flagged_user=flagged_user,
824 flag=flag,
825 restriction=None,
826 )
827 email_result = self.send_user_email(
828 user=flagged_user,
829 template=flag_email,
830 doc_id=flag_id,
831 collection=FirestoreCollections.FLAG,
832 )
833 if not email_result.is_error:
834 # Add system comment (code: '3.e.2.a', '3.e.2.b')
835 self._add_system_comment(flag, FlagCode.DISP_WAITING_USER_EXPIRE_RESTRICTION)
836 msg += f"\n\tFlag {flag_id} has been waiting for user response for over 14 days. Restriction re-applied and user notified."
837 LOG().info(msg)
838 else:
839 msg += f"\n\tFailed to send waiting user expiration email for flag {flag_id}"
840 LOG().error(msg)
841 result.add_error(AppErrorCode.INVALID_EMAIL, msg)
842 return result
843 except Exception as e:
844 msg = f"Failed to handle waiting user logic for flag {flag_id}: {e}"
845 LOG().error(msg)
846 result.add_error(AppErrorCode.SERVER_EX, msg)
847 return result
849 return result
850 # fmt: on
def _should_remove_temp_restrict(self, flag_id: str) -> bool:
    """
    Tell whether the latest comment on a flag is old enough to lift the temp restriction.

    Returns True only when the most recent comment on the flag is more than
    RESTRICTION_MAX_WAIT_ADMIN_ACK_FLAG days old. A missing comment, an
    unreadable timestamp or any exception yields False.
    """
    try:
        latest = self.flag_db.get_most_recent_comment_for_flag(flag_id)
        if latest is None:
            return False

        current_utc = TimeUtil.get_current_utc_dt()
        commented_at = FirestoreTime.from_firestore(latest.created_at_db)
        if commented_at is not None:
            # whole days elapsed since the last comment
            elapsed_days = (current_utc - commented_at).days
            return elapsed_days > RESTRICTION_MAX_WAIT_ADMIN_ACK_FLAG

        # timestamp could not be converted: report it and keep the restriction
        self.app_logger.system_error(
            error_code=AppErrorCode.INVALID_DATA,
            message=f"Failed to get createdAt for flag {flag_id}",
        )
        return False

    except Exception as e:
        self.app_logger.system_error(
            message=f"Failed to get most recent comment for flag {flag_id}; {e}",
            error_code=AppErrorCode.SERVER_EX,
            notify_admin=False,
        )
        return False
878 def _user_notified( # noqa: PLR0912, PLR0915
879 self,
880 context: FlagContext,
881 ) -> AppResult[FlagContext]:
882 """
883 Step 4: Notify users about flag actions and outcomes.
884 Different notifications based on status and disputedProgress.
885 """
886 if not context.validate():
887 result = AppResult[FlagContext](task_name="user_notified")
888 result.add_error(
889 AppErrorCode.INVALID_DATA,
890 f"Invalid context: {context.validation_error}",
891 )
892 return result
894 flag = context.flag
895 flag_id = flag.doc_id
896 assert flag_id # narrowing
898 result = AppResult[FlagContext](task_name=f"user_notified_{flag_id}")
899 flagged_user = context.to_user
900 progress = flag.progress
901 disputed_progress = flag.disputed_progress
903 # Determine which user to notify and which email template to use
904 should_notify = True
905 notify_target = flagged_user
906 # if restriction id is none, the flag was removed..
907 # either stopwatch, or admin removed it (disputed etc).
908 was_removed = flagged_user.restriction_id is None
910 # Skip notification for certain cases
911 if progress == IssueProgress.OPEN and (
912 flag.flag_type.is_minor or flag.flag_type.is_moderate
913 ):
914 # Don't notify for minor/moderate open flags (admin will review first)
915 should_notify = False
917 if not should_notify:
918 return result # early return if no notification needed
920 # Get restriction if it exists
921 restriction = None
922 if flag.restriction_id:
923 restriction_result = self.restriction_bridge.get(
924 flag.restriction_id,
925 )
926 if not restriction_result.is_error:
927 restriction = restriction_result.generated
929 # Send flag email
930 email_template: JinjaEmailTemplate[Any]
931 if was_removed:
932 email_template = FlagRemovedEmail(
933 flag=flag,
934 flagged_user=flagged_user,
935 )
936 else:
937 email_template = FlagEmail(
938 flagged_user=notify_target,
939 flag=flag,
940 restriction=restriction,
941 )
943 email_result = self.send_user_email(
944 user=notify_target,
945 template=email_template,
946 doc_id=flag_id,
947 collection=FirestoreCollections.FLAG,
948 )
950 if email_result.is_error:
951 result.add_error(
952 AppErrorCode.INVALID_EMAIL,
953 f"Failed to send user notification for flag {flag_id}",
954 )
955 return result
957 # Add system comment for notification
958 match progress:
959 case IssueProgress.RESOLVED:
960 self._add_system_comment(flag, FlagCode.CLOSED_NOTIFY)
961 case IssueProgress.WITHDRAWN:
962 self._add_system_comment(flag, FlagCode.WITHDRAWN_NOTIFY)
963 case IssueProgress.DISPUTED:
964 if disputed_progress == DisputedProgress.WITHDRAWN:
965 self._add_system_comment(flag, FlagCode.DISP_WITHDRAWN_NOTIFY)
966 elif disputed_progress == DisputedProgress.ACCEPTED:
967 self._add_system_comment(flag, FlagCode.DISP_APPROVED_NOTIFY)
968 elif disputed_progress == DisputedProgress.REJECTED:
969 self._add_system_comment(flag, FlagCode.DISP_REJECTED_NOTIFY)
970 case IssueProgress.WAITING_ADMIN:
971 if flag.flag_type.is_minor or flag.flag_type.is_moderate:
972 self._add_system_comment(flag, FlagCode.ACK_MINOR_RESTRICTION)
973 else:
974 self._add_system_comment(flag, FlagCode.ACK_MAJOR_RESTRICTION)
975 case IssueProgress.OPEN:
976 if flag.flag_type.is_minor or flag.flag_type.is_moderate:
977 self._add_system_comment(flag, FlagCode.OPEN_MINOR_NOTIFY)
978 else:
979 self._add_system_comment(flag, FlagCode.OPEN_MAJOR_NOTIFY)
980 case IssueProgress.ADMIN_ACK:
981 # internal admin status, no need to add system comment for user
982 # notification since admin is already aware of the flag at this point.
983 pass
985 result.generated = context
986 return result
def _remove_from_search_index(self, context: FlagContext) -> AppResult[FlagContext]:
    """
    Step 5: Remove flagged content from search index if necessary.

    Content is removed only for major flags whose progress is RESOLVED. Every
    other flag, and a flagged object type with no search obj type, is a no-op
    that still hands the context back.

    Args:
        context: Flag processing context; it must pass ``validate()``.

    Returns:
        An AppResult whose ``generated`` is ``context`` on success or no-op, or
        one carrying an error when the context is invalid or the removal raised.
    """
    if not context.validate():
        result = AppResult[FlagContext](task_name="remove_from_search")
        result.add_error(
            AppErrorCode.INVALID_DATA,
            f"Invalid context: {context.validation_error}",
        )
        return result

    flag = context.flag
    flag_id = flag.doc_id
    assert flag_id  # narrowing

    result = AppResult[FlagContext](task_name=f"remove_from_search_{flag_id}")

    # Only remove from search for major flags that are closed
    if flag.flag_type.is_major and flag.progress == IssueProgress.RESOLVED:
        try:
            indexer = self.backend_manager.indexer
            search_obj_type = flag.obj_type.search_obj_type
            if search_obj_type is None:
                # Nothing to remove: fall through so this no-op hands the context
                # back like the non-major / unresolved no-op does.
                msg = f"Flag {flag_id}/{flag.obj_type} has no search obj type, skipping removal"
                LOG().info(msg)
            else:
                indexer.delete_general(flag.obj_id, search_obj_type)
                msg = f"Removed flagged content from search index for flag {flag_id}"
                LOG().info(msg)
        except Exception as e:
            msg = f"Failed to remove from search index for flag {flag_id}: {e}"
            # log as well as record, so the failure is visible outside the result
            LOG().error(msg)
            result.add_error(AppErrorCode.SERVER_EX, msg)
            return result

    result.generated = context
    return result
def _add_system_comment(self, flag: FlagWrapper, code: FlagCode) -> None:
    """
    Record an admin-authored comment on the flag, using the message of ``code``.

    Best effort: a flag without a doc_id is skipped with a warning, and any
    failure while creating the comment is logged rather than raised.
    """
    target_flag_id = flag.doc_id
    if not target_flag_id:
        LOG().warning("Cannot add system comment to flag without doc_id")
        return

    try:
        self.flag_db.create_flag_comment(
            target_flag_id,
            IssueCommentModel(
                id=None,
                from_uid=ADMIN_DOC_ID,
                to_uid=flag.to_uid,
                is_admin=True,
                message=code.message,
            ),
        )
        LOG().debug(f"Added system comment to flag {target_flag_id}: {code} - {code.message}")
    except Exception as e:
        LOG().error(f"Failed to add system comment to flag {target_flag_id}: {e}")
def _decrease_reputation(self, user: UserWrapper, flag_type: FlagType) -> None:
    """
    Penalize the user's reputation for a flag of the given type.

    Best effort: a user without a doc_id is skipped with a warning, and any
    failure from the reputation service is logged rather than raised.
    """
    uid = user.doc_id
    if not uid:
        LOG().warning("Cannot decrease reputation for user without doc_id")
        return

    try:
        self.service_manager.reputation.penalize_user_for_flag(
            flag_type=flag_type,
            user=user,
        )
        LOG().debug(f"Decreased reputation for user {uid} due to {flag_type.value} flag")
    except Exception as e:
        LOG().error(f"Failed to decrease reputation for user {uid}: {e}")
def _send_flag_email(
    self,
    context: FlagContext,
    restriction: RestrictionWrapper,
) -> Outcome:
    """
    Send the FlagEmail template to the flagged user (``context.to_user``).

    The context is expected to be validated by the caller; an invalid context
    is logged and reported as ``Outcome.ERROR`` without sending anything.
    Otherwise the value returned by ``send_user_email`` is passed through.
    """
    if not context.valid:
        # double check, validation needs to be done before calling this method
        LOG().error(f"Invalid FlagContext: {context.validation_error}")
        return Outcome.ERROR

    flag = context.flag
    flag_id = flag.doc_id
    assert flag_id  # narrowing

    recipient = context.to_user
    template = FlagEmail(
        flagged_user=recipient,
        flag=flag,
        restriction=restriction,
    )
    return self.send_user_email(
        user=recipient,
        template=template,
        doc_id=flag_id,
        collection=FirestoreCollections.FLAG,
    )
def _send_flag_removed_email(
    self,
    context: FlagContext,
) -> Outcome:
    """
    Send the FlagRemovedEmail template to the flagged user (``context.to_user``).

    The context is expected to be validated by the caller; an invalid context
    is logged and reported as ``Outcome.ERROR`` without sending anything.
    Otherwise the value returned by ``send_user_email`` is passed through.
    """
    if not context.valid:
        # double check, validation needs to be done before calling this method
        LOG().error(f"Invalid FlagContext: {context.validation_error}")
        return Outcome.ERROR

    flag = context.flag
    flag_id = flag.doc_id
    assert flag_id  # narrowing

    recipient = context.to_user
    template = FlagRemovedEmail(
        flagged_user=recipient,
        flag=flag,
    )
    return self.send_user_email(
        user=recipient,
        template=template,
        doc_id=flag_id,
        collection=FirestoreCollections.FLAG,
    )
def _set_flag_disputed_progress(
    self,
    flag: FlagWrapper,
    disputed_progress: DisputedProgress,
) -> AppResult[FlagWrapper]:
    """
    Set ``disputed_progress`` on the flag and persist it through the flag bridge.

    Returns an AppResult whose ``generated`` is the model returned by the update.
    Errors are recorded on the result (never raised) when the update reports an
    error, returns no generated model, or an exception occurs.
    """
    outcome: AppResult[FlagWrapper] = AppResult(doc_id=flag.doc_id)
    try:
        flag.update_field("disputed_progress", disputed_progress)
        persisted = self.flag_bridge.update(flag)

        if persisted.is_error:
            outcome.merge(persisted)
            return outcome

        if persisted.generated is None:
            # update reported success but gave nothing back to hand on
            outcome.add_error(
                AppErrorCode.DATABASE_EX,
                f"Updated flag {flag.doc_id} has no generated model.",
            )
            return outcome

        outcome.generated = persisted.generated
    except Exception as e:
        msg = f"Failed to set flag disputed status to {disputed_progress.value} for {flag.doc_id}: {e}"
        outcome.add_error(AppErrorCode.UPDATE_FAILED, msg)

    return outcome