Coverage for functions \ flipdare \ firestore \ backend \ app_log_db.py: 86%

95 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from collections import defaultdict 

14from datetime import datetime 

15from typing import Any 

16from google.cloud.firestore import Client as FirestoreClient 

17from flipdare.analysis.data.nested.time_series_log_data import TimeSeriesLogData 

18from flipdare.app_log import LOG 

19from flipdare.constants import IS_DEBUG 

20from flipdare.error.app_error import DatabaseError 

21from flipdare.error.app_error_protocol import AppErrorProtocol 

22from flipdare.firestore._app_db import AppDb 

23from flipdare.firestore.core.collection_stat_query import CollectionStatQuery 

24from flipdare.firestore.core.db_query import DbQuery, FieldOp, OrderByField, WhereField 

25from flipdare.generated import AppErrorCode, AppLogKeys, AppLogModel, AppPaymentErrorCode 

26from flipdare.generated.shared.backend.app_job_type import AppJobType 

27from flipdare.generated.shared.backend.system_log_type import SystemLogType 

28from flipdare.generated.shared.firestore_collections import FirestoreCollections 

29from flipdare.util.time_util import TimeUtil 

30from flipdare.wrapper.backend.app_log_wrapper import AppLogWrapper 

31 

# Public API of this module.
__all__ = ["AppLogDb"]

# Value of the APP_LOG collection enum member; passed as the collection
# name when building the recent-payment-issues query.
_LOG = FirestoreCollections.APP_LOG.value

# Short aliases to keep the where-clause definitions compact.
_K = AppLogKeys
_OP = FieldOp

38 

39 

