Coverage for functions \ flipdare \ firestore \ user_db.py: 69%

143 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13 

14from dataclasses import dataclass 

15from google.cloud.firestore import Client as FirestoreClient 

16from google.cloud.firestore import Increment 

17from google.cloud.firestore_v1 import DocumentSnapshot 

18from google.cloud.firestore_v1.base_query import FieldFilter 

19from flipdare.app_log import LOG 

20from flipdare.app_types import DatabaseDict 

21from flipdare.constants import IS_DEBUG, MAX_DB_CURSOR_LIMIT, REP_DECAY_EQUILLIBRIUM 

22from flipdare.error.app_error import DatabaseError 

23from flipdare.firestore import DbQuery, DbSubQuery, FieldOp, GroupDb, WhereField 

24from flipdare.firestore._app_db import AppDb 

25from flipdare.firestore._app_sub_db import AppSubDb 

26from flipdare.generated import AppErrorCode, NotificationKeys, NotificationType, UserKeys 

27from flipdare.generated.model.notification_model import NotificationModel 

28from flipdare.generated.model.user_model import UserInternalKeys, UserModel 

29from flipdare.generated.shared.firestore_collections import FirestoreCollections 

30from flipdare.util.time_util import FirestoreTime, TimeUtil 

31from flipdare.wrapper import NotificationWrapper, UserWrapper 

32 

# Names exported by ``from ... import *``.
__all__ = ["UserDb", "DecayableEntries"]

# Collection names as plain strings, used where a query is built directly
# against the Firestore client.
_USER: str = FirestoreCollections.USER.value
_NOTIF: str = FirestoreCollections.NOTIFICATIONS.value

# Short module-private aliases used in the query definitions below.
_K = UserKeys
_I = UserInternalKeys
_N = NotificationKeys
_OP = FieldOp

43 

44 

@dataclass(frozen=True)
class DecayableEntries:
    """Immutable result of one paged fetch made by ``UserDb.get_users_to_decay``."""

    # Users converted from the fetched page; ``None`` when the page was empty.
    users: list[UserWrapper] | None
    # Snapshot to pass back in as the cursor for the next page; ``None`` when
    # no further page should be requested.
    last_doc: DocumentSnapshot | None

49 

50 

