Coverage for functions \ flipdare \ firestore \ flag_db.py: 91%

111 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13 

14from google.cloud.firestore import Client as FirestoreClient 

15from google.cloud.firestore_v1.base_document import BaseDocumentReference, DocumentSnapshot 

16from flipdare.app_log import LOG 

17from flipdare.constants import IS_DEBUG 

18from flipdare.error.app_error import CodePathError, DatabaseError 

19from flipdare.firestore._app_db import AppDb 

20from flipdare.firestore._app_sub_db import AppSubDb 

21from flipdare.firestore.core.db_query import DbQuery, FieldOp, OrderByField, WhereField 

22from flipdare.firestore.core.sub_comment_transaction import SubCommentTransaction 

23from flipdare.generated import ( 

24 AppErrorCode, 

25 DisputedProgress, 

26 FlagKeys, 

27 FlagModel, 

28 FlagType, 

29 IssueProgress, 

30) 

31from flipdare.generated.model.issue.flag_model import FlagInternalKeys 

32from flipdare.generated.model.issue.issue_comment_model import IssueCommentModel 

33from flipdare.generated.shared.firestore_collections import FirestoreCollections 

34from flipdare.util.time_util import TimeUtil 

35from flipdare.wrapper import FlagWrapper, IssueCommentWrapper 

36 

# Name of the Firestore collection holding flag documents. Passed to the
# query builder and reported as the collection in DatabaseError details.
_FLAG: str = FirestoreCollections.FLAG.value

__all__ = ["FlagDb"]

# Short aliases that keep the query-building code below compact.
_K = FlagKeys
_I = FlagInternalKeys
_OP = FieldOp

# Key type for WhereField lists that mix public and internal flag keys.
type _KeyType = FlagKeys | FlagInternalKeys  # for type checking.

46 

47 

48class FlagDb(AppDb[FlagWrapper, FlagModel]): 

49 """Class for managing flag-related database operations.""" 

50 

def __init__(self, client: FirestoreClient) -> None:
    """
    Set up the flag collection together with its comment helpers.

    Args:
        client: Firestore client handed to the base class and to the
            comment sub-collection accessor.
    """
    flag_collection = FirestoreCollections.FLAG
    comment_collection = FirestoreCollections.FLAG_COMMENTS

    # Parent collection: the flag documents themselves.
    super().__init__(
        client=client,
        collection_name=flag_collection,
        model_class=FlagModel,
        wrapper_class=FlagWrapper,
    )

    # Accessor for the comments stored beneath each flag document.
    comment_db = AppSubDb[IssueCommentWrapper, IssueCommentModel](
        client=client,
        collection_name=flag_collection,
        sub_collection_name=comment_collection,
        wrapper_class=IssueCommentWrapper,
        model_class=IssueCommentModel,
    )
    self.comments = comment_db

    # Helper used by create_flag_comment (via create_with_increment).
    self.comments_tx = SubCommentTransaction[IssueCommentModel](self, comment_collection)

70 

def create_flag_comment(
    self,
    flag_id: str,
    comment: IssueCommentModel,
) -> IssueCommentWrapper:
    """
    Add a comment beneath a flag and return the stored result.

    The comment is written through ``comments_tx.create_with_increment``
    with ``"comment_count"`` as the count field, then read back from the
    returned document reference.

    Args:
        flag_id: ID of the parent flag document.
        comment: Comment model to store.

    Returns:
        The wrapper built from the comment as read back after the write.

    Raises:
        DatabaseError: If the write or the read-back raises; the original
            exception is chained as the cause.
    """
    sub_col_name = self.comments.sub_collection_name

    try:
        new_ref = self.comments_tx.create_with_increment(
            parent_id=flag_id,
            model=comment,
            count_field="comment_count",
        )
        # Read the document back once the write has gone through.
        stored = self._complete_transaction(flag_id, new_ref)
    except Exception as e:
        raise DatabaseError(
            f"Failed to create comment for flag {flag_id}: {e}",
            error_code=AppErrorCode.DATABASE,
            collection_name=sub_col_name,
            document_id=flag_id,
        ) from e
    return stored

96 

