Coverage for functions \ flipdare \ service \ safety \ core \ moderation_scorer.py: 96%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13 

14from numpy import argmax 

15from flipdare.app_log import LOG 

16from flipdare.constants import ( 

17 IS_DEBUG, 

18 IS_TRACE, 

19 MOD_CONFIDENCE_THRESHOLD, 

20 MOD_MIN_AGREEMENT_CATEGORIES, 

21 MOD_SHARPNESS, 

22 MOD_WEIGHTED_FLAGGED_SCORE, 

23 MOD_WEIGHTED_REVIEW_SCORE, 

24) 

25from flipdare.service.safety.safety_types import ( 

26 ModerationAssessment, 

27 ModerationCategory, 

28 ModerationType, 

29) 

30 

31 

class ModerationScorer:
    """Turn per-category moderation confidences into one weighted assessment.

    Combines a confidence penalty (raising each confidence to ``sharpness``)
    with ensemble-style agreement voting (``min_agreement_categories``) to
    minimise false positives while still catching true violations.
    """

    def __init__(
        self,
        confidences: dict[ModerationCategory, float],
        confidence_threshold: float = MOD_CONFIDENCE_THRESHOLD,
        sharpness: float = MOD_SHARPNESS,
        min_agreement_categories: int = MOD_MIN_AGREEMENT_CATEGORIES,
        weighted_flagged_score: float = MOD_WEIGHTED_FLAGGED_SCORE,
        weighted_review_score: float = MOD_WEIGHTED_REVIEW_SCORE,
    ) -> None:
        """Normalise the supplied confidences and record scoring parameters.

        Args:
            confidences: Mapping of moderation category to model confidence.
                Keys are round-tripped through ``ModerationCategory.from_string``
                (presumably to normalise aliases — unknown categories are
                logged and dropped rather than raising).
            confidence_threshold: Minimum confidence for a category to count.
            sharpness: Exponent applied to each counted confidence; higher
                values penalise borderline predictions more aggressively.
            min_agreement_categories: Number of high-confidence categories
                required before any non-zero score is produced.
            weighted_flagged_score: Score at/above which content is FLAGGED.
            weighted_review_score: Score at/above which content needs REVIEW.
        """
        category_confidences: dict[ModerationCategory, float] = {}
        for category, confidence in confidences.items():
            # Keep the try body minimal: only from_string can raise ValueError.
            try:
                category_enum = ModerationCategory.from_string(category.value)
            except ValueError:
                LOG().warning(f"Unknown moderation category: {category}")
            else:
                category_confidences[category_enum] = confidence

        self._category_confidences = category_confidences
        self._confidence_threshold = confidence_threshold
        self._sharpness = sharpness
        self._weighted_flagged_score = weighted_flagged_score
        self._weighted_review_score = weighted_review_score
        self._min_agreement_categories = min_agreement_categories

    @property
    def category_confidences(self) -> dict[ModerationCategory, float]:
        """Normalised category -> confidence mapping (unknowns dropped)."""
        return self._category_confidences

    @property
    def confidence_threshold(self) -> float:
        """Minimum confidence for a category to contribute to the score."""
        return self._confidence_threshold

    @property
    def sharpness(self) -> float:
        """Exponent of the confidence penalty applied to counted categories."""
        return self._sharpness

    @property
    def weighted_flagged_score(self) -> float:
        """Weighted-score threshold at/above which content is FLAGGED."""
        return self._weighted_flagged_score

    @property
    def weighted_review_score(self) -> float:
        """Weighted-score threshold at/above which content needs REVIEW."""
        return self._weighted_review_score

    @property
    def min_agreement_categories(self) -> int:
        """Number of agreeing high-confidence categories required to score."""
        return self._min_agreement_categories

    def get_weighted_result(self) -> ModerationAssessment:
        """Classify the weighted score into a :class:`ModerationAssessment`.

        Returns:
            An assessment carrying the weighted score, the resulting
            :class:`ModerationType` (FLAGGED / REVIEW / SAFE), and the
            top contributing category (``None`` when nothing scored).
        """
        weighted_score, top_category = self.get_weighted_score()
        # Lower thresholds for weighted method due to dampening factors
        if top_category is None:
            LOG().debug("No top category identified, returning SAFE result.")
            return ModerationAssessment(
                score=weighted_score,
                moderation_type=ModerationType.SAFE,
                moderation_category=None,
            )

        if weighted_score >= self.weighted_flagged_score:
            result = ModerationType.FLAGGED
        elif weighted_score >= self.weighted_review_score:
            result = ModerationType.REVIEW
        else:
            result = ModerationType.SAFE

        if IS_TRACE:
            LOG().trace(f"Weighted result: score={weighted_score}, result={result}")

        return ModerationAssessment(
            score=weighted_score,
            moderation_type=result,
            moderation_category=top_category,
        )

    def get_weighted_score(self) -> tuple[float, ModerationCategory | None]:
        """
        Hybrid approach: confidence penalty + ensemble voting
        Minimizes false positives while catching true violations

        Returns:
            ``(score, top_category)``: the dampened weighted score and the
            highest-confidence contributing category, or ``(0.0, None)``
            when too few categories clear the confidence threshold.
        """
        confidence_threshold = self.confidence_threshold
        sharpness = self.sharpness
        min_agreement_categories = self.min_agreement_categories

        # Count high-confidence violations
        high_confidence_violations = 0
        total_score = 0.0

        items = self.category_confidences.items()
        if IS_DEBUG:
            LOG().debug(
                f"Calculating weighted score from category confidences: {len(items)} items"
            )

        category_scores: dict[ModerationCategory, float] = {}

        for category, confidence in items:
            weighting = category.weighting

            # Zero-weighted categories are deliberately ignored.
            if weighting == 0.0:
                if IS_TRACE:
                    LOG().trace(f"Skipping category {category} with zero weighting")
                continue

            # Only count high-confidence predictions
            if confidence < confidence_threshold:
                if IS_TRACE:
                    LOG().trace(f"Skipping category {category} with low confidence {confidence}")
                continue

            category_scores[category] = confidence
            high_confidence_violations += 1
            # Apply aggressive confidence penalty: confidence**sharpness
            # discounts borderline predictions far more than confident ones.
            confidence_penalty = confidence**sharpness
            total_score += confidence_penalty * weighting

        # Require multiple categories to agree (reduces false positives)
        if high_confidence_violations < min_agreement_categories:
            if IS_DEBUG:
                LOG().debug(
                    f"Not enough agreeing categories: "
                    f"{high_confidence_violations} < {min_agreement_categories}",
                )
            return 0.0, None  # Not enough agreement

        # Apply dampening factor based on number of agreeing categories
        # (reaches full weight once three or more categories agree).
        agreement_factor = min(1.0, high_confidence_violations / 3.0)

        # Return the highest-confidence category for explainability.
        # max(dict, key=...) replaces the former numpy argmax over a values
        # list plus indexing into a parallel keys list; tie-breaking (first
        # maximum in insertion order wins) is identical.
        top_category_enum = (
            max(category_scores, key=category_scores.__getitem__)
            if category_scores
            else None
        )

        score = total_score * agreement_factor
        if IS_DEBUG:
            LOG().debug(
                f"Weighted score calculation: total_score={total_score}, "
                f"high_confidence_violations={high_confidence_violations}, "
                f"agreement_factor={agreement_factor}, final_score={score}"
                f", top_category={top_category_enum}",
            )
        return score, top_category_enum