class UserDb(AppDb[UserWrapper, UserModel]):
    """Database operations for user documents and their notifications.

    User documents are handled through the ``AppDb`` base class; the
    notifications sub-collection of each user is handled through
    ``self.notifications`` (an ``AppSubDb``).
    """

    def __init__(
        self,
        client: FirestoreClient,
        backend_cursor_limit: int = MAX_DB_CURSOR_LIMIT,
    ) -> None:
        """Bind the user collection and its notifications sub-collection.

        Args:
            client: Firestore client used for every query issued by this class.
            backend_cursor_limit: Page size used by ``get_users_to_decay``.
        """
        super().__init__(
            client=client,
            collection_name=FirestoreCollections.USER,
            model_class=UserModel,
            wrapper_class=UserWrapper,
        )
        self.backend_cursor_limit = backend_cursor_limit
        # Initialize Sub-Collection Handlers
        self.notifications = AppSubDb[NotificationWrapper, NotificationModel](
            client=client,
            collection_name=FirestoreCollections.USER,
            sub_collection_name=FirestoreCollections.NOTIFICATIONS,
            wrapper_class=NotificationWrapper,
            model_class=NotificationModel,
        )

    def get_user_by_email(self, email: str) -> UserWrapper | None:
        """Return the user whose ``email`` field equals ``email``.

        Args:
            email: Email address to match exactly.

        Returns:
            The matching user, or ``None`` when no document matches or the
            matched snapshot cannot be converted.

        Raises:
            DatabaseError: If the query or the conversion raises.
        """
        LOG().debug(f"Getting user for email: {email}")
        try:
            # The query is capped at one document, so at most one snapshot
            # comes back and the first match is the one returned.
            # NOTE(review): duplicate emails cannot be detected with limit=1;
            # raise the limit to 2 if ambiguous matches must be rejected.
            query = DbQuery.where(WhereField("email", _OP.EQUAL, email), limit=1)
            results = query.get_query(self.client, _USER).get()

            if not results:
                LOG().warning(f"No users found for email: {email}")
                return None

            user = self._cvt_snap_to_model(results[0])
            if user is None:
                LOG().warning(f"No user data found for email: {email}")
                return None

            return user

        except Exception as error:
            msg = f"Failed to find user with email: {email}: {error}"
            LOG().error(msg)
            raise DatabaseError(
                error_code=AppErrorCode.DATABASE_EX,
                collection_name=_USER,
                document_id=None,
                message=msg,
            ) from error

    def groups_for_user(self, user_id: str) -> list[str]:
        """Return the groups of ``user_id`` via ``GroupDb``; empty list if none."""
        return GroupDb(self.client).groups(user_id) or []

    def notification_count(self, uid: str) -> int:
        """Return the notification counter stored on the user document.

        Best effort: returns 0 when the user has no data, when the counter
        field is absent, or when reading it raises (the error is logged).
        """
        LOG().debug(f"Getting notification count for user: {uid}")
        try:
            user_data = self._get(uid)
            key: str = _K.NOTIFICATION_COUNT.value
            if user_data and key in user_data:
                return int(user_data[key])
            return 0
        except Exception as error:
            LOG().error(f"Error getting notification count for user {uid}: {error}")
            return 0

    def update_user_with_email(self, email: str, updates: DatabaseDict) -> UserWrapper | None:
        """Look a user up by email and apply ``updates`` to that document.

        Args:
            email: Email address identifying the user.
            updates: Field updates passed through to ``self.update``.

        Returns:
            The updated user, or ``None`` when the user is not found or
            ``self.update`` returns ``None``.

        Raises:
            DatabaseError: If the lookup or the update raises.
        """
        LOG().debug(f"Updating user for email: {email}")
        try:
            user = self.get_user_by_email(email)
            if user is None:
                LOG().warning(f"User not found for email: {email}")
                return None

            user_id = user.doc_id
            updated_user = self.update(user_id, updates)
            if updated_user is None:
                LOG().warning(f"Failed to update user for email: {email}")
                return None

            LOG().debug(f"User updated for email: {email}/{user_id} with updates: {updates}")
            return updated_user
        except Exception as error:
            LOG().error(f"Error updating user {email}: {error}")
            raise DatabaseError(
                error_code=AppErrorCode.DATABASE_EX,
                collection_name=_USER,
                document_id=None,
                message=f"Failed to update user {email}",
            ) from error

    def get_recent_unprocessed(self, hours: int | None = None) -> list[UserWrapper]:
        """Return users with ``processed == False`` updated within the window.

        Args:
            hours: Look-back window in hours. ``None`` falls back to
                ``self.def_window_hours`` (not assigned in this class;
                presumably provided by the base class - TODO confirm).

        Returns:
            Every matching user whose snapshot converts to a model.

        Raises:
            DatabaseError: If building or running the query raises.
        """
        if hours is None:
            hours = self.def_window_hours

        hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
        and_fields = [
            WhereField[_I](_I.PROCESSED, _OP.EQUAL, False),
            WhereField[_I](_I.UPDATED_AT, _OP.GREATER_THAN_OR_EQUAL, hours_ago),
        ]

        try:
            if IS_DEBUG:
                msg = (
                    f"Getting unprocessed users within the last {hours} hours "
                    f"(processed=False, updated_at >= {hours_ago})"
                )
                LOG().info(msg)

            query = DbQuery.and_(and_fields)
            if IS_DEBUG:
                LOG().debug(f"Constructed query for unprocessed users: {and_fields}")

            results = query.get_query(self.client, _USER).stream()
            entries = [
                user for doc in results if (user := self._cvt_snap_to_model(doc)) is not None
            ]
            if IS_DEBUG:
                LOG().debug(f"Retrieved {len(entries)} unprocessed users.")

            return entries
        except Exception as e:
            msg = f"Failed to get unprocessed users: {e}"
            # Log before raising, as the other raising methods in this class do.
            LOG().error(msg)
            raise DatabaseError(
                error_code=AppErrorCode.DATABASE_EX,
                collection_name=_USER,
                document_id=None,
                message=msg,
            ) from e

    def get_users_to_decay(
        self,
        last_doc: DocumentSnapshot | None,
    ) -> DecayableEntries:
        """Fetch one page of users whose reputation is not at equilibrium.

        Selects users whose reputation field differs from
        ``REP_DECAY_EQUILLIBRIUM``. The result set can be large, so it is read
        in pages of ``self.backend_cursor_limit`` documents.

        Args:
            last_doc: ``last_doc`` from the previous page's result, or ``None``
                to start from the first page.

        Returns:
            ``DecayableEntries`` where ``users`` is ``None`` for an empty page,
            and ``last_doc`` is ``None`` when the page held fewer documents
            than the page size (nothing further to fetch); otherwise
            ``last_doc`` is the final snapshot of this page.
        """
        # NOTE(review): no explicit order_by is set; paging with a snapshot
        # cursor relies on the ordering Firestore applies implicitly - confirm.
        base_query = self.client.collection(_USER).where(
            filter=FieldFilter(_K.REPUTATION.value, _OP.NOT_EQUAL.value, REP_DECAY_EQUILLIBRIUM),
        )

        cursor_limit = self.backend_cursor_limit

        if last_doc is not None:
            query = base_query.start_after(last_doc).limit(cursor_limit)
        else:
            query = base_query.limit(cursor_limit)

        results = query.get()
        if not results:
            return DecayableEntries(users=None, last_doc=None)

        entries = [user for doc in results if (user := self._cvt_snap_to_model(doc)) is not None]
        if IS_DEBUG:
            LOG().debug(f"Retrieved {len(entries)} users to decay.")

        # A short page means the query is exhausted, so no cursor is returned.
        if len(results) < cursor_limit:
            return DecayableEntries(users=entries, last_doc=None)

        return DecayableEntries(users=entries, last_doc=results[-1])

    # Notification sub-collection methods

    def get_notifs(self, user_id: str) -> list[NotificationWrapper]:
        """Get all notifications for a user."""
        LOG().debug(f"Getting notifications for user: {user_id}")
        return self.notifications.get_all_sub(user_id)

    def notif_exists(self, user_id: str, obj_id: str, notif_type: NotificationType) -> bool:
        """Check whether a user has a notification for ``obj_id`` and ``notif_type``.

        Best effort: any error while querying is logged and reported as
        ``False``.
        """
        LOG().debug(
            f"Checking notification existence for user: {user_id}, "
            f"obj_id: {obj_id}, type: {notif_type}",
        )

        try:
            query = DbSubQuery.and_(
                parent_doc_id=user_id,
                where_fields=[
                    WhereField[_N](_N.OBJ_ID, _OP.EQUAL, obj_id),
                    WhereField[_N](_N.NOTIF_TYPE, _OP.EQUAL, notif_type.value),
                ],
                limit=1,
            )
            results = query.get_query(self.client, _USER, _NOTIF).get()
            exists = len(results) > 0
            LOG().debug(f"Notification existence: {exists}")
            return exists
        except Exception as error:
            LOG().error(f"Error checking notification existence for user {user_id}: {error}")
            return False

    def create_notif(
        self,
        user_id: str,
        notification: NotificationModel,
    ) -> NotificationWrapper:
        """Create a notification for a user and bump the user's counter.

        The notification is written first; the user document is then updated
        in a separate call that increments the notification count and
        refreshes the updated-at timestamp.

        Raises:
            DatabaseError: If the user document update returns ``None``. The
                notification has already been created at that point.
        """
        sub_saved_data = self.notifications.create_sub(user_id, notification)

        # Also update the notification count on the user document.
        user_updates = {
            _K.NOTIFICATION_COUNT.value: Increment(1),
            _I.UPDATED_AT.value: FirestoreTime.server_timestamp(),
        }
        saved_data = self.update(user_id, user_updates)
        if saved_data is None:
            raise DatabaseError(
                error_code=AppErrorCode.DATABASE_EX,
                collection_name=_USER,
                document_id=user_id,
                message=f"Unable to updated user notification count for user {user_id}",
            )

        return sub_saved_data

    def update_notif(
        self,
        user_id: str,
        notif_id: str,
        updates: DatabaseDict,
    ) -> NotificationWrapper | None:
        """Update a notification document."""
        return self.notifications.update_sub(user_id, notif_id, updates)

