Coverage for functions \ flipdare \ firestore \ dare_db.py: 76%
78 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
2#
3# This file is part of Flipdare's proprietary software and contains
4# confidential and copyrighted material. Unauthorised copying,
5# modification, distribution, or use of this file is strictly
6# prohibited without prior written permission from Flipdare Pty Ltd.
7#
8# This software includes third-party components licensed under MIT,
9# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
10#
12from google.cloud.firestore import Client as FirestoreClient
13from flipdare.app_log import LOG
14from flipdare.constants import IS_DEBUG
15from flipdare.firestore._app_db import AppDb
16from flipdare.firestore.core.db_query import DbQuery, FieldOp, OrderByField, WhereField
17from flipdare.generated import DareKeys, DareStatus
18from flipdare.generated.model.dare_model import DareInternalKeys, DareModel
19from flipdare.generated.shared.model.restriction.moderation_decision import ModerationDecision
20from flipdare.generated.shared.firestore_collections import FirestoreCollections
21from flipdare.util.time_util import TimeUtil
22from flipdare.wrapper import DareWrapper
24_DARE: str = FirestoreCollections.DARE.value
26__all__ = ["DareDb"]
28_K = DareKeys
29_OP = FieldOp
30_I = DareInternalKeys
32type _KeyType = DareKeys | DareInternalKeys # for type checking.
35class DareDb(AppDb[DareWrapper, DareModel]):
36 """Class for managing dare-related database operations."""
38 def __init__(self, client: FirestoreClient) -> None:
39 super().__init__(
40 client=client,
41 collection_name=FirestoreCollections.DARE,
42 model_class=DareModel,
43 wrapper_class=DareWrapper,
44 )
46 def get_dares_can_vote(self, hours: int | None = None) -> list[DareWrapper]:
47 """Get all dares that can be voted on from Firestore."""
48 if hours is None:
49 hours = self.def_window_hours
51 hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
53 order_by = OrderByField.created_at(descending=False)
54 and_fields = [
55 WhereField[_KeyType](_I.VOTE_STARTED_EMAIL_SENT, _OP.EQUAL, False),
56 WhereField[_KeyType](_K.STATUS, _OP.EQUAL, DareStatus.VOTING.value),
57 WhereField[_KeyType](_I.CREATED_AT, _OP.GREATER_THAN_OR_EQUAL, hours_ago),
58 ]
60 query = DbQuery.and_(where_fields=and_fields, order_by=order_by)
61 results = query.get_query(self.client, _DARE).get()
62 if len(results) == 0:
63 return []
65 entries = [dare for doc in results if (dare := self._cvt_snap_to_model(doc)) is not None]
66 if IS_DEBUG:
67 LOG().debug(f"Retrieved {len(entries)} dares that can be voted on.")
69 return entries
71 def get_recent_unprocessed(self, hours: int | None = None) -> list[DareWrapper]:
72 """
73 DareStatus.SUBMITTED or DareStatus.RESUBMITTED
74 """
75 if hours is None:
76 hours = self.def_window_hours
78 hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
79 order_by = OrderByField.created_at(descending=False)
80 statuses = [DareStatus.SUBMITTED.value, DareStatus.RESUBMITTED.value]
81 and_fields = [
82 WhereField[_KeyType](_K.STATUS, _OP.IN, statuses),
83 WhereField[_KeyType](_I.CREATED_AT, _OP.GREATER_THAN_OR_EQUAL, hours_ago),
84 ]
86 query = DbQuery.and_(where_fields=and_fields, order_by=order_by)
87 results = query.get_query(self.client, _DARE).get()
88 if len(results) == 0:
89 return []
91 entries = [dare for doc in results if (dare := self._cvt_snap_to_model(doc)) is not None]
92 if IS_DEBUG:
93 LOG().debug(f"Retrieved {len(entries)} dares that are unprocessed.")
95 return entries
97 def get_recent_requires_review(self, hours: int | None = None) -> list[DareWrapper]:
98 """
99 DareStatus.FLAGGED and flagId is None
100 """
101 if hours is None:
102 hours = self.def_window_hours
104 hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
105 order_by = OrderByField.created_at(descending=False)
106 and_fields = [
107 WhereField[_KeyType](_K.STATUS, _OP.EQUAL, DareStatus.FLAGGED.value),
108 WhereField[_KeyType](_K.FLAG_ID, _OP.EQUAL, None),
109 WhereField[_KeyType](_I.CREATED_AT, _OP.GREATER_THAN_OR_EQUAL, hours_ago),
110 ]
112 query = DbQuery.and_(where_fields=and_fields, order_by=order_by)
113 results = query.get_query(self.client, _DARE).get()
114 if len(results) == 0:
115 return []
117 entries = [dare for doc in results if (dare := self._cvt_snap_to_model(doc)) is not None]
118 if IS_DEBUG:
119 LOG().debug(f"Retrieved {len(entries)} dares waiting for admin review.")
121 return entries
123 def get_recent_auto_restricted(self, hours: int | None = None) -> list[DareWrapper]:
124 """
125 DareStatus.FLAGGED and progress == IssueProgress.OPEN
126 """
127 if hours is None:
128 hours = self.def_window_hours
130 hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
131 moderation_decisions = [
132 ModerationDecision.AUTO_REJECT_REPUTATION,
133 ModerationDecision.AUTO_REJECT_SENTIMENT,
134 ]
136 order_by = OrderByField.created_at(descending=False)
137 and_fields = [
138 WhereField[_KeyType](_K.STATUS, _OP.EQUAL, DareStatus.FLAGGED.value),
139 WhereField[_KeyType](_K.MODERATION_DECISION, _OP.IN, moderation_decisions),
140 WhereField[_KeyType](_I.CREATED_AT, _OP.GREATER_THAN_OR_EQUAL, hours_ago),
141 ]
143 query = DbQuery.and_(where_fields=and_fields, order_by=order_by)
144 results = query.get_query(self.client, _DARE).get()
145 if len(results) == 0:
146 return []
148 entries = [dare for doc in results if (dare := self._cvt_snap_to_model(doc)) is not None]
149 if IS_DEBUG:
150 LOG().debug(f"Retrieved {len(entries)} dares that are auto-restricted.")
152 return entries