177 

178 

179# 

180# OLD METHODS - KEEP FOR REFERENCE 

181# 

182# def get_bayesian_result(self, prior_positive_rate=0.05) -> SentimentResult: 

183# bayes_score = self.get_bayesian_score(prior_positive_rate=prior_positive_rate) 

184# # Higher thresholds for Bayesian posterior probabilities 

185# if bayes_score >= 0.85: 

186# result = ModerationResult.BLOCK 

187# elif bayes_score >= 0.60: 

188# result = ModerationResult.REVIEW 

189# else: 

190# result = ModerationResult.SAFE 

191# return SentimentResult(score=bayes_score, result=result) 

192# def get_bayesian_score(self, prior_positive_rate=0.05) -> float: 

193# """ 

194# Bayesian approach: P(truly harmful | observed scores) 

195# 

196# prior_positive_rate: Base rate of truly harmful content (e.g., 5%) 

197# """ 

198# # Likelihood of observing these scores if content IS harmful 

199# likelihood_positive = 1.0 

200# # Likelihood of observing these scores if content is NOT harmful 

201# likelihood_negative = 1.0 

202# 

203# for category, confidence in self.category_confidences.items(): 

204# weighting = category.weighting 

205# 

206# if weighting == 0.0: 

207# continue 

208# 

209# # Model: harmful content has high confidence, safe content has low 

210# # P(high_confidence | harmful) vs P(high_confidence | safe) 

211# likelihood_positive *= confidence * weighting + (1 - weighting) * 0.1 

212# likelihood_negative *= (1 - confidence * weighting) * 0.9 + 0.1 

213# 

214# # Bayes theorem 

215# prior_negative_rate = 1 - prior_positive_rate 

216# posterior = (likelihood_positive * prior_positive_rate) / \ 

217# (likelihood_positive * prior_positive_rate + 

218# likelihood_negative * prior_negative_rate) 

219# 

220# return posterior 

221# def get_weighted_score_with_confidence_penalty(self, 

222# confidence_threshold=0.5, 

223# sharpness=2.0) -> float: 

224# """ 

225# Penalizes low-confidence predictions to reduce false positives 

226# sharpness: Higher = more aggressive penalty (2.0 is balanced) 

227# """ 

228# total_score = 0.0 

229# for category, confidence in self.category_confidences.items(): 

230# weighting = category.weighting 

231# if weighting == 0.0: 

232# continue 

233# 

234# # Only count confidences above threshold 

235# if confidence < confidence_threshold: 

236# continue 

237# 

238# # Apply confidence penalty: confidence^sharpness 

