Coverage for functions \ flipdare \ service \ dare_service.py: 17%

275 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12from __future__ import annotations 

13from pathlib import Path 

14from typing import TYPE_CHECKING 

15from flipdare.app_log import LOG 

16from flipdare.constants import DOWNLOAD_FILE_DIR, IS_DEBUG, IS_TRACE 

17from flipdare.core.cron_decorator import cron_decorator 

18from flipdare.core.job_type_decorator import job_type_decorator 

19from flipdare.core.trigger_decorator import trigger_decorator 

20from flipdare.message.error_message import ErrorMessage 

21from flipdare.error.app_error import AppError 

22from flipdare.error.error_context import ErrorContext 

23from flipdare.result import JobResult, AppResult 

24from flipdare.firestore.context.dare_context import DareContext, DareContextFactory 

25from flipdare.voting.ballot import BallotOutcome 

26from flipdare.wrapper import AppJobWrapper, DareWrapper 

27from flipdare.app_types import CronResult 

28from flipdare.util.time_util import TimeUtil 

29from flipdare.voting.ballot_manager import BallotManager, get_ballot_manager 

30from flipdare.generated import ( 

31 StopwatchModel, 

32 BallotAlgorithmType, 

33 BallotResult, 

34 DareStatus, 

35 AppErrorCode, 

36 AppJobType, 

37 FirestoreCollections, 

38 CountMetric, 

39) 

40from flipdare.service import ( 

41 ErrorMixin, 

42 EmailMixin, 

43 ServiceProvider, 

44 DareEmailProcessor, 

45 DareProcessor, 

46 CronConfig, 

47 CronProcessor, 

48) 

49 

50if TYPE_CHECKING: 

51 from flipdare.manager.db_manager import DbManager 

52 from flipdare.manager.backend_manager import BackendManager 

53 

54__all__ = ["DareService"] 

55 

56 

57_JT = AppJobType 

58_COL = FirestoreCollections.DARE 

59 

60 

61class DareService(EmailMixin, ErrorMixin, ServiceProvider): 

62 __slots__ = ["_dare_processor"] 

63 

64 def __init__( 

65 self, 

66 db_manager: DbManager | None = None, 

67 backend_manager: BackendManager | None = None, 

68 local_path: Path = DOWNLOAD_FILE_DIR, 

69 ) -> None: 

70 super().__init__( 

71 backend_manager=backend_manager, 

72 db_manager=db_manager, 

73 ) 

74 self._local_path = local_path 

75 self._dare_processor: DareProcessor | None = None 

76 self._email_processor: DareEmailProcessor | None = None 

77 self._ballot_manager: BallotManager | None = None 

78 

79 @property 

80 def ballot_manager(self) -> BallotManager: 

81 if self._ballot_manager is None: 

82 self._ballot_manager = get_ballot_manager() 

83 return self._ballot_manager 

84 

85 @property 

86 def dare_processor(self) -> DareProcessor: 

87 if self._dare_processor is None: 

88 self._dare_processor = DareProcessor( 

89 bucket=self.storage_bucket, 

90 dare_bridge=self.dare_bridge, 

91 indexer=self.indexer, 

92 summary_service=self.service_manager.summary, 

93 local_path=self._local_path, 

94 ) 

95 return self._dare_processor 

96 

97 @property 

98 def email_processor(self) -> DareEmailProcessor: 

99 if self._email_processor is None: 

100 self._email_processor = DareEmailProcessor( 

101 db_manager=self.db_manager, 

102 app_logger=self.app_logger, 

103 user_mailer=self.user_mailer, 

104 admin_mailer=self.admin_mailer, 

105 ) 

106 return self._email_processor 

107 

108 # ======================================================================== 

109 # CRONS 

110 # ======================================================================== 

111 

112 @cron_decorator(job_type=_JT.CR_DARE_UNPROCESSED) 

113 def cron_dare_unprocessed(self) -> CronResult: 

114 """Process unprocessed dares from last week.""" 

115 config = CronConfig( 

116 job_type=_JT.CR_DARE_UNPROCESSED, 

117 job_name="cron_dare_unprocessed", 

118 query_fn=lambda: self.dare_db.get_recent_unprocessed(), 

119 process_fn=lambda dare: self.dare_processor.process_dare(dare), 

120 ) 