139 LOG().debug(f"User updated for email: {email}/{user_id} with updates: {updates}") 

140 return updated_user 

141 except Exception as error: 

142 LOG().error(f"Error updating user {email}: {error}") 

143 raise DatabaseError( 

144 error_code=AppErrorCode.DATABASE_EX, 

145 collection_name=_USER, 

146 document_id=None, 

147 message=f"Failed to update user {email}", 

148 ) from error 

149 

150 def get_recent_unprocessed(self, hours: int | None = None) -> list[UserWrapper]: 

151 if hours is None: 

152 hours = self.def_window_hours 

153 

154 hours_ago = TimeUtil.get_utc_time_hours_ago(hours) 

155 and_fields = [ 

156 WhereField[_I](_I.PROCESSED, _OP.EQUAL, False), 

157 WhereField[_I](_I.UPDATED_AT, _OP.GREATER_THAN_OR_EQUAL, hours_ago), 

158 ] 

159 

160 try: 

161 if IS_DEBUG: 

162 msg = ( 

163 f"Getting unprocessed users within the last " 

164 f"(processed=False, updated_at >= {hours_ago}) [{hours} hours window]" 

165 ) 

166 LOG().info(msg) 

167 

168 query = DbQuery.and_(and_fields) 

169 if IS_DEBUG: 

