Coverage for functions \ flipdare \ service \ compliance_service.py: 99%
86 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from __future__ import annotations
15from collections.abc import Callable
16from typing import TYPE_CHECKING, Any
18from flipdare.app_log import LOG
19from flipdare.constants import IS_DEBUG
20from flipdare.result.app_result import AppResult
21from flipdare.core.job_type_decorator import job_type_decorator
22from flipdare.result.job_result import JobResult
23from flipdare.core.trigger_decorator import trigger_decorator
24from flipdare.core.tokenizer import Tokenizer
25from flipdare.generated import AppErrorCode, AppJobType
26from flipdare.generated.model.backend.compliance_model import ComplianceModel
27from flipdare.generated.shared.firestore_collections import FirestoreCollections
28from flipdare.service._service_provider import ServiceProvider
29from flipdare.service.processor.compliance_processor import AnonymizeOptions, ComplianceProcessor
30from flipdare.wrapper import (
31 AppJobWrapper,
32 ChatWrapper,
33 ContentWrapper,
34 DareWrapper,
35 PersistedWrapper,
36 PledgeWrapper,
37 UserWrapper,
38)
40if TYPE_CHECKING:
41 from flipdare.manager.db_manager import DbManager
42 from flipdare.manager.backend_manager import BackendManager
# Names re-exported as this module's public API. AnonymizeOptions is imported
# from the compliance processor module and surfaced here alongside the service.
__all__ = [
    "AnonymizeOptions",
    "ComplianceService",
]