class AppLogDb(AppDb[AppLogWrapper, AppLogModel]):
    """Database access for the application log collection.

    Offers two read paths:

    * :meth:`get_log_stats` - aggregated counts per date range, log type and
      error category, used for reporting.
    * :meth:`get_recent_payment_critical_issues` - recent log entries whose
      error code is one of a fixed set of payment error codes.
    """

    def __init__(self, client: FirestoreClient) -> None:
        """Bind this helper to the ``APP_LOG`` collection.

        Args:
            client: Firestore client handed to the base class.
        """
        super().__init__(
            client=client,
            collection_name=FirestoreCollections.APP_LOG,
            model_class=AppLogModel,
            wrapper_class=AppLogWrapper,
        )

    def get_log_stats(self, days: int = 7) -> TimeSeriesLogData:
        """Aggregate log counts for every non-INFO log type.

        For each log type (except ``INFO``) and each date range produced by
        ``TimeUtil.get_date_range``, the per-error-code counts are fetched and
        every non-zero count is added to the result under the error code's
        category.

        Individual query failures do not abort the run: they are tallied and,
        if any occurred, reported once through ``self.log_error``. The
        (possibly partial) statistics are returned regardless.

        Args:
            days: Number of days passed to ``TimeUtil.get_date_range``,
                anchored at the start of the current UTC day.

        Returns:
            The aggregated time-series log data.
        """
        date_ranges = TimeUtil.get_date_range(
            days, start=TimeUtil.get_start_of_day_utc(), reverse=True
        )
        agg_stats = TimeSeriesLogData()
        error_ct = 0
        for log_type in SystemLogType:
            if log_type == SystemLogType.INFO:
                continue  # skip info logs — not used in charts

            for date_range in date_ranges:
                from_date = date_range.from_date
                to_date = date_range.to_date

                err_ct, counts = self._get_log_counts(log_type, from_date, to_date)
                error_ct += err_ct

                for error_code, count in counts.items():
                    # Only record non-zero counts.
                    if count > 0:
                        agg_stats.add(from_date, log_type, error_code.category, count)

        if error_ct > 0:
            msg = f"Encountered {error_ct} errors while getting log stats"
            self.log_error(
                job_type=AppJobType.REPORT_LOG_STATS,
                message=msg,
                error_code=AppErrorCode.DATABASE,
            )

        return agg_stats

    def _get_log_counts(
        self,
        log_type: SystemLogType,
        from_date: datetime,
        to_date: datetime,
    ) -> tuple[int, dict[AppErrorProtocol, float]]:
        """Count log entries per error code for one log type and date range.

        One aggregate query is issued per error code (application and payment
        codes). A query that raises, returns ``None`` or returns an error
        result is logged, counted as a failure and skipped, so the remaining
        codes are still processed.

        Args:
            log_type: Log type to filter on.
            from_date: Start of the date range handed to the stat query.
            to_date: End of the date range handed to the stat query.

        Returns:
            A ``(failure_count, counts)`` tuple, where ``counts`` maps each
            successfully queried error code to its count.
        """
        log_label = f"{TimeUtil.formatted_user(from_date)} - {TimeUtil.formatted_user(to_date)}"
        counts: dict[AppErrorProtocol, float] = defaultdict(float)
        error_ct = 0

        error_codes = [*AppErrorCode, *AppPaymentErrorCode]
        client = self.client
        col_name = self.collection_name

        for error_code in error_codes:
            where_fields = [
                WhereField[Any](_K.LOG_TYPE, _OP.EQUAL, log_type),
                WhereField[Any](_K.ERROR_CODE, _OP.EQUAL, error_code),
            ]

            try:
                db_query = CollectionStatQuery.custom(
                    from_date=from_date,
                    to_date=to_date,
                    where_fields=where_fields,
                )
                query = db_query.get_query(client, col_name)
                agg_value = self._get_agg_value(query=query)
                if agg_value is None:
                    msg = f"No count value returned for {log_label} - {log_type}/{error_code}"
                    LOG().warning(msg)
                    error_ct += 1
                    continue
                if agg_value.is_error:
                    msg = f"Error getting count for {log_label} - {log_type}/{error_code}"
                    LOG().error(msg)
                    error_ct += 1
                    continue

                counts[error_code] = agg_value.count

            except Exception as e:
                # Best effort: one failing query must not discard the counts
                # already collected for the other error codes.
                LOG().error(
                    f"Error getting aggregate stats {log_label}: {log_type}/{error_code}: {e}"
                )
                error_ct += 1
                continue

        if IS_DEBUG:
            msg = (
                f"Total count for {log_label} - {log_type}: "
                f"{sum(counts.values())} (errors: {error_ct})"
            )
            LOG().debug(msg)

        return error_ct, counts

    def get_recent_payment_critical_issues(self, hours: int = 24) -> list[AppLogWrapper]:
        """Return recent log entries carrying a critical payment error code.

        Selects entries whose creation time is at or after ``hours`` hours ago
        (UTC) and whose error code is one of ``CANCEL_INTENT_FAILED``,
        ``LOOSING_MONEY``, ``AMOUNT_TOO_SMALL`` or
        ``FX_ACCOUNT_ESTIMATE_ERROR``. Results are ordered by creation time
        with ``descending=False``. Documents that cannot be converted to a
        model are dropped.

        Args:
            hours: Size of the look-back window in hours.

        Returns:
            The matching log entries.

        Raises:
            DatabaseError: If building or running the query fails.
        """
        hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
        if IS_DEBUG:
            debug_str = (
                f"Getting issues waiting for admin review within last "
                f"{TimeUtil.formatted_dt(hours_ago)}"
            )
            LOG().debug(debug_str)

        # Stored error codes are compared by their raw enum values.
        error_codes = [
            AppPaymentErrorCode.CANCEL_INTENT_FAILED.value,
            AppPaymentErrorCode.LOOSING_MONEY.value,
            AppPaymentErrorCode.AMOUNT_TOO_SMALL.value,
            AppPaymentErrorCode.FX_ACCOUNT_ESTIMATE_ERROR.value,
        ]

        try:
            where_fields = [
                WhereField[_K](_K.CREATED_AT, _OP.GREATER_THAN_OR_EQUAL, hours_ago),
                WhereField[_K](_K.ERROR_CODE, _OP.IN, error_codes),
            ]
            order_by = OrderByField.created_at(descending=False)
            query = DbQuery.and_(where_fields, order_by=order_by)

            results = query.get_query(self.client, _LOG).stream()
            issues = [
                issue for doc in results if (issue := self._cvt_snap_to_model(doc)) is not None
            ]
            if IS_DEBUG:
                LOG().debug(f"Retrieved {len(issues)} issues waiting for admin review.")

            return issues
        except Exception as e:
            msg = f"Failed to get issues waiting for admin review: {e}"
            raise DatabaseError(
                message=msg,
                error_code=AppErrorCode.DATABASE,
                collection_name=self.collection_name,
            ) from e

178 collection_name=self.collection_name, 

179 ) from e