Coverage for functions \ flipdare \ analysis \ data_analysis.py: 87%

166 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13import math 

14from dataclasses import dataclass 

15from enum import Enum 

16import numpy as np 

17from scipy import stats 

18from flipdare.app_types import AnalysisArrayType, NdFloatArrayType 

19from flipdare.constants import ( 

20 APPROX_INFINITE, 

21 MIN_ANALYSIS_COUNT, 

22 STATS_IQR_MULTIPLIER, 

23 STATS_Z_SCORE_THRESHOLD, 

24) 

25from flipdare.analysis.plotter import ScatterData 

26 

27__all__ = [ 

28 "DataAnalysis", 

29 "DataAnalysisProps", 

30 "OutlierAlgorithm", 

31 "AnalysisResult", 

32 "AnalysisEntry", 

33] 

34 

35 

class OutlierAlgorithm(Enum):
    """Outlier-detection strategies that can be selected for an analysis."""

    Z_SCORE = "z_score"
    IQR = "iqr"

    @property
    def label(self) -> str:
        """Return the human-readable display name of this algorithm."""
        # Enum members are singletons, so identity checks are equivalent to
        # the value patterns of a ``match`` statement.
        if self is OutlierAlgorithm.Z_SCORE:
            return "Z-Score"
        if self is OutlierAlgorithm.IQR:
            return "IQR"

47 

48 

@dataclass
class DataAnalysisProps:
    """Tunable settings for outlier detection.

    Every field defaults to a value imported from ``flipdare.constants``.
    """

    # Cut-off for the z-score algorithm.
    z_score_threshold: int = STATS_Z_SCORE_THRESHOLD

    # Multiplier applied to the inter-quartile range by the IQR algorithm.
    iqr_multiplier: float = STATS_IQR_MULTIPLIER

    # Detection algorithm to run; IQR unless overridden.
    algorithm: OutlierAlgorithm = OutlierAlgorithm.IQR

54 

55 

@dataclass
class AnalysisEntry:
    """A list of values paired one-to-one with a list of positions."""

    # Data points; an entry may be None, in which case scatter_data() skips it.
    values: AnalysisArrayType
    # Position associated with each entry of ``values`` (same length).
    indicies: list[int]

    @classmethod
    def empty(cls) -> "AnalysisEntry":
        """Build an entry that holds no values and no indices."""
        return cls(values=[], indicies=[])

    def __post_init__(self) -> None:
        """Validate that ``values`` and ``indicies`` have the same length.

        Raises:
            ValueError: If the two sequences differ in length.
        """
        n_values, n_indices = len(self.values), len(self.indicies)
        if n_values == n_indices:
            return
        raise ValueError(
            f"Inconsistent array lengths: {n_values} values != {n_indices} indices."
        )

    def scatter_data(self, label: str) -> ScatterData:
        """Package the non-None values and their positions as ``ScatterData``.

        Args:
            label: Passed through unchanged as the ``label`` of the result.

        Returns:
            A ``ScatterData`` whose ``points`` and ``indices`` contain only the
            pairs whose value is not None; both are empty lists when every
            value is None (or there are no values).
        """
        # Drop None values together with their positions so a scatter plot
        # never receives None as a point.
        kept = [
            (value, index)
            for value, index in zip(self.values, self.indicies, strict=True)
            if value is not None
        ]
        return ScatterData(
            points=[value for value, _ in kept],
            indices=[index for _, index in kept],
            label=label,
        )

85 

86 

@dataclass
class AnalysisResult:
    """Outcome of one outlier-detection pass, split into outliers and the rest."""

    # Algorithm that produced this split.
    detect_type: OutlierAlgorithm
    # Values flagged as outliers, with their positions.
    outliers: AnalysisEntry
    # Values not flagged as outliers, with their positions.
    valid: AnalysisEntry

    @property
    def has_outliers(self) -> bool:
        """True when at least one outlier value is not None."""
        return not all(value is None for value in self.outliers.values)

    @property
    def has_valid(self) -> bool:
        """True when at least one non-outlier value is not None."""
        return not all(value is None for value in self.valid.values)

    @property
    def notes(self) -> list[str]:
        """Return two summary lines: the outliers, then the non-outliers.

        None values are left out of the listing; a group with nothing to
        show is rendered as ``N/A``.
        """
        rendered: list[str] = []
        for entry in (self.outliers, self.valid):
            present = [str(value) for value in entry.values if value is not None]
            rendered.append(", ".join(present) if present else "N/A")
        outlier_text, valid_text = rendered

        return [
            f"Outliers: [{outlier_text}]",
            f"Non Outliers: [{valid_text}]",
        ]

    @property
    def outlier_scatter_data(self) -> ScatterData | None:
        """Scatter data for the outliers, or None when there are none to plot."""
        if self.has_outliers:
            return self.outliers.scatter_data(label=f"Outliers ({self.detect_type.label})")
        return None

    @property
    def valid_scatter_data(self) -> ScatterData | None:
        """Scatter data for the non-outliers, or None when there are none to plot."""
        if self.has_valid:
            return self.valid.scatter_data(label=f"Non-Outliers ({self.detect_type.label})")
        return None

128 

129 

130class DataAnalysis: 

131 __slots__ = ("_props", "_values") 

132 

133 def __init__( 

134 self, 

135 values: AnalysisArrayType, 

136 props: DataAnalysisProps | None = None, 

137 ) -> None: 

138 if props is None: 

139 props = DataAnalysisProps() 

140 

141 self._values = values 

142 self._props = props 

143 

144 @property 

145 def z_score_threshold(self) -> int: 

146 return self._props.z_score_threshold 