121 return CronProcessor(config).process_result() 

122 

123 @cron_decorator(job_type=_JT.CR_DARE_VOTE) 

124 def cron_vote(self) -> CronResult: 

125 dares = self.dare_db.get_dares_can_vote() 

126 if len(dares) == 0: 

127 msg = "No dares available for voting." 

128 LOG().info(msg) 

129 return CountMetric.empty() 

130 

131 started_at = TimeUtil.get_current_utc_dt() 

132 success_ct = 0 

133 failed_ct = 0 

134 skipped_ct = 0 

135 

136 main_result = AppResult[DareWrapper](task_name="cron_vote") 

137 for dare in dares: 

138 dare_id = dare.doc_id 

139 if IS_DEBUG: 

140 LOG().debug(f"Processing dare {dare_id} for voting results.") 

141 

142 voting_timer = dare.voting_timer 

143 if voting_timer is None: 

144 if IS_DEBUG: 

145 LOG().debug(f"Dare {dare_id} has no voting timer, starting..") 

146 dare.start_vote_timer() 

147 voting_timer = dare.voting_timer 

148 assert voting_timer is not None # narrowing 

149 

150 try: 

151 ballot_outcome = self._check_dare_votes(dare, voting_timer) 

152 if ballot_outcome is None: 

153 skipped_ct += 1 

154 continue 

155 

156 # start email should have been sent during the completed dare process. 

157 # we only need to send the result email, 

158 # which has the voting result and the dare status 

159 # (e.g. whether the dare was successful or not). 

160 context = self._get_dare_context(dare) 

161 

162 self.email_processor.notify_user_vote_complete( 

163 context=context, 

164 ballot_result=ballot_outcome.result, 

165 ) 

166 

167 dare.set_vote_result(ballot_outcome) 

168 self._update_dare(dare) 

169 success_ct += 1 

170 

171 if IS_DEBUG: 

172 msg = f"Dare {dare_id} voting_outcome={ballot_outcome.result}, dare_status={dare.status}." 

173 LOG().debug(msg) 

174 

175 except Exception as e: 

176 msg = f"Error processing dare {dare_id}: {e}" 

177 LOG().error(msg) 

178 main_result.add_error(AppErrorCode.VOTING, msg) 

179 failed_ct += 1 

180 

181 ended_at = TimeUtil.get_current_utc_dt() 

182 duration = TimeUtil.duration_in_seconds(started_at, ended_at) 

183 

184 debug_msg = f"{success_ct} success, {failed_ct} failed, {skipped_ct} skipped, duration={duration} secs." 

185 

186 if not main_result.is_error: 

187 LOG().info(f"Dare voting cron completed: {debug_msg}") 

188 return CountMetric( 

189 success_ct=success_ct, 

190 failed_ct=failed_ct, 

191 skipped_ct=skipped_ct, 

192 duration=duration, 

193 ) 

194 

195 # handle the error 

196 msg = f"Errors during dare voting processing:{debug_msg}" 

197 self.cron_result_error( 

198 job_type=AppJobType.CR_DARE_VOTE, 

199 result=main_result, 

200 collection=FirestoreCollections.DARE, 

201 notify_admin=True, 

202 error_code=AppErrorCode.VOTING, 

203 message=msg, 

204 ) 

205 return CountMetric.error(duration=duration) 

206 

207 # ======================================================================== 

208 # TRIGGERS - Delegate to processors 

209 # ======================================================================== 

210 

211 # !! IMPORTANT !! # !! IMPORTANT !! # !! IMPORTANT !! # !! IMPORTANT !! 

212 # NOTE: this is the most important trigger in the system. 

213 # NOTE: its responsible for: 

214 # NOTE: 1. Notifying users when dare complete. 

215 # NOTE: 2. initiating voting 

216 # NOTE: 3. initiating payment processing. 

217 # NOTE: 4. notifying users of status. 

218 

219 @job_type_decorator(_JT.TR_DARE) 

220 @trigger_decorator(job_type=_JT.TR_DARE, collection=_COL, wrapper_class=DareWrapper) 

221 def trigger_dare( 

222 self, 

223 job: AppJobWrapper, 

224 *, 

225 dare: DareWrapper, 

226 ) -> JobResult[DareWrapper]: 

227 """ 

228 Main dare trigger. 

229 """ 

230 status = dare.status 

