Coverage for functions \ flipdare \ service \ safety \ moderation_service.py: 60%
174 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from __future__ import annotations
15from typing import TYPE_CHECKING
17from google.cloud import language_v1
18from google.cloud.language_v1 import ClassificationModelOptions
19from flipdare.app_log import LOG
20from flipdare.constants import IS_DEBUG
21from flipdare.generated.shared.backend.app_job_type import AppJobType
22from flipdare.generated.shared.model.dare.dare_status import DareStatus
23from flipdare.generated.shared.model.restriction.moderation_decision import ModerationDecision
25from flipdare.manager.db_manager import DbManager
27from flipdare.service._service_provider import ServiceProvider
28from flipdare.service.safety.core.moderation_scorer import ModerationScorer
29from flipdare.service.safety.safety_types import (
30 ModerationCategory,
31 ReputationOutcome,
32 ModerationAssessment,
33 ModerationOutcome,
34)
35from flipdare.generated.shared.firestore_collections import FirestoreCollections
36from flipdare.wrapper import DareWrapper, PersistedGuard
37from flipdare.wrapper.chat_comment_wrapper import ChatCommentWrapper
39if TYPE_CHECKING:
40 from flipdare.manager.service_manager import ServiceManager
41 from flipdare.manager.backend_manager import BackendManager
44_D = ModerationDecision
45_CategoriesVersion = ClassificationModelOptions.V2Model.ContentCategoriesVersion
48class ModerationService(ServiceProvider):
49 """A service for analyzing sentiment using Google Cloud Natural Language API."""
51 def __init__(
52 self,
53 client: language_v1.LanguageServiceClient | None = None,
54 db_manager: DbManager | None = None,
55 backend_manager: BackendManager | None = None,
56 service_manager: ServiceManager | None = None,
57 ) -> None:
58 super().__init__(
59 db_manager=db_manager,
60 backend_manager=backend_manager,
61 service_manager=service_manager,
62 )
63 self._client = client
64 self._content_categories_version = _CategoriesVersion.V2
66 @property
67 def language_client(self) -> language_v1.LanguageServiceClient:
68 if self._client is None:
69 self._client = language_v1.LanguageServiceClient()
70 return self._client
72 @language_client.setter
73 def language_client(self, value: language_v1.LanguageServiceClient) -> None:
74 self._client = value
76 @property
77 def content_categories_version(self) -> _CategoriesVersion:
78 return self._content_categories_version
80 @content_categories_version.setter
81 def content_categories_version(self, value: _CategoriesVersion) -> None:
82 self._content_categories_version = value
84 def review_comment(self, comment: ChatCommentWrapper) -> ModerationOutcome:
85 reputation_controller = self.reputation_service
87 rep_outcome: ReputationOutcome | None = None
89 from_uid = comment.from_uid
90 rep_outcome = reputation_controller.should_analyze(from_uid)
91 if not rep_outcome.should_analyze:
92 if IS_DEBUG:
93 LOG().debug(f"Auto-Approving for user {from_uid} due to high reputation.")
94 return ModerationOutcome(
95 decision=_D.AUTO_APPROVE_REPUTATION,
96 new_reputation=rep_outcome.new_reputation,
97 )
99 message = comment.message
100 if message is None:
101 if IS_DEBUG:
102 LOG().debug(f"No message found in comment {comment.doc_id}, auto-approving.")
103 return ModerationOutcome(
104 decision=_D.AUTO_APPROVE_SENTIMENT,
105 new_reputation=rep_outcome.new_reputation,
106 )
108 assessment = self._moderate_text(message)
109 decision: _D | None = None
111 if assessment is None:
112 if IS_DEBUG:
113 LOG().debug(f"No assessment, auto-approved for comment: '{message}'")
114 return ModerationOutcome(
115 decision=_D.AUTO_APPROVE_SENTIMENT,
116 new_reputation=rep_outcome.new_reputation,
117 )
119 if IS_DEBUG:
120 LOG().debug(f"Moderation decision: {assessment} for comment: '{message}'")
121 moderation_type = assessment.moderation_type
122 rep_outcome = reputation_controller.confirm_review(from_uid, moderation_type)
124 if not rep_outcome.should_analyze:
125 if IS_DEBUG:
126 LOG().debug(f"Review Not Required: result: {rep_outcome} for comment: '{message}'")
127 decision = _D.AUTO_APPROVE_SENTIMENT
128 else:
129 if IS_DEBUG:
130 LOG().debug(f"Review REQUIRED: result: {rep_outcome} for comment: '{message}'")
131 decision = _D.REVIEW_REQUIRED
133 if IS_DEBUG:
134 LOG().debug(f"Comment approval result: {decision}")
136 return ModerationOutcome(
137 decision=decision,
138 new_reputation=rep_outcome.new_reputation,
139 assessment=assessment,
140 )
142 def review_dare( # noqa: PLR0912, PLR0915
143 self,
144 dare_obj: DareWrapper | str,
145 ) -> ModerationOutcome | None:
146 """
147 Analyze the sentiment of a dare using an external sentiment analysis service.
149 Args:
150 dare (DareWrapper | str): The dare model or dare ID to analyze.
152 Returns:
153 SentimentResult: The result of the sentiment analysis.
155 """
156 controller = self.reputation_service
157 dare_db = self.dare_db
159 dare_model: DareWrapper | None = None
160 dare_id: str | None = None
162 if PersistedGuard.is_dare(dare_obj):
163 dare_model = dare_obj
164 dare_id = dare_obj.doc_id
165 elif isinstance(dare_obj, str):
166 dare_id = dare_obj
167 dare_model = dare_db.get(dare_id)
169 if dare_model is None:
170 LOG().error(f"Dare not found for moderation: {dare_id}")
171 return None
172 if dare_id is None:
173 LOG().error(f"Dare model provided has no ID:\nData={dare_model.to_dict()}")
174 return None
176 from_uid = dare_model.from_uid
177 old_decision = dare_model.moderation_decision
178 status = dare_model.status
180 if not status.requires_moderation:
181 if IS_DEBUG:
182 msg = f"Dare {dare_id} in status {status} does not require moderation; skipping."
183 LOG().debug(msg)
184 return None
185 if old_decision is not None:
186 if IS_DEBUG:
187 msg = f"Dare {dare_id} already reviewed ({old_decision}), skipping moderation."
188 LOG().debug(msg)
189 return ModerationOutcome(decision=old_decision, new_reputation=-1)
191 if dare_model.status == DareStatus.DRAFT:
192 if IS_DEBUG:
193 LOG().debug(f"Dare {dare_id} is in DRAFT status; skipping moderation.")
194 return None
196 content = f"{dare_model.title}\n{dare_model.message}"
198 decision: _D | None = None
199 assessment: ModerationAssessment | None = None
200 rep_outcome: ReputationOutcome | None = None
202 rep_outcome = controller.should_analyze(from_uid)
204 if not rep_outcome.should_analyze:
205 if IS_DEBUG:
206 LOG().debug(f"Auto-Approving for user {from_uid} due to high reputation.")
207 decision = _D.AUTO_APPROVE_REPUTATION
208 else:
209 debug_msg = f"dare {dare_id} from user {from_uid} with content '{content}'"
210 assessment = self._moderate_text(content)
211 if assessment is None:
212 if IS_DEBUG:
213 LOG().debug(f"No result, auto-approved: {debug_msg}")
214 decision = _D.AUTO_APPROVE_SENTIMENT
215 else:
216 if IS_DEBUG:
217 LOG().debug(f"Moderation assessment: {assessment}: {debug_msg}")
218 moderation_type = assessment.moderation_type
219 rep_outcome = controller.confirm_review(from_uid, moderation_type)
220 if not rep_outcome.should_analyze:
221 if IS_DEBUG:
222 LOG().debug(f"Review Not Required: assessment: {assessment}: {debug_msg}")
223 decision = _D.AUTO_APPROVE_SENTIMENT
224 else:
225 if IS_DEBUG:
226 LOG().debug(f"Review REQUIRED: assessment: {assessment}: {debug_msg}")
227 decision = _D.REVIEW_REQUIRED
229 if IS_DEBUG:
230 LOG().debug(f"Dare {dare_id} decision: {decision}")
232 ok = self._update_issue_progress(decision, dare_id, dare_model)
233 if not ok:
234 return None
236 return ModerationOutcome(
237 decision=decision,
238 new_reputation=rep_outcome.new_reputation,
239 assessment=assessment,
240 )
242 def _update_issue_progress(
243 self,
244 decision: ModerationDecision,
245 dare_id: str,
246 dare_model: DareWrapper,
247 ) -> bool:
248 dare_db = self.dare_db
249 dare_model.moderation_decision = decision
250 updates = dare_model.get_updates()
251 if not updates:
252 # we cant update manually, because based on the decision
253 # other dare fields may need to be updated
254 # instead the support team needs to investigate ..
255 msg = f"No updates found for dare {dare_id}, failed to updated to {decision}"
256 self.app_logger.unexpected_code_path(
257 job_type=AppJobType.TR_DARE,
258 collection=FirestoreCollections.DARE,
259 message=msg,
260 data={"moderation_decision": decision.value, "id": dare_id},
261 )
262 return False
264 dare_db.update(doc_id=dare_id, updates=updates)
265 LOG().info(f"Dare {dare_id} marked as {decision}.")
266 return True
268 def _moderate_text(self, content: str) -> ModerationAssessment | None:
269 document = language_v1.Document(
270 content=content,
271 type_=language_v1.Document.Type.PLAIN_TEXT,
272 language="en",
273 )
275 req = language_v1.ModerateTextRequest(document=document)
277 response = self.language_client.moderate_text(request=req) # type: ignore
278 confidence_scores = self.parse_moderation_response(response)
279 if len(confidence_scores) <= 0:
280 LOG().warning(f"No moderation categories found in response for content: {content}")
281 return None
283 score = ModerationScorer(confidences=confidence_scores)
284 result = score.get_weighted_result()
286 if IS_DEBUG:
287 LOG().debug(f"Scored {result} for content: {content}")
289 return result
291 @staticmethod
292 def parse_moderation_response(
293 response: language_v1.ModerateTextResponse,
294 ) -> dict[ModerationCategory, float]:
295 category_scores: dict[ModerationCategory, float] = {}
296 for category in response.moderation_categories:
297 try:
298 category_enum = ModerationCategory.from_string(category.name)
299 category_scores[category_enum] = category.confidence
300 if IS_DEBUG:
301 msg = f"Parsed category {category_enum} with confidence {category.confidence}"
302 LOG().debug(msg)
303 except KeyError:
304 LOG().warning(f"Unknown moderation category: {category.name}")
306 return category_scores
308 # NOTE: Using moderation, but this may be necessary in future
309 # def get_sentiment(self, content: str) -> SentimentResult:
310 # document = language_v1.Document(
311 # content=content,
312 # type_=language_v1.Document.Type.PLAIN_TEXT,
313 # language="en"
314 # )
315 #
316 # response = self.client.analyze_sentiment(
317 # request={'document': document}
318 # )
319 #
320 # sentiment = response.document_sentiment
321 #
322 # return SentimentResult(
323 # score=sentiment.score,
324 # magnitude=sentiment.magnitude
325 # )