Coverage for functions \ flipdare \ firestore \ user_db.py: 69%
143 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
14from dataclasses import dataclass
15from google.cloud.firestore import Client as FirestoreClient
16from google.cloud.firestore import Increment
17from google.cloud.firestore_v1 import DocumentSnapshot
18from google.cloud.firestore_v1.base_query import FieldFilter
19from flipdare.app_log import LOG
20from flipdare.app_types import DatabaseDict
21from flipdare.constants import IS_DEBUG, MAX_DB_CURSOR_LIMIT, REP_DECAY_EQUILLIBRIUM
22from flipdare.error.app_error import DatabaseError
23from flipdare.firestore import DbQuery, DbSubQuery, FieldOp, GroupDb, WhereField
24from flipdare.firestore._app_db import AppDb
25from flipdare.firestore._app_sub_db import AppSubDb
26from flipdare.generated import AppErrorCode, NotificationKeys, NotificationType, UserKeys
27from flipdare.generated.model.notification_model import NotificationModel
28from flipdare.generated.model.user_model import UserInternalKeys, UserModel
29from flipdare.generated.shared.firestore_collections import FirestoreCollections
30from flipdare.util.time_util import FirestoreTime, TimeUtil
31from flipdare.wrapper import NotificationWrapper, UserWrapper
33_USER: str = FirestoreCollections.USER.value
34_NOTIF: str = FirestoreCollections.NOTIFICATIONS.value
36__all__ = ["UserDb", "DecayableEntries"]
39_K = UserKeys
40_I = UserInternalKeys
41_N = NotificationKeys
42_OP = FieldOp
45@dataclass(frozen=True)
46class DecayableEntries:
47 users: list[UserWrapper] | None
48 last_doc: DocumentSnapshot | None
51class UserDb(AppDb[UserWrapper, UserModel]):
52 """Class for managing user-related database operations."""
54 def __init__(
55 self,
56 client: FirestoreClient,
57 backend_cursor_limit: int = MAX_DB_CURSOR_LIMIT,
58 ) -> None:
59 super().__init__(
60 client=client,
61 collection_name=FirestoreCollections.USER,
62 model_class=UserModel,
63 wrapper_class=UserWrapper,
64 )
65 self.backend_cursor_limit = backend_cursor_limit
66 # Initialize Sub-Collection Handlers
67 self.notifications = AppSubDb[NotificationWrapper, NotificationModel](
68 client=client,
69 collection_name=FirestoreCollections.USER,
70 sub_collection_name=FirestoreCollections.NOTIFICATIONS,
71 wrapper_class=NotificationWrapper,
72 model_class=NotificationModel,
73 )
75 def get_user_by_email(self, email: str) -> UserWrapper | None:
76 """Get user by email from Firestore."""
77 LOG().debug(f"Getting user for email: {email}")
78 try:
79 query = DbQuery.where(WhereField("email", _OP.EQUAL, email), limit=1)
80 results = query.get_query(self.client, _USER).get()
82 if len(results) <= 0:
83 LOG().warning(f"No users found for email: {email}")
84 return None
85 if len(results) > 1:
86 LOG().warning(f"Multiple users found for email: {email}")
87 return None
89 user = self._cvt_snap_to_model(results[0])
90 if user is None:
91 LOG().warning(f"No user data found for email: {email}")
92 return None
94 # LOG().debug(f"User found for email: {email}/{user.doc_id}: {user}\ndata={str(user)}")
95 return user
97 except Exception as error:
98 msg = f"Failed to find user with email: {email}: {error}"
99 LOG().error(msg)
100 raise DatabaseError(
101 error_code=AppErrorCode.DATABASE_EX,
102 collection_name=_USER,
103 document_id=None,
104 message=msg,
105 ) from error
107 def groups_for_user(self, user_id: str) -> list[str]:
108 """Get user groups from Firestore - wrapper around get_groups."""
109 return GroupDb(self.client).groups(user_id) or []
111 def notification_count(self, uid: str) -> int:
112 """notification_count is stored as a value in UserWrapper"""
113 LOG().debug(f"Getting notification count for user: {uid}")
114 try:
115 user_data = self._get(uid)
116 key: str = _K.NOTIFICATION_COUNT.value
117 if user_data and key in user_data:
118 return int(user_data[key])
119 return 0
120 except Exception as error:
121 LOG().error(f"Error getting notification count for user {uid}: {error}")
122 return 0
124 def update_user_with_email(self, email: str, updates: DatabaseDict) -> UserWrapper | None:
125 """Update user document in Firestore by email."""
126 LOG().debug(f"Updating user for email: {email}")
127 try:
128 user = self.get_user_by_email(email)
129 if user is None:
130 LOG().warning(f"User not found for email: {email}")
131 return None
133 user_id = user.doc_id
134 updated_user = self.update(user_id, updates)
135 if updated_user is None:
136 LOG().warning(f"Failed to update user for email: {email}")
137 return None
139 LOG().debug(f"User updated for email: {email}/{user_id} with updates: {updates}")
140 return updated_user
141 except Exception as error:
142 LOG().error(f"Error updating user {email}: {error}")
143 raise DatabaseError(
144 error_code=AppErrorCode.DATABASE_EX,
145 collection_name=_USER,
146 document_id=None,
147 message=f"Failed to update user {email}",
148 ) from error
150 def get_recent_unprocessed(self, hours: int | None = None) -> list[UserWrapper]:
151 if hours is None:
152 hours = self.def_window_hours
154 hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
155 and_fields = [
156 WhereField[_I](_I.PROCESSED, _OP.EQUAL, False),
157 WhereField[_I](_I.UPDATED_AT, _OP.GREATER_THAN_OR_EQUAL, hours_ago),
158 ]
160 try:
161 if IS_DEBUG:
162 msg = (
163 f"Getting unprocessed users within the last "
164 f"(processed=False, updated_at >= {hours_ago}) [{hours} hours window]"
165 )
166 LOG().info(msg)
168 query = DbQuery.and_(and_fields)
169 if IS_DEBUG:
170 LOG().debug(f"Constructed query for unprocessed users: {and_fields}")
172 results = query.get_query(self.client, _USER).stream()
173 entries = [
174 user for doc in results if (user := self._cvt_snap_to_model(doc)) is not None
175 ]
176 if IS_DEBUG:
177 LOG().debug(f"Retrieved {len(entries)} users waiting for admin review.")
179 return entries
180 except Exception as e:
181 msg = f"Failed to get users waiting for admin review: {e}"
182 raise DatabaseError(
183 error_code=AppErrorCode.DATABASE_EX,
184 collection_name=_USER,
185 document_id=None,
186 message=msg,
187 ) from e
189 def get_users_to_decay(
190 self,
191 last_doc: DocumentSnapshot | None,
192 ) -> DecayableEntries:
193 """Get all users with reputation not equal to 50 (neutral)"""
194 """ NOTE: This could be a large query, so we need to """
195 base_query = self.client.collection(_USER).where(
196 filter=FieldFilter(_K.REPUTATION.value, _OP.NOT_EQUAL.value, REP_DECAY_EQUILLIBRIUM),
197 )
199 cursor_limit = self.backend_cursor_limit
201 if last_doc is not None:
202 query = base_query.start_after(last_doc).limit(cursor_limit)
203 else:
204 query = base_query.limit(cursor_limit)
206 results = query.get()
207 if not results or len(results) == 0:
208 return DecayableEntries(users=None, last_doc=None)
210 entries = [user for doc in results if (user := self._cvt_snap_to_model(doc)) is not None]
211 if IS_DEBUG:
212 LOG().debug(f"Retrieved {len(entries)} users to decay.")
214 if len(results) < cursor_limit:
215 return DecayableEntries(users=entries, last_doc=None)
217 return DecayableEntries(users=entries, last_doc=results[-1])
219 # Notification sub-collection methods
221 def get_notifs(self, user_id: str) -> list[NotificationWrapper]:
222 """Get all notifications for a user."""
223 LOG().debug(f"Getting notifications for user: {user_id}")
224 return self.notifications.get_all_sub(user_id)
226 def notif_exists(self, user_id: str, obj_id: str, notif_type: NotificationType) -> bool:
227 """Check if a notification exists for a user by obj_id and notif_type."""
228 LOG().debug(
229 f"Checking notification existence for user: {user_id}, "
230 f"obj_id: {obj_id}, type: {notif_type}",
231 )
233 try:
234 query = DbSubQuery.and_(
235 parent_doc_id=user_id,
236 where_fields=[
237 WhereField[_N](_N.OBJ_ID, FieldOp.EQUAL, obj_id),
238 WhereField[_N](_N.NOTIF_TYPE, FieldOp.EQUAL, notif_type.value),
239 ],
240 limit=1,
241 )
242 results = query.get_query(self.client, _USER, _NOTIF).get()
243 exists = len(results) > 0
244 LOG().debug(f"Notification existence: {exists}")
245 return exists
246 except Exception as error:
247 LOG().error(f"Error checking notification existence for user {user_id}: {error}")
248 return False
250 def create_notif(
251 self,
252 user_id: str,
253 notification: NotificationModel,
254 ) -> NotificationWrapper:
255 """Create a new notification for a user."""
256 sub_saved_data = self.notifications.create_sub(user_id, notification)
257 # also need to update notification count in user document
259 user_updates = {
260 _K.NOTIFICATION_COUNT.value: Increment(1),
261 _I.UPDATED_AT.value: FirestoreTime.server_timestamp(),
262 }
263 saved_data = self.update(user_id, user_updates)
264 if saved_data is None:
265 raise DatabaseError(
266 error_code=AppErrorCode.DATABASE_EX,
267 collection_name=_USER,
268 document_id=user_id,
269 message=f"Unable to updated user notification count for user {user_id}",
270 )
272 return sub_saved_data
274 def update_notif(
275 self,
276 user_id: str,
277 notif_id: str,
278 updates: DatabaseDict,
279 ) -> NotificationWrapper | None:
280 """Update a notification document."""
281 return self.notifications.update_sub(user_id, notif_id, updates)