def _complete_transaction(
    self,
    flag_id: str,
    comment_ref: BaseDocumentReference,
) -> IssueCommentWrapper:
    """
    Read back a newly written flag comment and convert it to a wrapper.

    Args:
        flag_id: ID of the parent flag; used only in error details.
        comment_ref: Reference to the comment document to read back.

    Returns:
        The wrapper produced from the comment's snapshot.

    Raises:
        CodePathError: If ``comment_ref.get()`` does not return a
            ``DocumentSnapshot``.
        DatabaseError: If the snapshot cannot be converted to a wrapper.
    """
    saved_doc = comment_ref.get()
    if not isinstance(saved_doc, DocumentSnapshot):
        # Anything other than a snapshot here points at a programming error,
        # e.g. an async reference whose get() hands back an awaitable.
        msg = (
            f"Error retrieving flag {flag_id} comment after transaction: "
            "Are you sure you didn't call Awaitable version of get?"
        )
        raise CodePathError(message=msg)

    saved_model = self.comments._cvt_sub_snap_to_model(saved_doc)
    if saved_model is None:
        raise DatabaseError(
            f"Failed to retrieve saved comment for flag {flag_id}",
            error_code=AppErrorCode.DATABASE,
            collection_name=self.comments.sub_collection_name,
            document_id=flag_id,
        )
    return saved_model

119 

def get_most_recent_comment_for_flag(self, flag_id: str) -> IssueCommentWrapper | None:
    """
    Get the most recent comment for a given flag.

    Args:
        flag_id: ID of the flag whose comments are queried.

    Returns:
        The first comment when ordered by created_at descending, or
        ``None`` when the flag has no comments.

    Raises:
        DatabaseError: If the lookup raises; the original exception is
            chained as the cause.
    """
    try:
        newest_first = OrderByField.created_at(descending=True)
        docs = self.comments.get_all_sub(parent_id=flag_id, order_by=newest_first, limit=1)
        # A successful lookup is routine, so log it at debug level like the
        # other queries in this class rather than as a warning.
        if IS_DEBUG:
            LOG().debug(f"Retrieved {len(docs)} comments for flag {flag_id}")
        return docs[0] if docs else None
    except Exception as e:
        msg = f"Failed to get most recent comment for flag {flag_id}: {e}"
        raise DatabaseError(
            msg,
            error_code=AppErrorCode.DATABASE,
            collection_name=self.comments.sub_collection_name,
            document_id=flag_id,
        ) from e

137 

def get_recent_major_unprocessed(self, hours: int | None = None) -> list[FlagWrapper]:
    """
    Get recent major flags that are still in the OPEN state, for cron.

    A flag left OPEN has most likely failed processing.

    Args:
        hours: Look-back window in hours. ``None`` falls back to
            ``self.def_window_hours``.

    Returns:
        Wrappers for the matching flags, ordered by created_at ascending.
        Snapshots that cannot be converted to a wrapper are skipped.

    Raises:
        DatabaseError: If building or running the query raises; the
            original exception is chained as the cause.
    """
    if hours is None:
        hours = self.def_window_hours

    hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
    major_types = [ft.value for ft in FlagType.all_major_types()]

    if IS_DEBUG:
        debug_str = (
            f"Getting unprocessed flags with type '{major_types}' and progress "
            f"'{IssueProgress.OPEN.value}' created since "
            f"{TimeUtil.formatted_dt(hours_ago)}"
        )
        LOG().info(debug_str)

    try:
        and_fields = [
            WhereField[_K](_K.PROGRESS, _OP.EQUAL, IssueProgress.OPEN.value),
            WhereField[_K](_K.FLAG_TYPE, _OP.IN, major_types),
            WhereField[_K](_K.CREATED_AT, _OP.GREATER_THAN_OR_EQUAL, hours_ago),
        ]
        order_by = OrderByField.created_at(descending=False)
        query = DbQuery.and_(and_fields, order_by=order_by)

        flag_docs = query.get_query(self.client, _FLAG).stream()
        flags = [
            flag for doc in flag_docs if (flag := self._cvt_snap_to_model(doc)) is not None
        ]
        if IS_DEBUG:
            LOG().debug(f"Retrieved {len(flags)} major unprocessed flags.")
        return flags
    except Exception as e:
        # Name this query in the error so it is not confused with the
        # "waiting for admin review" lookups.
        msg = f"Failed to get major unprocessed flags: {e}"
        raise DatabaseError(
            msg,
            error_code=AppErrorCode.DATABASE,
            collection_name=_FLAG,
            document_id=None,
        ) from e