# Short module-private aliases used by the trigger decorators and job results.
_COL = FirestoreCollections.COMPLIANCE
_JT = AppJobType
class ComplianceService(ServiceProvider):
    """Job triggers that archive a document for compliance, then delete or anonymize it.

    Every trigger follows the same two steps:

    1. Create a compliance record from the incoming wrapper
       (``ComplianceModel.from_wrapper``) through ``self.compliance_bridge``.
    2. Hand that record to the ``ComplianceProcessor`` to perform the delete
       (content, chat, dare, pledge) or the anonymization (user).

    The outcome of both steps is reported as a ``JobResult`` tagged with the
    compliance collection.

    NOTE(review): the bridges (``compliance_bridge``, ``content_bridge``, ...)
    and ``indexer`` are not defined in this class; presumably they are provided
    by ``ServiceProvider`` - confirm against the base class.
    """

    def __init__(
        self,
        db_manager: DbManager | None = None,
        backend_manager: BackendManager | None = None,
        tokenize: Tokenizer | None = None,
        processor: ComplianceProcessor | None = None,
    ) -> None:
        """Initialise the service.

        Args:
            db_manager: Forwarded unchanged to ``ServiceProvider.__init__``.
            backend_manager: Forwarded unchanged to ``ServiceProvider.__init__``.
            tokenize: Tokenizer to use; falls back to ``Tokenizer.instance()``
                when omitted (or falsy).
            processor: Pre-built processor. When omitted, one is created
                lazily on first access of :attr:`processor`.
        """
        super().__init__(
            backend_manager=backend_manager,
            db_manager=db_manager,
        )
        self.tokenizer = tokenize or Tokenizer.instance()
        self._processor = processor

    @property
    def processor(self) -> ComplianceProcessor:
        """Return the compliance processor, building it on first use."""
        if self._processor is None:
            self._processor = ComplianceProcessor(
                compliance_bridge=self.compliance_bridge,
                indexer_service=self.indexer,
                tokenizer=self.tokenizer,
            )
        return self._processor

    # ========================================================================
    # Triggers
    # ========================================================================

    @job_type_decorator(_JT.TR_USER_ANONYMIZE)
    @trigger_decorator(job_type=_JT.TR_USER_ANONYMIZE, collection=_COL, wrapper_class=UserWrapper)
    def trigger_user_anonymize(
        self,
        job: AppJobWrapper,
        *,
        wrapper: UserWrapper,
    ) -> JobResult[UserWrapper]:
        """Archive the user to compliance, then anonymize the user document.

        Users are anonymized instead of deleted to preserve referential integrity.
        """
        return self._run_anonymize(job=job, wrapper=wrapper)

    @job_type_decorator(_JT.TR_CONTENT_DELETE)
    @trigger_decorator(
        job_type=_JT.TR_CONTENT_DELETE, collection=_COL, wrapper_class=ContentWrapper
    )
    def trigger_content_delete(
        self,
        job: AppJobWrapper,
        *,
        wrapper: ContentWrapper,
    ) -> JobResult[ContentWrapper]:
        """Archive the content to compliance, then delete it (search index included)."""
        return self._run_delete(
            job=job,
            wrapper=wrapper,
            delete_handler=self.content_bridge.delete,
            delete_search_index=True,
        )

    @job_type_decorator(_JT.TR_CHAT_DELETE)
    @trigger_decorator(job_type=_JT.TR_CHAT_DELETE, collection=_COL, wrapper_class=ChatWrapper)
    def trigger_chat_delete(
        self,
        job: AppJobWrapper,
        *,
        wrapper: ChatWrapper,
    ) -> JobResult[ChatWrapper]:
        """Archive the chat to compliance, then delete it (no search-index removal)."""
        return self._run_delete(
            job=job,
            wrapper=wrapper,
            delete_handler=self.chat_bridge.delete,
            delete_search_index=False,
        )

    @job_type_decorator(_JT.TR_DARE_DELETE)
    @trigger_decorator(job_type=_JT.TR_DARE_DELETE, collection=_COL, wrapper_class=DareWrapper)
    def trigger_dare_delete(
        self,
        job: AppJobWrapper,
        *,
        wrapper: DareWrapper,
    ) -> JobResult[DareWrapper]:
        """Archive the dare to compliance, then delete it (search index included)."""
        return self._run_delete(
            job=job,
            wrapper=wrapper,
            delete_handler=self.dare_bridge.delete,
            delete_search_index=True,
        )

    @job_type_decorator(_JT.TR_PLEDGE_DELETE)
    @trigger_decorator(job_type=_JT.TR_PLEDGE_DELETE, collection=_COL, wrapper_class=PledgeWrapper)
    def trigger_pledge_delete(
        self,
        job: AppJobWrapper,
        *,
        wrapper: PledgeWrapper,
    ) -> JobResult[PledgeWrapper]:
        """Archive the pledge to compliance, then delete it (no search-index removal)."""
        return self._run_delete(
            job=job,
            wrapper=wrapper,
            delete_handler=self.pledge_bridge.delete,
            delete_search_index=False,
        )

    # ========================================================================
    # Core
    # ========================================================================

    def _run_delete(
        self,
        job: AppJobWrapper,
        wrapper: PersistedWrapper[Any],
        delete_handler: Callable[[str], AppResult[None]],
        delete_search_index: bool,
    ) -> JobResult[Any]:
        """Create the compliance record, then delegate the delete to the processor.

        Args:
            job: The job being processed; supplies ``doc_id`` and ``job_type``.
            wrapper: The document to archive before deletion.
            delete_handler: Callable passed through to
                ``ComplianceProcessor.process_delete``.
            delete_search_index: Passed through to the processor unchanged.

        Returns:
            The error result from record creation if that step failed,
            otherwise the result built from the processor outcome.

        Raises:
            AssertionError: If record creation reported success but produced
                no compliance record.
        """
        debug_label = f"{type(wrapper).__name__} {job.doc_id}"

        compliance, error_output = self._create_compliance_record(job, wrapper, debug_label)
        if error_output is not None:
            return error_output

        result = self.processor.process_delete(
            compliance=self._require_compliance(compliance, debug_label),
            delete_handler=delete_handler,
            delete_search_index=delete_search_index,
        )
        return self._build_output(job, result, debug_label)

    def _run_anonymize(
        self,
        job: AppJobWrapper,
        wrapper: UserWrapper,
    ) -> JobResult[UserWrapper]:
        """Create the compliance record, then delegate anonymization to the processor.

        Args:
            job: The job being processed; supplies ``doc_id`` and ``job_type``.
            wrapper: The user to archive and anonymize.

        Returns:
            The error result from record creation if that step failed,
            otherwise the result built from the processor outcome.

        Raises:
            AssertionError: If record creation reported success but produced
                no compliance record.
        """
        debug_label = f"User {job.doc_id}"

        compliance, error_output = self._create_compliance_record(job, wrapper, debug_label)
        if error_output is not None:
            return error_output

        result = self.processor.process_anonymize(
            compliance=self._require_compliance(compliance, debug_label),
            user=wrapper,
            update_handler=self.user_bridge.update,
        )
        return self._build_output(job, result, debug_label)

    @staticmethod
    def _require_compliance(compliance: Any, debug_label: str) -> Any:
        """Return ``compliance``, raising ``AssertionError`` if it is ``None``.

        This is an explicit check rather than an ``assert`` statement because
        ``assert`` is removed when Python runs with ``-O``, which would let a
        missing record reach the processor. The exception type is kept as
        ``AssertionError`` so existing callers observe the same failure.
        """
        if compliance is None:
            raise AssertionError(
                f"Compliance record missing after successful create for {debug_label}"
            )
        return compliance

    def _create_compliance_record(
        self,
        job: AppJobWrapper,
        wrapper: PersistedWrapper[Any],
        debug_label: str,
    ) -> tuple[Any, JobResult[Any] | None]:
        """Create the compliance archive record.

        Args:
            job: The job being processed; used to tag the error result.
            wrapper: The document converted via ``ComplianceModel.from_wrapper``.
            debug_label: Human-readable label used in log and error messages.

        Returns:
            ``(compliance_wrapper, None)`` on success, or
            ``(None, error_output)`` on failure.
        """
        create_result = self.compliance_bridge.create(ComplianceModel.from_wrapper(wrapper))
        if not create_result.is_error:
            return create_result.generated, None

        msg = f"Failed to create compliance record for {debug_label}"
        LOG().error(f"{msg}\n{create_result.formatted}", include_stack=True)
        # The result is created via ok() and then flipped to an error, mirroring
        # how _build_output constructs its results.
        output = JobResult[Any].ok(doc_id=job.doc_id, job_type=job.job_type, collection=_COL)
        output.set_error(
            app_result=create_result,
            error_code=AppErrorCode.COMP_CREATE_FAILED,
            message=msg,
        )
        return None, output

    def _build_output(
        self,
        job: AppJobWrapper,
        result: AppResult[Any],
        debug_label: str,
    ) -> JobResult[Any]:
        """Translate the processor's ``AppResult`` into a ``JobResult``.

        Args:
            job: The job being processed; supplies ``doc_id`` and ``job_type``.
            result: Outcome returned by the compliance processor.
            debug_label: Human-readable label used in the result message.

        Returns:
            A ``JobResult`` marked as an error with ``COMP_PROCESSING_FAILED``
            when ``result.is_error``, otherwise marked ok. The success message
            is also debug-logged when ``IS_DEBUG`` is set.
        """
        output = JobResult[Any].ok(doc_id=job.doc_id, job_type=job.job_type, collection=_COL)
        if result.is_error:
            msg = f"Compliance processing failed for {debug_label}\n{result.formatted}"
            output.set_error(
                app_result=result,
                error_code=AppErrorCode.COMP_PROCESSING_FAILED,
                message=msg,
            )
        else:
            msg = f"Successfully processed compliance for {debug_label}"
            if IS_DEBUG:
                LOG().debug(msg)
            output.set_ok(message=msg)
        return output