Coverage for functions \ flipdare \ job \ event_parser.py: 85%
104 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
14from dataclasses import dataclass
15from typing import cast
17from google.cloud.firestore_v1.base_document import DocumentSnapshot
19from flipdare.app_log import LOG
20from flipdare.app_types import DatabaseDict, EventType
22__all__ = ["EventDict", "ChangeEventDict", "EventParser"]
25@dataclass
26class EventDict:
27 data: DatabaseDict
30@dataclass
31class ChangeEventDict(EventDict):
32 before: DatabaseDict
33 after: DatabaseDict
36class EventParser[D: EventDict]:
37 def __init__(self, job_name: str, event: EventType, data_class: type[D]) -> None:
38 self._job_name = job_name
39 self._event = event
40 self._dict_data: D | None = None
41 self._errors: list[str] = []
42 self._data_class = data_class
43 self._params = getattr(event, "params", None)
45 try:
46 self._dict_data = cast("D", _RawEventParser(job_name, self._event).parse())
47 except Exception as e:
48 cause = f"Failed to parse event data for {job_name}: {e!s}"
49 LOG().error(cause)
50 self._errors.append(cause)
52 @property
53 def params(self) -> dict[str, str] | None:
54 # A dictionary containing any wildcard values from the document path (e.g., userId).
55 return self._params
57 @property
58 def event_dict(self) -> D | None:
59 if len(self._errors) > 0:
60 return None
61 if self._dict_data is None:
62 return None
64 return self._dict_data
66 @property
67 def errors(self) -> list[str]:
68 return self._errors
70 @property
71 def doc_id(self) -> str | None:
72 data_dict = self.event_dict
73 if data_dict is None:
74 return None
75 # if we get here, we know "id" is present because of validation in data_from_event
76 # databaseDict is str,Any but id is always a string, so we can safely cast here
77 return str(data_dict.data.get("id"))
80class _RawEventParser:
81 def __init__(self, job_name: str, event: EventType) -> None:
82 self._job_name = job_name
83 self._event = event
85 def parse(self) -> EventDict: # noqa: PLR0915, PLR0912
86 job_name = self._job_name
87 event_data = self._event.data
89 """Convert DocumentSnapshot to dict with error handling."""
90 if event_data is None:
91 raise ValueError(f"No data available for {job_name}")
93 data: DatabaseDict | None = None
94 before_data: DatabaseDict | None = None
95 after_data: DatabaseDict | None = None
96 doc_id: str | None = None
98 is_change: bool = False
100 if isinstance(event_data, dict):
101 data = event_data
102 doc_id = data.get("id")
103 elif isinstance(event_data, DocumentSnapshot):
104 data = event_data.to_dict()
105 doc_id = event_data.id
106 else:
107 # Assume it's a Change object with before/after
108 is_change = True
109 before_data = event_data.before.to_dict() if event_data.before else None
110 after_data = event_data.after.to_dict() if event_data.after else None
111 data = after_data or before_data
112 if event_data.after is not None:
113 doc_id = event_data.after.id
114 elif event_data.before is not None:
115 doc_id = event_data.before.id
116 if doc_id is None:
117 # fallback to event param if available
118 params = getattr(event_data, "params", None)
119 if params is not None:
120 doc_id = params.get("doc_id")
122 if data is None:
123 cause = f"Could not convert DocumentSnapshot to dict for {job_name}"
124 LOG().error(cause)
125 raise ValueError(cause)
126 if doc_id is None:
127 cause = f"Document ID is missing from data for {job_name}"
128 LOG().error(cause)
129 raise ValueError(cause)
131 if "id" not in data:
132 data["id"] = doc_id
134 if not is_change:
135 # This means we have a create or delete event, so we can use data as the main data
136 return EventDict(data=data)
138 # This means we have an update event, so we need to return both before and after data
139 if after_data is None and before_data is None:
140 cause = f"Both before and after data are missing for update event in {job_name}"
141 LOG().error(cause)
142 raise ValueError(cause)
144 if before_data is None:
145 assert after_data is not None # for mypy
146 before_data = after_data
147 before_data["id"] = doc_id
148 elif after_data is None:
149 assert before_data is not None # for mypy
150 after_data = before_data
151 after_data["id"] = doc_id
153 return ChangeEventDict(data=data, before=before_data, after=after_data)