Coverage for functions \ flipdare \ wrapper \ _persisted_wrapper.py: 91%

233 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14 

15from datetime import datetime 

16from typing import Any, Self, override 

17 

18from google.cloud.firestore import SERVER_TIMESTAMP, DocumentSnapshot 

19 

20from pydantic_core import ValidationError 

21 

22from flipdare.app_log import LOG 

23from flipdare.app_types import DatabaseTimeType 

24from flipdare.constants import IS_DEBUG 

25from flipdare.error.app_error import CodePathError 

26from flipdare.error.data_load_error import DataLoadError 

27from flipdare.firestore.core.app_base_model import AppBaseModel 

28from flipdare.util.time_util import FirestoreTime, TimeUtil 

29 

30_CREATED_AT: str = "created_at" 

31_UPDATED_AT: str = "updated_at" 

32_VERSION: str = "version" 

33 

34 

35class PersistedWrapper[T: AppBaseModel]: 

36 """ 

37 Wrapper for persisted Firestore documents with change tracking. 

38 

39 All concrete wrappers (UserWrapper, DareWrapper, etc.) extend this class. 

40 """ 

41 

42 __slots__ = ("_changed_fields", "_created_at", "_doc_id", "_model", "_updated_at") 

43 

44 # NOTE: Subclasses MUST set this 

45 MODEL_CLASS: type[T] # pylint: disable=declare-non-slot 

46 

47 _model: T 

48 _doc_id: str 

49 _changed_fields: set[str] 

50 _created_at: DatabaseTimeType 

51 _updated_at: DatabaseTimeType 

52 

53 def __init__(self, model: T, _internal: bool = False, _is_new: bool = False) -> None: 

54 if not _internal: 

55 msg = f"Use {self.__class__.__name__}.from_firestore(), .from_model() or .from_dict() instead" 

56 raise RuntimeError(msg) 

57 

58 doc_id = getattr(model, "id", None) 

59 self._model = model 

60 self._changed_fields = set() 

61 

62 # Validate model has non-None doc_id if not new 

63 if _is_new: 

64 self._doc_id = None # type: ignore[assignment] 

65 else: 

66 if doc_id is None: 

67 msg = f"Cannot create PersistedWrapper: {model.__class__.__name__} has no id" 

68 raise ValueError(msg) 

69 self._doc_id = doc_id 

70 

71 # not all models may have created_at/updated_at, so we use getattr with default None 

72 self._created_at = getattr(model, _CREATED_AT, None) 

73 self._updated_at = getattr(model, _UPDATED_AT, None) 

74 

75 # ------------------------------------------------------------------------- 

76 # Factory Methods 

77 # ------------------------------------------------------------------------- 

78 

79 @classmethod 

80 def create(cls: type[Self], **kwargs: Any) -> Self: 

81 """Create a new wrapper for a document that hasn't been persisted yet.""" 

82 # Ensure id is None for new documents 

83 if "id" in kwargs: 

84 LOG().debug(f"Resetting 'id' {kwargs['id']} for new document.") 

85 

86 kwargs["id"] = None 

87 model = cls._load_data(kwargs) 

88 return cls(model, _internal=True, _is_new=True) 

89 

90 @classmethod 

91 def from_model(cls: type[Self], model: T) -> Self: 

92 """Create wrapper from an existing model instance (must have id).""" 

93 if getattr(model, "id", None) is None: 

94 raise ValueError("Model must have non-None id to create PersistedWrapper") 

95 return cls(model, _internal=True, _is_new=False) 

96 

97 @classmethod 

98 def from_dict(cls: type[Self], data: dict[str, Any]) -> Self: 

99 """Create wrapper from dict (must have id).""" 

100 if "id" not in data or data["id"] is None: 

101 raise ValueError(f"Cannot create wrapper from dict without 'id': {data}") 

102 

103 model = cls._load_data(data) 

104 return cls(model, _internal=True, _is_new=False) 

105 

106 @classmethod 

107 def from_firestore(cls: type[Self], doc_snapshot: DocumentSnapshot) -> Self: 

108 """Create wrapper from Firestore DocumentSnapshot.""" 