231 job_id = job.doc_id 

232 

233 if status.is_complete: 

234 # this eliminates COMPLETED, PAY_OUT, and FINALIZED statuses, which should not be processed. 

235 msg = f"Dare {dare.doc_id} is complete, no further processing needed." 

236 if IS_DEBUG: 

237 LOG().debug(msg) 

238 return JobResult.skip_doc(job_id, message=msg) 

239 

240 if dare.has_voting_started and not status.is_completing: 

241 # log, this should not be possible. 

242 # NOTE: this should be prevented in the gui. 

243 # NOTE: if a dare is voting, the user should not be allowed to change the state. 

244 # NOTE: until voting has timed out. 

245 msg = f"Dare {dare.doc_id} has voting timer started but status is {status}. This should not be possible." 

246 LOG().error(msg) 

247 return JobResult.skip_doc(job_id, message=msg) 

248 

249 match status: 

250 case DareStatus.DRAFT: 

251 # do nothing, dare has not been submitted for processing yet 

252 msg = "Dare is in DRAFT status, skipping processing." 

253 if IS_TRACE: 

254 LOG().trace(msg) 

255 

256 return JobResult.ok(message=msg) 

257 case DareStatus.SUBMITTED | DareStatus.RESUBMITTED | DareStatus.REVIEW_REQUIRED: 

258 # moderate the dare first, which will then trigger processing if it passes moderation 

259 return self._moderate_dare(job_id=job_id, dare=dare) 

260 case DareStatus.COMPLETED: 

261 # process the completed dare, which will set it to voting and send out notifications. 

262 return self._process_completed_dare(job_id=job_id, dare=dare) 

263 case _: 

264 # for all other statuses, we just need to re-index the dare since it may have been updated. 

265 # this is not that costly, since dares are updated infrequently... 

266 return self._reindex_dare(job_id=job_id, dare=dare) 

267 

268 def _moderate_dare( 

269 self, 

270 job_id: str, 

271 *, 

272 dare: DareWrapper, 

273 ) -> JobResult[DareWrapper]: 

274 """ 

275 Initial safety check on a new dare submission. 

276 wrapper is injected by trigger_decorator after deserializing job.model_data. 

277 """ 

278 dare_id = dare.doc_id 

279 main_result = AppResult[DareWrapper](doc_id=dare_id) 

280 

281 try: 

282 outcome = self.service_manager.moderation.review_dare(dare) 

283 if outcome is None or outcome.is_approved: 

284 if IS_DEBUG: 

285 msg = f"Dare {dare_id} passed pre-flight moderation with outcome: {outcome}" 

286 LOG().debug(msg) 

287 return self._process_created_dare(job_id, dare) 

288 

289 cause = f"Dare {dare_id} not approved in pre-flight moderation {outcome}" 

290 # NOTE: picked up by a scheduled task to notify the user. 

291 self._add_job(dare_id, AppJobType.TR_DARE, dare) 

292 

293 main_result.add_error(AppErrorCode.MODERATION, cause) 

294 return JobResult.from_result(main_result, doc_id=dare_id, data=dare.to_dict()) 

295 

296 except Exception as e: 

297 cause = f"Error processing new dare {dare_id}: {e}" 

298 LOG().error(cause) 

299 main_result.add_error(AppErrorCode.MODERATION, cause) 

300 return JobResult.from_result(main_result, doc_id=dare_id, data=dare.to_dict()) 

301 

302 def _process_created_dare( 

303 self, 

304 job_id: str, 

305 dare: DareWrapper, 

306 ) -> JobResult[DareWrapper]: 

307 debug_label = f"Dare Creation Trigger: {job_id}" 

308 main_result = AppResult[DareWrapper](task_name=debug_label, doc_id=job_id) 

309 dare_data = dare.to_dict() 

310 

311 try: 

312 process_result = self.dare_processor.process_dare(dare=dare) 

313 if process_result.is_error: 

314 msg = f"{debug_label} - {process_result.message or 'Processing error'}" 

315 error_code = process_result.error_code or AppErrorCode.DARE_PROCESSING 

316 main_result.add_error(error_code, msg) 

317 except Exception as e: 

318 msg = f"{debug_label} - Processing exception: {e}" 

319 main_result.add_error(AppErrorCode.DARE_PROCESSING, msg) 

320 

