Coverage for functions \ flipdare \ service \ dare_service.py: 17%
275 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
12from __future__ import annotations
13from pathlib import Path
14from typing import TYPE_CHECKING
15from flipdare.app_log import LOG
16from flipdare.constants import DOWNLOAD_FILE_DIR, IS_DEBUG, IS_TRACE
17from flipdare.core.cron_decorator import cron_decorator
18from flipdare.core.job_type_decorator import job_type_decorator
19from flipdare.core.trigger_decorator import trigger_decorator
20from flipdare.message.error_message import ErrorMessage
21from flipdare.error.app_error import AppError
22from flipdare.error.error_context import ErrorContext
23from flipdare.result import JobResult, AppResult
24from flipdare.firestore.context.dare_context import DareContext, DareContextFactory
25from flipdare.voting.ballot import BallotOutcome
26from flipdare.wrapper import AppJobWrapper, DareWrapper
27from flipdare.app_types import CronResult
28from flipdare.util.time_util import TimeUtil
29from flipdare.voting.ballot_manager import BallotManager, get_ballot_manager
30from flipdare.generated import (
31 StopwatchModel,
32 BallotAlgorithmType,
33 BallotResult,
34 DareStatus,
35 AppErrorCode,
36 AppJobType,
37 FirestoreCollections,
38 CountMetric,
39)
40from flipdare.service import (
41 ErrorMixin,
42 EmailMixin,
43 ServiceProvider,
44 DareEmailProcessor,
45 DareProcessor,
46 CronConfig,
47 CronProcessor,
48)
50if TYPE_CHECKING:
51 from flipdare.manager.db_manager import DbManager
52 from flipdare.manager.backend_manager import BackendManager
# Public API of this module: only the service class is exported.
__all__ = ["DareService"]