239# # Low confidence (0.6) gets heavily penalized vs high (0.9) 

240# confidence_penalty = confidence ** sharpness 

241# total_score += confidence_penalty * weighting 

242# return total_score 

243# 

244# def get_geometric_mean_score(self, epsilon=0.01) -> float: 

245# """ 

246# Geometric mean reduces impact of single high-confidence false positives 

247# More conservative than arithmetic mean 

248# """ 

249# product = 1.0 

250# count = 0 

251# 

252# for category, confidence in self.category_confidences.items(): 

253# weighting = category.weighting 

254# 

255# if weighting == 0.0 or confidence < 0.5: 

256# continue 

257# 

258# # Add small epsilon to avoid log(0) 

259# weighted_conf = (confidence * weighting) + epsilon 

260# product *= weighted_conf 

261# count += 1 

262# 

263# if count == 0: 

264# return 0.0 

265# 

266# # Geometric mean: (product)^(1/n) 

267# return product ** (1.0 / count) 

268# 

269# 

270# def get_harmonic_mean_score(self, category_ceiling=0.8) -> float: 

271# """ 

272# Harmonic mean is very conservative - punishes inconsistency 

273# Category ceiling prevents single category from dominating 

274# """ 

275# weighted_reciprocals = 0.0 

276# total_weight = 0.0 

277# 

278# for category, confidence in self.category_confidences.items(): 

279# weighting = category.weighting 

280# 

281# if weighting == 0.0 or confidence < 0.5: 

282# continue 

283# 

284# # Cap individual category contribution 

285# capped_confidence = min(confidence, category_ceiling) 

286# 

287# # Harmonic mean calculation 

288# if capped_confidence > 0: 

289# weighted_reciprocals += weighting / capped_confidence 

290# total_weight += weighting 

291# 

292# if total_weight == 0 or weighted_reciprocals == 0: 

293# return 0.0 

294# 

295# # Harmonic mean: n / (1/x1 + 1/x2 + ... + 1/xn) 

296# return total_weight / weighted_reciprocals 

297# 

298# def get_gated_score(self) -> tuple[float, str]: 

299# """ 

300# Multi-stage gating with reason codes 

301# Returns (score, reason) for explainability 

302# """ 

303# # Stage 1: Check for any high-confidence severe violations 

304# critical_categories = [ 

305# ModerationCategory.DEATH_HARM_TRAGEDY, 

306# ModerationCategory.SEXUAL, 

307# ] 

308# 

309# for category in critical_categories: 

310# confidence = self.category_confidences.get(category, 0.0) 

311# if confidence > 0.85: # Very high bar 

312# return (1.0, f"Critical: {category.value}") 

313# 

314# # Stage 2: Check for multiple medium-confidence violations 

315# medium_violations = [] 

316# for category, confidence in self.category_confidences.items(): 

317# if category.weighting > 0.3 and confidence > 0.70: 

318# medium_violations.append((category, confidence)) 

319# 

320# if len(medium_violations) >= 2: 

321# avg_confidence = sum(c for _, c in medium_violations) / len(medium_violations) 

322# return (avg_confidence * 0.8, f"Multiple violations: {len(medium_violations)}") 

323# 

324# # Stage 3: Aggregated weighted score (conservative) 

325# total_score = 0.0 

326# for category, confidence in self.category_confidences.items(): 

327# if confidence > 0.75: # High confidence only 

328# total_score += confidence ** 2 * category.weighting 

329# 

330# if total_score > 0.5: 

331# return (total_score, "Aggregated score") 

332# 

333# return (0.0, "Clean") 

334# 

335# 

336# def get_ensemble_score(self, 

337# high_severity_threshold=0.75, 

338# medium_severity_threshold=0.60, 

339# min_high_severity_votes=1, 

340# min_medium_severity_votes=2) -> float: 

341# """ 

342# Ensemble voting: requires multiple categories to agree 

343# Dramatically reduces false positives 

344# """ 

345# high_severity_categories = [ 

346# ModerationCategory.DEATH_HARM_TRAGEDY, 

347# ModerationCategory.SEXUAL, 

348# ModerationCategory.VIOLENT, 

349# ] 

350# 

351# high_severity_votes = 0 

352# medium_severity_votes = 0 

353# max_confidence = 0.0 

354# 

355# for category, confidence in self.category_confidences.items(): 

356# weighting = category.weighting 

357# 

358# if weighting == 0.0: 

359# continue 

360# 

361# max_confidence = max(max_confidence, confidence) 

362# 

363# if category in high_severity_categories: 

364# if confidence >= high_severity_threshold: 

365# high_severity_votes += weighting 

366# else: 

367# if confidence >= medium_severity_threshold: 

368# medium_severity_votes += weighting 

369# 

370# # Require consensus from multiple categories 

371# if high_severity_votes >= min_high_severity_votes: 

372# return max_confidence * high_severity_votes 

373# elif medium_severity_votes >= min_medium_severity_votes: 

374# return max_confidence * medium_severity_votes * 0.7 # Discount medium 

375# 

376# return 0.0 # Not enough agreement = not harmful