180 

def get_recent_major_unacknowledged(self, hours: int | None = None) -> list[FlagWrapper]:
    """
    Get major flags that were auto acknowledged but not yet reviewed by admin.

    Args:
        hours: Look-back window in hours. ``None`` falls back to
            ``self.def_window_hours``.

    Returns:
        Wrappers for major-type flags whose progress is WAITING_ADMIN,
        ordered by created_at ascending. Snapshots that cannot be
        converted to a wrapper are skipped.

    Raises:
        DatabaseError: If building or running the query raises; the
            original exception is chained as the cause.
    """
    window = self.def_window_hours if hours is None else hours
    cutoff = TimeUtil.get_utc_time_hours_ago(window)
    type_values = [flag_type.value for flag_type in FlagType.all_major_types()]

    LOG().info(
        f"Getting flags with type ({type_values}) waiting for admin review within last "
        f"{TimeUtil.formatted_dt(cutoff)}"
    )

    try:
        conditions = [
            WhereField[_K](_K.FLAG_TYPE, _OP.IN, type_values),
            WhereField[_K](_K.PROGRESS, _OP.EQUAL, IssueProgress.WAITING_ADMIN.value),
            WhereField[_K](_K.CREATED_AT, _OP.GREATER_THAN_OR_EQUAL, cutoff),
        ]
        query = DbQuery.and_(conditions, order_by=OrderByField.created_at(descending=False))

        entries: list[FlagWrapper] = []
        for snapshot in query.get_query(self.client, _FLAG).stream():
            wrapper = self._cvt_snap_to_model(snapshot)
            if wrapper is not None:
                entries.append(wrapper)

        if IS_DEBUG:
            LOG().debug(f"Retrieved {len(entries)} major flags waiting for admin review.")

        return entries
    except Exception as e:
        raise DatabaseError(
            f"Failed to get flags waiting for admin review: {e}",
            error_code=AppErrorCode.DATABASE,
            collection_name=_FLAG,
            document_id=None,
        ) from e

224 

def get_recent_waiting_disputed_service(self, hours: int = 4) -> list[FlagWrapper]:
    """
    Get recent flags whose disputed progress is waiting for admin review.

    Args:
        hours: Look-back window in hours (defaults to 4). Only flags with
            created_at at or after the resulting cutoff are returned.

    Returns:
        Wrappers for the matching flags, ordered by created_at ascending.
        Snapshots that cannot be converted to a wrapper are skipped.

    Raises:
        DatabaseError: If building or running the query raises; the
            original exception is chained as the cause.
    """
    cutoff = TimeUtil.get_utc_time_hours_ago(hours)
    LOG().info(
        f"Getting flags waiting for admin review within last {TimeUtil.formatted_dt(cutoff)}"
    )

    try:
        conditions = [
            WhereField[_KeyType](_I.CREATED_AT, _OP.GREATER_THAN_OR_EQUAL, cutoff),
            WhereField[_KeyType](
                _K.DISPUTED_PROGRESS,
                _OP.EQUAL,
                DisputedProgress.WAITING_ADMIN.value,
            ),
        ]
        query = DbQuery.and_(conditions, order_by=OrderByField.created_at(descending=False))

        flags: list[FlagWrapper] = []
        for snapshot in query.get_query(self.client, _FLAG).stream():
            wrapper = self._cvt_snap_to_model(snapshot)
            if wrapper is not None:
                flags.append(wrapper)

        if IS_DEBUG:
            LOG().debug(f"Retrieved {len(flags)} disputed flags waiting for admin review.")
        return flags
    except Exception as e:
        raise DatabaseError(
            f"Failed to get flags waiting for admin review: {e}",
            error_code=AppErrorCode.DATABASE,
            collection_name=_FLAG,
            document_id=None,
        ) from e

260 collection_name=_FLAG, 

261 document_id=None, 

262 ) from e