109 data = doc_snapshot.to_dict() 

110 if data is None: 

111 raise ValueError(f"Document {doc_snapshot.id} does not exist") 

112 

113 # Inject Firestore ID 

114 data["id"] = doc_snapshot.id 

115 

116 # Convert Firestore timestamps to datetime 

117 data = cls._convert_firestore_timestamps(data) 

118 

119 model = cls._load_data(data) 

120 return cls(model, _internal=True) 

121 

122 @classmethod 

123 def _load_data(cls: type[Self], data: dict[str, Any]) -> T: 

124 

125 from flipdare.error.message_format import ValidationErrorMsgFormat 

126 

127 """ 

128 Load model from dict with validation and error handling. 

129 Converts aliased field names from Firestore to Python field names. 

130 """ 

131 # Convert aliased fields to Python names 

132 # Firestore stores: VERSION, INT_P, INT_E, etc. 

133 # Pydantic expects: version, internal_priority, internal_enabled, etc. 

134 model_fields = cls.MODEL_CLASS.model_fields 

135 converted_data: dict[str, Any] = {} 

136 

137 for key, value in data.items(): 

138 # Check if this key is an alias that needs conversion 

139 python_name = None 

140 for field_name, field_info in model_fields.items(): 

141 if field_info.alias == key: 

142 python_name = field_name 

143 break 

144 

145 # Use the Python name if we found an alias match, otherwise use key as-is 

146 final_key = python_name or key 

147 converted_data[final_key] = value 

148 

149 try: 

150 return cls.MODEL_CLASS(**converted_data) 

151 except ValidationError as e: 

152 formatter = ValidationErrorMsgFormat( 

153 class_type=cls.MODEL_CLASS, 

154 error=e, 

155 ) 

156 LOG().error(str(formatter)) 

157 raise DataLoadError.model( 

158 class_name=formatter.class_name, 

159 missing_code=formatter.user_error_code, 

160 error=e, 

161 ) from e 

162 except Exception as e: 

163 msg = f"Unexpected error creating model from dict: {e}" 

164 LOG().error(msg) 

165 raise CodePathError(message=msg) from e 

166 

167 # ------------------------------------------------------------------------- 

168 # Triggers and Predicates 

169 # ------------------------------------------------------------------------- 

170 

171 def version_changed(self, other: PersistedWrapper[T]) -> bool: 

172 """Check if version field has changed compared to another wrapper.""" 

173 old_version = getattr(self.model, _VERSION, None) 

174 new_version = getattr(other.model, _VERSION, None) 

175 return old_version != new_version 

176 

177 def calculate_change_score(self, other: PersistedWrapper[T]) -> float: 

178 return self.model.calculate_change_score(other.model) 

179 

180 # ------------------------------------------------------------------------- 

181 # Property Access 

182 # ------------------------------------------------------------------------- 

183 

184 @property 

185 def model(self) -> T: 

186 """Access the underlying Pydantic model (read-only).""" 

187 return self._model 

188 

189 @property 

190 def doc_id(self) -> str: 

191 """Get document ID.""" 

192 return self._doc_id 

193 

194 # ------------------------------------------------------------------------- 

195 # Serialization Methods 

196 # ------------------------------------------------------------------------- 

197 

198 def to_dict(self) -> dict[str, Any]: 

199 """ 

200 Convert wrapper to dict suitable for Firestore. 

201 Excludes id, uses Firestore aliases to match database schema. 

202 Automatically adds updated_at timestamp if changes exist. 

203 """ 

204 # Get full document with Firestore aliases (matches how fields are stored) 

205 data: dict[str, Any] = self.model.model_dump(by_alias=True, exclude={"id"}) 

206 

207 if self.has_changes: 

208 # Existing documents with changes: update updated_at to SERVER_TIMESTAMP 

209 data["updated_at"] = SERVER_TIMESTAMP 

210 

211 return data 

212 

213 def to_dict_with_id(self) -> dict[str, Any]: 

214 """For search results.""" 

215 data: dict[str, Any] = self.model.model_dump(by_alias=False) 