147 

148 @property 

149 def iqr_multiplier(self) -> float: 

150 return self._props.iqr_multiplier 

151 

152 @property 

153 def outlier_type(self) -> OutlierAlgorithm: 

154 return self._props.algorithm 

155 

156 @property 

157 def values(self) -> AnalysisArrayType: 

158 return self._values 

159 

160 @property 

161 def z_scores_formatted(self) -> list[float] | None: 

162 """Get a pretty-printed list of z-scores.""" 

163 zscores = self.z_scores 

164 if zscores is None: 

165 return None 

166 

167 entries = zscores.tolist() 

168 if len(entries) == 0: 

169 return None 

170 

171 scores: list[float] = [] 

172 for score in entries: 

173 # check for nan 

174 if math.isnan(score): 

175 scores.append(0.0) 

176 else: 

177 scores.append(round(float(score), 2)) 

178 return scores 

179 

180 @property 

181 def z_score_outliers(self) -> AnalysisResult | None: 

182 # 1. Filter out Nones but keep track of original indices 

183 indexed_data = [(i, v) for i, v in enumerate(self.values) if v is not None] 

184 if len(indexed_data) < MIN_ANALYSIS_COUNT: 

185 return None 

186 

187 indices = [x[0] for x in indexed_data] 

188 clean_values = [x[1] for x in indexed_data] 

189 

190 # 2. Calculate Z-scores on clean data 

191 std_dev = np.std(clean_values, ddof=1) 

192 if std_dev < APPROX_INFINITE: 

193 z_scores = np.zeros(len(clean_values)) 

194 else: 

195 z_scores = np.abs(stats.zscore(clean_values, ddof=1)) 

196 

197 threshold = self.z_score_threshold 

198 

199 # 3. Map results back using our 'indices' map 

200 outlier_values: AnalysisArrayType = [] 

201 outlier_indices: list[int] = [] 

202 valid_values: AnalysisArrayType = [] 

203 valid_indices: list[int] = [] 

204 

205 for i, score in enumerate(z_scores): 

206 orig_idx = indices[i] 

207 val = clean_values[i] 

208 if score > threshold: 

209 outlier_values.append(val) 

210 outlier_indices.append(orig_idx) 

211 else: 

212 valid_values.append(val) 

213 valid_indices.append(orig_idx) 

214 

215 return AnalysisResult( 

216 detect_type=OutlierAlgorithm.Z_SCORE, 

217 outliers=AnalysisEntry(values=outlier_values, indicies=outlier_indices), 

218 valid=AnalysisEntry(values=valid_values, indicies=valid_indices), 

219 ) 

220 

221 @property 

222 def interquartile_outliers(self) -> AnalysisResult | None: 

223 indexed_data = [(i, v) for i, v in enumerate(self.values) if v is not None] 

224 if len(indexed_data) < MIN_ANALYSIS_COUNT: 

225 return None 

226 

227 indices = [x[0] for x in indexed_data] 

228 data = np.array([x[1] for x in indexed_data]) 

229 

230 q1, q3 = np.percentile(data, [25, 75]) 

231 iqr = q3 - q1 

232 lower_bound = q1 - (self.iqr_multiplier * iqr) 

233 upper_bound = q3 + (self.iqr_multiplier * iqr) 

234 

235 # Identify indices relative to the 'data' array 

236 is_outlier = (data < lower_bound) | (data > upper_bound) 

237 

238 # Map those back to original indices 

239 outliers_indices = [indices[i] for i, outlier in enumerate(is_outlier) if outlier] 

240 outliers_values = data[is_outlier].tolist() 

241 

242 valid_indices = [indices[i] for i, outlier in enumerate(is_outlier) if not outlier] 

243 valid_values = data[~is_outlier].tolist() 

244 

245 return AnalysisResult( 

246 detect_type=OutlierAlgorithm.IQR, 

247 outliers=AnalysisEntry(values=outliers_values, indicies=outliers_indices), 

248 valid=AnalysisEntry(values=valid_values, indicies=valid_indices), 

249 ) 

250 

251 @property 

252 def z_scores(self) -> NdFloatArrayType | None: 

253 """Calculate the z-score for the values, correctly handling None/NaN.""" 

254 values = self.values 

255 if len(values) < MIN_ANALYSIS_COUNT: 

256 return None 

257 

258 # Convert to numpy array (None becomes np.nan) 

259 data = np.array(values, dtype=float) 

260 

261 # Calculate standard deviation while ignoring NaNs 

262 std_dev = np.nanstd(data, ddof=1) 

263 

264 if std_dev < APPROX_INFINITE: 

265 # Match original length, setting zeros where data was valid 

266 result = np.zeros(len(data)) 

267 result[np.isnan(data)] = np.nan 

268 return result 

269 

270 # nan_policy='omit' ignores NaNs for stats but keeps the array shape 

271 return np.abs(stats.zscore(data, ddof=1, nan_policy="omit")) 

272 

273 def analyze(self) -> AnalysisResult: 

274 """Determine if the values contain outliers based on z-score threshold.""" 

275 result: AnalysisResult | None = None 

276 

277 match self.outlier_type: 

278 case OutlierAlgorithm.IQR: 

279 result = self.interquartile_outliers 

280 case OutlierAlgorithm.Z_SCORE: 

281 result = self.z_score_outliers 

282 

283 if result is not None: 

284 return result 

285 

286 return AnalysisResult( 

287 detect_type=self.outlier_type, 

288 outliers=AnalysisEntry.empty(), 

289 valid=AnalysisEntry( 

290 values=self.values, 

291 indicies=list(range(len(self.values))), 

292 ), 

293 )