321 if not main_result.is_error: 

322 return JobResult.ok(message=f"{debug_label} processed successfully.") 

323 

324 # handle the error 

325 if IS_DEBUG: 

326 LOG().debug(f"{debug_label} - Errors during processing: {main_result.errors}") 

327 

328 return JobResult.from_result(main_result, data=dare_data) 

329 

330 def _process_completed_dare( 

331 self, 

332 job_id: str, 

333 *, 

334 dare: DareWrapper, 

335 ) -> JobResult[DareWrapper]: 

336 # the dare is completed, we need to : 

337 # 1. start the vote timer 

338 # 2. send out email notifications to the owner and pledgers that voting has begun. 

339 # 3. update user gui activity with notification that voting has begun. 

340 

341 # NOTE: crons handle changing the vote status and sending 

342 # NOTE: vote completion emails/notifications. 

343 

344 debug_label = f"Dare Creation Trigger: {job_id}" 

345 main_result = AppResult[DareWrapper](task_name=debug_label, doc_id=job_id) 

346 dare_data = dare.to_dict() 

347 

348 try: 

349 process_result = self.dare_processor.process_dare(dare=dare) 

350 if process_result.is_error: 

351 error_code = process_result.error_code or AppErrorCode.DARE_PROCESSING 

352 main_result.add_error(error_code, process_result.message or "Processing error") 

353 

354 except Exception as e: 

355 msg = f"{debug_label} - Processing exception: {e}" 

356 main_result.add_error(AppErrorCode.DARE_PROCESSING, msg) 

357 

358 if main_result.is_error: 

359 if IS_DEBUG: 

360 LOG().debug(f"{debug_label} - Errors during processing: {main_result.errors}") 

361 

362 return JobResult.from_result(main_result, data=dare_data) 

363 

364 # set the dare to voting and send out a notification that voting has begun. 

365 # NOTE: This is where VOTING begins. 

366 context: DareContext | None = None 

367 try: 

368 context = self._get_dare_context(dare) 

369 except Exception as e: 

370 msg = f"{debug_label} - Error getting dare context: {e}" 

371 LOG().error(msg) 

372 main_result.add_error(AppErrorCode.INVALID_CONTEXT, msg) 

373 return JobResult.from_result(main_result, data=dare_data) 

374 

375 try: 

376 dare.start_vote_timer() 

377 self._update_dare(dare) 

378 self.email_processor.notify_user_vote_start(context=context) 

379 return JobResult.ok(message=f"{debug_label} processed successfully.") 

380 except Exception as err: 

381 msg = f"{debug_label} - Error updating dare status to VOTING: {err}" 

382 LOG().error(msg) 

383 error = ErrorContext.from_exception( 

384 endpoint="_process_completed_dare", 

385 error_code=AppErrorCode.UPDATE_FAILED, 

386 message=msg, 

387 error=err, 

388 ) 

389 raise AppError.from_context(error) from err 

390 

391 # ======================================================================== 

392 # Method HELPERS 

393 # ======================================================================== 

394 

395 def _check_dare_votes(self, dare: DareWrapper, timer: StopwatchModel) -> BallotOutcome | None: 

396 dare_id = dare.doc_id 

397 is_expired = timer.is_expired 

398 

399 if IS_DEBUG: 

400 msg = f"Getting votes for dare {dare_id} with timer {timer}. Expired={is_expired}" 

401 LOG().debug(msg) 

402 

403 outcome = self.ballot_manager.count_votes(dare) 

404 if outcome is None: 

405 if not is_expired: 

406 if IS_DEBUG: 

407 LOG().debug(f"Not enough votes cast for dare {dare_id}. Skipping.") 

408 return None 

409 

410 # expired 

411 if IS_DEBUG: 

412 LOG().debug(f"TIMED OUT and not enough votes cast for dare {dare_id}. Rejected.") 

413 

414 return BallotOutcome( 

415 result=BallotResult.REJECTED, 

416 algorithm=BallotAlgorithmType.VOTE_TOTALS, 

417 ) 

418 

419 # if we get to here we have an outcome 

420 

421 if not outcome.result.is_finalized: 

422 if IS_DEBUG: 

423 LOG().debug(f"Votes cast for dare {dare_id} but no decision reached. Skipping.") 

424 return None 

425 

426 if IS_DEBUG: 