216 data["id"] = self.doc_id 

217 return data 

218 

219 def to_json_dict(self) -> dict[str, Any]: 

220 """Convert to JSON-compatible dict with aliases (for storing as field in database).""" 

221 return self.model.to_json_dict() 

222 

223 @override 

224 def __repr__(self) -> str: 

225 model_repr = repr(self.model) 

226 changed = f", changed={list(self._changed_fields)}" if self._changed_fields else "" 

227 return f"{self._name}({model_repr}{changed})" 

228 

229 @override 

230 def __str__(self) -> str: 

231 return str(self.model) 

232 

233 # ------------------------------------------------------------------------- 

234 # Internal state access 

235 # these may/may not exist .. 

236 # ------------------------------------------------------------------------- 

237 def bump_version(self) -> None: 

238 """Bump the version of the underlying model, if it has a version field.""" 

239 old_value = getattr(self._model, _VERSION, None) 

240 if old_value is None or not isinstance(old_value, int): 

241 msg = "Underlying model does not have a version field that can be bumped" 

242 raise AttributeError(msg) 

243 

244 new_value = old_value + 1 

245 self.update_field(_VERSION, new_value) 

246 if IS_DEBUG: 

247 msg = f"Bumped version from {old_value} to {new_value} for {self._name} {self.doc_id}" 

248 LOG().debug(msg) 

249 

250 # ------------------------------------------------------------------------- 

251 # Change Tracking 

252 # ------------------------------------------------------------------------- 

253 

254 def update_field(self, field_name: str, value: Any) -> None: 

255 """ 

256 Update a field in the wrapped model. 

257 

258 Args: 

259 field_name: Python field name (not alias), e.g., 'version' not 'VERSION' 

260 value: New value for the field 

261 

262 """ 

263 if not hasattr(self.model, field_name): 

264 raise AttributeError(f"{self._name} has no field '{field_name}'") 

265 

266 field_name = str(field_name) # Ensure field_name is a string (handles StrEnum cases) 

267 

268 # Create new model with updated value using Python field names 

269 updated_data = self.model.model_dump(by_alias=False) 

270 updated_data[field_name] = value 

271 

272 new_model = self._load_data(updated_data) 

273 object.__setattr__(self, "_model", new_model) 

274 self._changed_fields.add(field_name) 

275 

276 def get_updates(self) -> dict[str, Any]: 

277 """ 

278 Get dict of changed fields for Firestore update. 

279 Uses Firestore aliases to match how fields are stored in database. 

280 Automatically adds updated_at timestamp. 

281 

282 Returns: 

283 Dict with only changed fields + updated_at, suitable for Firestore update() 

284 

285 """ 

286 if not self._changed_fields: 

287 return {} 

288 

289 # Get full dict with aliases (matches Firestore field names) 

290 full_dict_with_aliases = self.model.model_dump(by_alias=True, exclude={"id"}) 

291 

292 # Build reverse mapping: Python field name -> alias 

293 model_fields = type(self.model).model_fields 

294 field_to_alias: dict[str, str] = {} 

295 for py_field in self._changed_fields: 

296 # Find the alias for this Python field 

297 field_info = model_fields.get(py_field) 

298 if field_info and field_info.alias: 

299 field_to_alias[py_field] = field_info.alias 

300 else: 

301 field_to_alias[py_field] = py_field # No alias, use Python name 

302 

303 updates: dict[str, Any] = {} 

304 for py_field in self._changed_fields: 

305 alias = field_to_alias[py_field] 

306 if alias in full_dict_with_aliases: 

307 updates[alias] = full_dict_with_aliases[alias] 

308 

309 # Always add updated_at for updates 

310 updates[_UPDATED_AT] = SERVER_TIMESTAMP 

311 

312 return updates 

313 

314 def mark_changed(self, *field_names: str) -> None: 

315 """Mark fields as changed without updating values.""" 

316 for field_name in field_names: 

317 if not hasattr(self.model, field_name): 

318 raise AttributeError(f"{self._name} has no field '{field_name}'") 

319 self._changed_fields.add(field_name) 

320 