170 LOG().debug(f"Constructed query for unprocessed users: {and_fields}") 

171 

172 results = query.get_query(self.client, _USER).stream() 

173 entries = [ 

174 user for doc in results if (user := self._cvt_snap_to_model(doc)) is not None 

175 ] 

176 if IS_DEBUG: 

177 LOG().debug(f"Retrieved {len(entries)} users waiting for admin review.") 

178 

179 return entries 

180 except Exception as e: 

181 msg = f"Failed to get users waiting for admin review: {e}" 

182 raise DatabaseError( 

183 error_code=AppErrorCode.DATABASE_EX, 

184 collection_name=_USER, 

185 document_id=None, 

186 message=msg, 

187 ) from e 

188 

189 def get_users_to_decay( 

190 self, 

191 last_doc: DocumentSnapshot | None, 

192 ) -> DecayableEntries: 

193 """Get all users with reputation not equal to 50 (neutral)""" 

194 """ NOTE: This could be a large query, so we need to """ 

195 base_query = self.client.collection(_USER).where( 

196 filter=FieldFilter(_K.REPUTATION.value, _OP.NOT_EQUAL.value, REP_DECAY_EQUILLIBRIUM), 

197 ) 

198 

199 cursor_limit = self.backend_cursor_limit 

200 

201 if last_doc is not None: 

202 query = base_query.start_after(last_doc).limit(cursor_limit) 

203 else: 

204 query = base_query.limit(cursor_limit) 

205 

206 results = query.get() 

207 if not results or len(results) == 0: 

208 return DecayableEntries(users=None, last_doc=None) 

209 

210 entries = [user for doc in results if (user := self._cvt_snap_to_model(doc)) is not None] 

211 if IS_DEBUG: 

212 LOG().debug(f"Retrieved {len(entries)} users to decay.") 

213 

214 if len(results) < cursor_limit: 

215 return DecayableEntries(users=entries, last_doc=None) 

216 

217 return DecayableEntries(users=entries, last_doc=results[-1]) 

218 

219 # Notification sub-collection methods 

220 

221 def get_notifs(self, user_id: str) -> list[NotificationWrapper]: 

222 """Get all notifications for a user.""" 

223 LOG().debug(f"Getting notifications for user: {user_id}") 

224 return self.notifications.get_all_sub(user_id) 

225 

226 def notif_exists(self, user_id: str, obj_id: str, notif_type: NotificationType) -> bool: 

227 """Check if a notification exists for a user by obj_id and notif_type.""" 

228 LOG().debug( 

229 f"Checking notification existence for user: {user_id}, " 

230 f"obj_id: {obj_id}, type: {notif_type}", 

231 ) 

232 

233 try: 

234 query = DbSubQuery.and_( 

235 parent_doc_id=user_id, 

236 where_fields=[ 

237 WhereField[_N](_N.OBJ_ID, FieldOp.EQUAL, obj_id), 

238 WhereField[_N](_N.NOTIF_TYPE, FieldOp.EQUAL, notif_type.value), 

239 ], 

240 limit=1, 

241 ) 

242 results = query.get_query(self.client, _USER, _NOTIF).get() 

243 exists = len(results) > 0 

244 LOG().debug(f"Notification existence: {exists}") 

245 return exists 

246 except Exception as error: 

247 LOG().error(f"Error checking notification existence for user {user_id}: {error}") 

248 return False 

249 

250 def create_notif( 

251 self, 

252 user_id: str, 

253 notification: NotificationModel, 

254 ) -> NotificationWrapper: 

255 """Create a new notification for a user.""" 

256 sub_saved_data = self.notifications.create_sub(user_id, notification) 

257 # also need to update notification count in user document 

258 

259 user_updates = { 

260 _K.NOTIFICATION_COUNT.value: Increment(1), 

261 _I.UPDATED_AT.value: FirestoreTime.server_timestamp(), 

262 } 

263 saved_data = self.update(user_id, user_updates) 

264 if saved_data is None: 

265 raise DatabaseError( 

266 error_code=AppErrorCode.DATABASE_EX, 

267 collection_name=_USER, 

268 document_id=user_id, 

269 message=f"Unable to updated user notification count for user {user_id}", 

270 ) 

271 

272 return sub_saved_data 

273 

274 def update_notif( 

275 self, 

276 user_id: str, 

277 notif_id: str, 

278 updates: DatabaseDict, 

279 ) -> NotificationWrapper | None: 

280 """Update a notification document.""" 

281 return self.notifications.update_sub(user_id, notif_id, updates)