427 msg = ( 

428 f"Votes cast for dare {dare_id} with outcome {outcome.result}. " 

429 f"Finalized={outcome.result.is_finalized}" 

430 ) 

431 LOG().debug(msg) 

432 

433 return outcome 

434 

435 # ======================================================================== 

436 # Generic HELPERS 

437 # ======================================================================== 

438 

439 def _add_job( 

440 self, 

441 doc_id: str, 

442 job_type: AppJobType, 

443 model: DareWrapper, 

444 ) -> None: 

445 self.job_logger.create_new(doc_id=doc_id, job_type=job_type, model=model.to_dict()) 

446 if IS_TRACE: 

447 LOG().trace(f"Job {job_type} created for dare ID: {doc_id}") 

448 

449 def _get_dare_context(self, dare: DareWrapper) -> DareContext: 

450 ctx = DareContextFactory().create(dare) 

451 dare_id = dare.doc_id 

452 if ctx is None: 

453 msg = f"Could not create dare context for dare {dare_id}." 

454 LOG().error(msg) 

455 raise AppError.from_context( 

456 ErrorContext( 

457 endpoint="_get_dare_context", 

458 error_code=AppErrorCode.INVALID_CONTEXT, 

459 message=ErrorMessage.INVALID_CONTEXT.formatted({"OBJECT": "Dare"}), 

460 cause=msg, 

461 ), 

462 ) 

463 

464 if not ctx.validate(): 

465 msg = f"Invalid dare context for dare {dare_id}." 

466 raise AppError.from_context( 

467 ErrorContext( 

468 endpoint="_get_dare_context", 

469 error_code=AppErrorCode.INVALID_CONTEXT, 

470 message=ErrorMessage.INVALID_CONTEXT.formatted({"OBJECT": "Dare"}), 

471 cause=msg, 

472 ), 

473 ) 

474 

475 return ctx 

476 

477 def _update_dare(self, dare: DareWrapper) -> None: 

478 try: 

479 updates = dare.get_updates() 

480 if not updates: 

481 if IS_DEBUG: 

482 LOG().debug(f"No updates to apply for dare {dare.doc_id}.") 

483 return 

484 

485 self.dare_db.update(dare.doc_id, updates) 

486 if IS_TRACE: 

487 LOG().trace(f"Updated dare {dare.doc_id} with updates: {updates}") 

488 

489 except Exception as err: 

490 msg = f"Error updating dare {dare.doc_id}: {err}" 

491 LOG().error(msg) 

492 error = ErrorContext.from_exception( 

493 endpoint="_update_dare", 

494 error_code=AppErrorCode.UPDATE_FAILED, 

495 message=msg, 

496 error=err, 

497 ) 

498 raise AppError.from_context(error) from err 

499 

500 def _reindex_dare( 

501 self, 

502 job_id: str, 

503 *, 

504 dare: DareWrapper, 

505 ) -> JobResult[DareWrapper]: 

506 dare_id = dare.doc_id 

507 main_result = AppResult[DareWrapper](doc_id=dare_id) 

508 

509 if IS_DEBUG: 

510 msg = f"Reindexing dare {dare_id} with job ID {job_id}." 

511 LOG().debug(msg) 

512 

513 try: 

514 index_result = self.indexer.process_dare(dare, is_update=True) 

515 if index_result.is_error: 

516 msg = f"Error reindexing dare {dare_id}/{job_id}: {index_result.errors}" 

517 LOG().error(msg) 

518 main_result.merge(index_result) 

519 

520 except Exception as e: 

521 msg = f"Exception reindexing dare {dare_id}/{job_id}: {e}" 

522 LOG().error(msg) 

523 main_result.add_error(AppErrorCode.INDEX_FAILED, msg) 

524 

525 if not main_result.is_error: 

526 msg = f"Dare {dare_id}/{job_id} reindexed successfully." 

527 if IS_TRACE: 

528 LOG().trace(msg) 

529 

530 return JobResult.ok(message=msg) 

531 

532 # error 

533 if IS_DEBUG: 

534 msg = f"Errors reindexing dare {dare_id}/{job_id}: {main_result.errors}" 

535 LOG().debug(msg) 

536 

537 return JobResult.from_result( 

538 main_result, 

539 doc_id=dare_id, 

540 data=dare.to_dict(), 

541 )