321 def clear_changes(self) -> None: 

322 """Clear change tracking (e.g., after successful save).""" 

323 self._changed_fields.clear() 

324 

325 @property 

326 def has_changes(self) -> bool: 

327 """Check if any fields have been modified.""" 

328 return len(self._changed_fields) > 0 

329 

330 @property 

331 def changed_fields(self) -> frozenset[str]: 

332 """Get immutable set of changed field names (Python names).""" 

333 return frozenset(self._changed_fields) 

334 

335 # ------------------------------------------------------------------------- 

336 # time helpers 

337 # ------------------------------------------------------------------------- 

338 

339 @property 

340 def created_at(self) -> datetime: 

341 # returns a timestamp suitable for Firestore 

342 tstamp = self._created_at 

343 dt: datetime | None = None 

344 if tstamp is not None: 

345 dt = FirestoreTime.from_firestore(tstamp) 

346 

347 if dt is None: 

348 # dt only none if tstamp is Sentinel. 

349 # return current time . 

350 # e.g. for use in calcs, 

351 # but wont be actually saved to db.. 

352 return TimeUtil.get_current_utc_dt() 

353 

354 return dt 

355 

356 @property 

357 def created_at_value(self) -> str: 

358 return FirestoreTime.formatted(self._created_at) 

359 

360 @property 

361 def created_at_db(self) -> DatabaseTimeType: 

362 # returns a timestamp suitable for Firestore 

363 tstamp = self._created_at 

364 if tstamp is None: 

365 return FirestoreTime.server_timestamp() 

366 

367 return FirestoreTime.to_firestore(tstamp) 

368 

369 @property 

370 def updated_at(self) -> datetime: 

371 # returns a timestamp suitable for Firestore 

372 tstamp = self._updated_at 

373 dt: datetime | None = None 

374 if tstamp is not None: 

375 dt = FirestoreTime.from_firestore(tstamp) 

376 

377 if dt is None: 

378 # dt only none if tstamp is Sentinel. 

379 # return current time . 

380 # e.g. for use in calcs, 

381 # but wont be actually saved to db.. 

382 return TimeUtil.get_current_utc_dt() 

383 

384 return dt 

385 

386 @property 

387 def updated_at_db(self) -> DatabaseTimeType: 

388 # returns a timestamp suitable for Firestore 

389 tstamp = self._updated_at 

390 if tstamp is None: 

391 return FirestoreTime.server_timestamp() 

392 

393 return FirestoreTime.to_firestore(tstamp) 

394 

395 @property 

396 def updated_at_value(self) -> str: 

397 return FirestoreTime.formatted(self._updated_at) 

398 

399 # ------------------------------------------------------------------------- 

400 # Helpers 

401 # ------------------------------------------------------------------------- 

402 

403 @property 

404 def _name(self) -> str: 

405 return self.__class__.__name__ 

406 

407 @property 

408 def debug_str(self) -> str: 

409 msg = f"{self.__class__.__name__}(id={self.doc_id},\n\tdata={self.to_json_dict()}" 

410 if self._changed_fields: 

411 msg += f"\n\tchanged_fields={list(self._changed_fields)}" 

412 return msg 

413 

414 @classmethod 

415 def _convert_firestore_timestamps(cls, data: dict[str, Any]) -> dict[str, Any]: 

416 converted = data.copy() 

417 

418 # Common timestamp fields 

419 timestamp_fields = ["created_at", "updated_at", "createdAt", "updatedAt"] 

420 

421 for field in timestamp_fields: 

422 if field in converted: 

423 value = converted[field] 

424 if value is not None: 

425 converted[field] = FirestoreTime.from_firestore(value) 

426 

427 return converted 

428 

429 @classmethod 

430 def _get_field_alias_map(cls) -> dict[str, str]: 

431 model_fields = cls.MODEL_CLASS.model_fields 

432 mapping: dict[str, Any] = {} 

433 

434 for py_name, field_info in model_fields.items(): 

435 # Use alias if defined, otherwise use field name 

436 alias = field_info.alias or py_name 

437 mapping[py_name] = alias 

438 

439 return mapping