# Short aliases for the job-type enum and the dare collection; used by the
# cron/trigger decorators on DareService below.
_JT = AppJobType
_COL = FirestoreCollections.DARE
61class DareService(EmailMixin, ErrorMixin, ServiceProvider):
62 __slots__ = ["_dare_processor"]
64 def __init__(
65 self,
66 db_manager: DbManager | None = None,
67 backend_manager: BackendManager | None = None,
68 local_path: Path = DOWNLOAD_FILE_DIR,
69 ) -> None:
70 super().__init__(
71 backend_manager=backend_manager,
72 db_manager=db_manager,
73 )
74 self._local_path = local_path
75 self._dare_processor: DareProcessor | None = None
76 self._email_processor: DareEmailProcessor | None = None
77 self._ballot_manager: BallotManager | None = None
79 @property
80 def ballot_manager(self) -> BallotManager:
81 if self._ballot_manager is None:
82 self._ballot_manager = get_ballot_manager()
83 return self._ballot_manager
85 @property
86 def dare_processor(self) -> DareProcessor:
87 if self._dare_processor is None:
88 self._dare_processor = DareProcessor(
89 bucket=self.storage_bucket,
90 dare_bridge=self.dare_bridge,
91 indexer=self.indexer,
92 summary_service=self.service_manager.summary,
93 local_path=self._local_path,
94 )
95 return self._dare_processor
97 @property
98 def email_processor(self) -> DareEmailProcessor:
99 if self._email_processor is None:
100 self._email_processor = DareEmailProcessor(
101 db_manager=self.db_manager,
102 app_logger=self.app_logger,
103 user_mailer=self.user_mailer,
104 admin_mailer=self.admin_mailer,
105 )
106 return self._email_processor
108 # ========================================================================
109 # CRONS
110 # ========================================================================
112 @cron_decorator(job_type=_JT.CR_DARE_UNPROCESSED)
113 def cron_dare_unprocessed(self) -> CronResult:
114 """Process unprocessed dares from last week."""
115 config = CronConfig(
116 job_type=_JT.CR_DARE_UNPROCESSED,
117 job_name="cron_dare_unprocessed",
118 query_fn=lambda: self.dare_db.get_recent_unprocessed(),
119 process_fn=lambda dare: self.dare_processor.process_dare(dare),
120 )
121 return CronProcessor(config).process_result()
123 @cron_decorator(job_type=_JT.CR_DARE_VOTE)
124 def cron_vote(self) -> CronResult:
125 dares = self.dare_db.get_dares_can_vote()
126 if len(dares) == 0:
127 msg = "No dares available for voting."
128 LOG().info(msg)
129 return CountMetric.empty()
131 started_at = TimeUtil.get_current_utc_dt()
132 success_ct = 0
133 failed_ct = 0
134 skipped_ct = 0
136 main_result = AppResult[DareWrapper](task_name="cron_vote")
137 for dare in dares:
138 dare_id = dare.doc_id
139 if IS_DEBUG:
140 LOG().debug(f"Processing dare {dare_id} for voting results.")
142 voting_timer = dare.voting_timer
143 if voting_timer is None:
144 if IS_DEBUG:
145 LOG().debug(f"Dare {dare_id} has no voting timer, starting..")
146 dare.start_vote_timer()
147 voting_timer = dare.voting_timer
148 assert voting_timer is not None # narrowing
150 try:
151 ballot_outcome = self._check_dare_votes(dare, voting_timer)
152 if ballot_outcome is None:
153 skipped_ct += 1
154 continue
156 # start email should have been sent during the completed dare process.
157 # we only need to send the result email,
158 # which has the voting result and the dare status
159 # (e.g. whether the dare was successful or not).
160 context = self._get_dare_context(dare)
162 self.email_processor.notify_user_vote_complete(
163 context=context,
164 ballot_result=ballot_outcome.result,
165 )
167 dare.set_vote_result(ballot_outcome)
168 self._update_dare(dare)
169 success_ct += 1
171 if IS_DEBUG:
172 msg = f"Dare {dare_id} voting_outcome={ballot_outcome.result}, dare_status={dare.status}."
173 LOG().debug(msg)
175 except Exception as e:
176 msg = f"Error processing dare {dare_id}: {e}"
177 LOG().error(msg)
178 main_result.add_error(AppErrorCode.VOTING, msg)
179 failed_ct += 1
181 ended_at = TimeUtil.get_current_utc_dt()
182 duration = TimeUtil.duration_in_seconds(started_at, ended_at)
184 debug_msg = f"{success_ct} success, {failed_ct} failed, {skipped_ct} skipped, duration={duration} secs."
186 if not main_result.is_error:
187 LOG().info(f"Dare voting cron completed: {debug_msg}")
188 return CountMetric(
189 success_ct=success_ct,
190 failed_ct=failed_ct,
191 skipped_ct=skipped_ct,
192 duration=duration,
193 )
195 # handle the error
196 msg = f"Errors during dare voting processing:{debug_msg}"
197 self.cron_result_error(
198 job_type=AppJobType.CR_DARE_VOTE,
199 result=main_result,
200 collection=FirestoreCollections.DARE,
201 notify_admin=True,
202 error_code=AppErrorCode.VOTING,
203 message=msg,
204 )
205 return CountMetric.error(duration=duration)
207 # ========================================================================
208 # TRIGGERS - Delegate to processors
209 # ========================================================================
211 # !! IMPORTANT !! # !! IMPORTANT !! # !! IMPORTANT !! # !! IMPORTANT !!
212 # NOTE: this is the most important trigger in the system.
213 # NOTE: its responsible for:
214 # NOTE: 1. Notifying users when dare complete.
215 # NOTE: 2. initiating voting
216 # NOTE: 3. initiating payment processing.
217 # NOTE: 4. notifying users of status.
219 @job_type_decorator(_JT.TR_DARE)
220 @trigger_decorator(job_type=_JT.TR_DARE, collection=_COL, wrapper_class=DareWrapper)
221 def trigger_dare(
222 self,
223 job: AppJobWrapper,
224 *,
225 dare: DareWrapper,
226 ) -> JobResult[DareWrapper]:
227 """
228 Main dare trigger.
229 """
230 status = dare.status
231 job_id = job.doc_id
233 if status.is_complete:
234 # this eliminates COMPLETED, PAY_OUT, and FINALIZED statuses, which should not be processed.
235 msg = f"Dare {dare.doc_id} is complete, no further processing needed."
236 if IS_DEBUG:
237 LOG().debug(msg)
238 return JobResult.skip_doc(job_id, message=msg)
240 if dare.has_voting_started and not status.is_completing:
241 # log, this should not be possible.
242 # NOTE: this should be prevented in the gui.
243 # NOTE: if a dare is voting, the user should not be allowed to change the state.
244 # NOTE: until voting has timed out.
245 msg = f"Dare {dare.doc_id} has voting timer started but status is {status}. This should not be possible."
246 LOG().error(msg)
247 return JobResult.skip_doc(job_id, message=msg)
249 match status:
250 case DareStatus.DRAFT:
251 # do nothing, dare has not been submitted for processing yet
252 msg = "Dare is in DRAFT status, skipping processing."
253 if IS_TRACE:
254 LOG().trace(msg)
256 return JobResult.ok(message=msg)
257 case DareStatus.SUBMITTED | DareStatus.RESUBMITTED | DareStatus.REVIEW_REQUIRED:
258 # moderate the dare first, which will then trigger processing if it passes moderation
259 return self._moderate_dare(job_id=job_id, dare=dare)
260 case DareStatus.COMPLETED:
261 # process the completed dare, which will set it to voting and send out notifications.
262 return self._process_completed_dare(job_id=job_id, dare=dare)
263 case _:
264 # for all other statuses, we just need to re-index the dare since it may have been updated.
265 # this is not that costly, since dares are updated infrequently...
266 return self._reindex_dare(job_id=job_id, dare=dare)
268 def _moderate_dare(
269 self,
270 job_id: str,
271 *,
272 dare: DareWrapper,
273 ) -> JobResult[DareWrapper]:
274 """
275 Initial safety check on a new dare submission.
276 wrapper is injected by trigger_decorator after deserializing job.model_data.
277 """
278 dare_id = dare.doc_id
279 main_result = AppResult[DareWrapper](doc_id=dare_id)
281 try:
282 outcome = self.service_manager.moderation.review_dare(dare)
283 if outcome is None or outcome.is_approved:
284 if IS_DEBUG:
285 msg = f"Dare {dare_id} passed pre-flight moderation with outcome: {outcome}"
286 LOG().debug(msg)
287 return self._process_created_dare(job_id, dare)
289 cause = f"Dare {dare_id} not approved in pre-flight moderation {outcome}"
290 # NOTE: picked up by a scheduled task to notify the user.
291 self._add_job(dare_id, AppJobType.TR_DARE, dare)
293 main_result.add_error(AppErrorCode.MODERATION, cause)
294 return JobResult.from_result(main_result, doc_id=dare_id, data=dare.to_dict())
296 except Exception as e:
297 cause = f"Error processing new dare {dare_id}: {e}"
298 LOG().error(cause)
299 main_result.add_error(AppErrorCode.MODERATION, cause)
300 return JobResult.from_result(main_result, doc_id=dare_id, data=dare.to_dict())
302 def _process_created_dare(
303 self,
304 job_id: str,
305 dare: DareWrapper,
306 ) -> JobResult[DareWrapper]:
307 debug_label = f"Dare Creation Trigger: {job_id}"
308 main_result = AppResult[DareWrapper](task_name=debug_label, doc_id=job_id)
309 dare_data = dare.to_dict()
311 try:
312 process_result = self.dare_processor.process_dare(dare=dare)
313 if process_result.is_error:
314 msg = f"{debug_label} - {process_result.message or 'Processing error'}"
315 error_code = process_result.error_code or AppErrorCode.DARE_PROCESSING
316 main_result.add_error(error_code, msg)
317 except Exception as e:
318 msg = f"{debug_label} - Processing exception: {e}"
319 main_result.add_error(AppErrorCode.DARE_PROCESSING, msg)
321 if not main_result.is_error:
322 return JobResult.ok(message=f"{debug_label} processed successfully.")
324 # handle the error
325 if IS_DEBUG:
326 LOG().debug(f"{debug_label} - Errors during processing: {main_result.errors}")
328 return JobResult.from_result(main_result, data=dare_data)
330 def _process_completed_dare(
331 self,
332 job_id: str,
333 *,
334 dare: DareWrapper,
335 ) -> JobResult[DareWrapper]:
336 # the dare is completed, we need to :
337 # 1. start the vote timer
338 # 2. send out email notifications to the owner and pledgers that voting has begun.
339 # 3. update user gui activity with notification that voting has begun.
341 # NOTE: crons handle changing the vote status and sending
342 # NOTE: vote completion emails/notifications.
344 debug_label = f"Dare Creation Trigger: {job_id}"
345 main_result = AppResult[DareWrapper](task_name=debug_label, doc_id=job_id)
346 dare_data = dare.to_dict()
348 try:
349 process_result = self.dare_processor.process_dare(dare=dare)
350 if process_result.is_error:
351 error_code = process_result.error_code or AppErrorCode.DARE_PROCESSING
352 main_result.add_error(error_code, process_result.message or "Processing error")
354 except Exception as e:
355 msg = f"{debug_label} - Processing exception: {e}"
356 main_result.add_error(AppErrorCode.DARE_PROCESSING, msg)
358 if main_result.is_error:
359 if IS_DEBUG:
360 LOG().debug(f"{debug_label} - Errors during processing: {main_result.errors}")
362 return JobResult.from_result(main_result, data=dare_data)
364 # set the dare to voting and send out a notification that voting has begun.
365 # NOTE: This is where VOTING begins.
366 context: DareContext | None = None
367 try:
368 context = self._get_dare_context(dare)
369 except Exception as e:
370 msg = f"{debug_label} - Error getting dare context: {e}"
371 LOG().error(msg)
372 main_result.add_error(AppErrorCode.INVALID_CONTEXT, msg)
373 return JobResult.from_result(main_result, data=dare_data)
375 try:
376 dare.start_vote_timer()
377 self._update_dare(dare)
378 self.email_processor.notify_user_vote_start(context=context)
379 return JobResult.ok(message=f"{debug_label} processed successfully.")
380 except Exception as err:
381 msg = f"{debug_label} - Error updating dare status to VOTING: {err}"
382 LOG().error(msg)
383 error = ErrorContext.from_exception(
384 endpoint="_process_completed_dare",
385 error_code=AppErrorCode.UPDATE_FAILED,
386 message=msg,
387 error=err,
388 )
389 raise AppError.from_context(error) from err
391 # ========================================================================
392 # Method HELPERS
393 # ========================================================================
395 def _check_dare_votes(self, dare: DareWrapper, timer: StopwatchModel) -> BallotOutcome | None:
396 dare_id = dare.doc_id
397 is_expired = timer.is_expired
399 if IS_DEBUG:
400 msg = f"Getting votes for dare {dare_id} with timer {timer}. Expired={is_expired}"
401 LOG().debug(msg)
403 outcome = self.ballot_manager.count_votes(dare)
404 if outcome is None:
405 if not is_expired:
406 if IS_DEBUG:
407 LOG().debug(f"Not enough votes cast for dare {dare_id}. Skipping.")
408 return None
410 # expired
411 if IS_DEBUG:
412 LOG().debug(f"TIMED OUT and not enough votes cast for dare {dare_id}. Rejected.")
414 return BallotOutcome(
415 result=BallotResult.REJECTED,
416 algorithm=BallotAlgorithmType.VOTE_TOTALS,
417 )
419 # if we get to here we have an outcome
421 if not outcome.result.is_finalized:
422 if IS_DEBUG:
423 LOG().debug(f"Votes cast for dare {dare_id} but no decision reached. Skipping.")
424 return None
426 if IS_DEBUG:
427 msg = (
428 f"Votes cast for dare {dare_id} with outcome {outcome.result}. "
429 f"Finalized={outcome.result.is_finalized}"
430 )
431 LOG().debug(msg)
433 return outcome
435 # ========================================================================
436 # Generic HELPERS
437 # ========================================================================
439 def _add_job(
440 self,
441 doc_id: str,
442 job_type: AppJobType,
443 model: DareWrapper,
444 ) -> None:
445 self.job_logger.create_new(doc_id=doc_id, job_type=job_type, model=model.to_dict())
446 if IS_TRACE:
447 LOG().trace(f"Job {job_type} created for dare ID: {doc_id}")
449 def _get_dare_context(self, dare: DareWrapper) -> DareContext:
450 ctx = DareContextFactory().create(dare)
451 dare_id = dare.doc_id
452 if ctx is None:
453 msg = f"Could not create dare context for dare {dare_id}."
454 LOG().error(msg)
455 raise AppError.from_context(
456 ErrorContext(
457 endpoint="_get_dare_context",
458 error_code=AppErrorCode.INVALID_CONTEXT,
459 message=ErrorMessage.INVALID_CONTEXT.formatted({"OBJECT": "Dare"}),
460 cause=msg,
461 ),
462 )
464 if not ctx.validate():
465 msg = f"Invalid dare context for dare {dare_id}."
466 raise AppError.from_context(
467 ErrorContext(
468 endpoint="_get_dare_context",
469 error_code=AppErrorCode.INVALID_CONTEXT,
470 message=ErrorMessage.INVALID_CONTEXT.formatted({"OBJECT": "Dare"}),
471 cause=msg,
472 ),
473 )
475 return ctx
477 def _update_dare(self, dare: DareWrapper) -> None:
478 try:
479 updates = dare.get_updates()
480 if not updates:
481 if IS_DEBUG:
482 LOG().debug(f"No updates to apply for dare {dare.doc_id}.")
483 return
485 self.dare_db.update(dare.doc_id, updates)
486 if IS_TRACE:
487 LOG().trace(f"Updated dare {dare.doc_id} with updates: {updates}")
489 except Exception as err:
490 msg = f"Error updating dare {dare.doc_id}: {err}"
491 LOG().error(msg)
492 error = ErrorContext.from_exception(
493 endpoint="_update_dare",
494 error_code=AppErrorCode.UPDATE_FAILED,
495 message=msg,
496 error=err,
497 )
498 raise AppError.from_context(error) from err
500 def _reindex_dare(
501 self,
502 job_id: str,
503 *,
504 dare: DareWrapper,
505 ) -> JobResult[DareWrapper]:
506 dare_id = dare.doc_id
507 main_result = AppResult[DareWrapper](doc_id=dare_id)
509 if IS_DEBUG:
510 msg = f"Reindexing dare {dare_id} with job ID {job_id}."
511 LOG().debug(msg)
513 try:
514 index_result = self.indexer.process_dare(dare, is_update=True)
515 if index_result.is_error:
516 msg = f"Error reindexing dare {dare_id}/{job_id}: {index_result.errors}"
517 LOG().error(msg)
518 main_result.merge(index_result)
520 except Exception as e:
521 msg = f"Exception reindexing dare {dare_id}/{job_id}: {e}"
522 LOG().error(msg)
523 main_result.add_error(AppErrorCode.INDEX_FAILED, msg)
525 if not main_result.is_error:
526 msg = f"Dare {dare_id}/{job_id} reindexed successfully."
527 if IS_TRACE:
528 LOG().trace(msg)
530 return JobResult.ok(message=msg)
532 # error
533 if IS_DEBUG:
534 msg = f"Errors reindexing dare {dare_id}/{job_id}: {main_result.errors}"
535 LOG().debug(msg)
537 return JobResult.from_result(
538 main_result,
539 doc_id=dare_id,
540 data=dare.to_dict(),
541 )