Coverage for functions \ flipdare \ core \ trigger_decorator.py: 86%
63 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
14from collections.abc import Callable
15from functools import wraps
16from typing import Any, ParamSpec
17from flipdare.constants import NO_DOC_ID
18from flipdare.result.job_result import JobResult
19from flipdare.result.output_result import OutputResult
20from flipdare.generated.shared.app_error_code import AppErrorCode
21from flipdare.generated.shared.backend.app_job_type import AppJobType
22from flipdare.result.outcome import Outcome
23from flipdare.generated.shared.firestore_collections import FirestoreCollections
25__all__ = ["trigger_decorator"]
27P = ParamSpec("P")
def trigger_decorator(
    job_type: AppJobType,
    collection: FirestoreCollections,
    wrapper_class: type[Any] | None = None,
) -> Callable[[Callable[P, Any]], Callable[P, Any]]:
    """
    Build a decorator that converts a trigger's result into an Outcome and logs failures.

    Without ``wrapper_class`` the decorated function is simply called and its
    result post-processed.

    With ``wrapper_class`` the job is read from the second positional argument
    (``args[1]``) and ``job.model_data`` is deserialized with
    ``wrapper_class.from_dict`` and passed to the decorated function as the
    keyword argument ``wrapper`` (unless the caller already supplied a
    ``wrapper`` keyword). Two failures are handled here instead of raising:

    * ``job.model_data`` is ``None``: a MISSING_DATA ``JobResult`` is produced
      and the decorated function is not called.
    * deserialization, or the decorated function itself, raises: a
      CREATE_FAILED ``JobResult`` is produced whose message says which of the
      two steps failed.

    Post-processing of the result:

    * an ``Outcome`` (already processed by a nested decorator) is returned
      unchanged;
    * a successful result returns ``result.outcome`` without logging;
    * a failed non-``JobResult`` is logged with ``system_error``;
    * a failed ``JobResult`` is logged with ``from_trigger`` unless its
      ``should_log`` flag is falsy.

    Args:
        job_type: Default job type for logging (can be overridden by the result).
        collection: Default collection for logging (can be overridden by the result).
        wrapper_class: Optional wrapper class to deserialize ``job.model_data`` into.

    Returns:
        A decorator whose wrapped function returns an ``Outcome``.

    Raises:
        TypeError: (from the wrapped function) if the decorated function returns
            something that is not an ``Outcome``, ``OutputResult`` or ``JobResult``.
    """

    def decorator(
        func: Callable[P, Any],
    ) -> Callable[P, Any]:
        """Wrap ``func`` so its result is converted to an Outcome with error logging."""

        @wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> Any:  # noqa: PLR0912
            # Function-scope imports are kept from the original
            # (presumably to avoid import cycles - TODO confirm).
            from flipdare.services import get_app_logger
            from flipdare.result.app_result import AppResult
            from flipdare.wrapper.backend.app_job_wrapper import AppJobWrapper

            result_obj: Any
            if wrapper_class is None:
                result_obj = func(*args, **kwargs)
            else:
                # NOTE(review): assumes the decorated callable is invoked as
                # (self, job, ...); fewer than two positional args raises IndexError.
                job: AppJobWrapper = args[1]  # type: ignore[assignment]
                doc_id = job.doc_id
                model_data = job.model_data

                def create_failed(message: str) -> Any:
                    """Build the CREATE_FAILED JobResult that carries the job's model data."""
                    failed: AppResult[Any] = AppResult(doc_id=doc_id)
                    failed.add_error(AppErrorCode.CREATE_FAILED, message)
                    return JobResult.from_result(failed, doc_id=doc_id, data=model_data)

                if model_data is None:
                    main_result: AppResult[Any] = AppResult(doc_id=doc_id)
                    main_result.add_error(
                        AppErrorCode.MISSING_DATA,
                        f"No model data found in job {doc_id}.",
                    )
                    result_obj = JobResult.from_result(main_result, doc_id=doc_id)
                else:
                    try:
                        # this handles 2 cases,
                        # 1. a wrapper was supplied - already created
                        # 2. from an AppJobModel
                        if "wrapper" not in kwargs:
                            kwargs["wrapper"] = wrapper_class.from_dict(model_data)  # type: ignore[index]
                    except Exception as e:
                        result_obj = create_failed(
                            f"Failed to deserialize {wrapper_class.__name__} from job {doc_id}: {e}"
                        )
                    else:
                        # Kept separate from the deserialization step so that an error
                        # raised by the decorated function is not reported as a
                        # deserialization failure. The error code stays CREATE_FAILED
                        # for backward compatibility with existing handling.
                        try:
                            result_obj = func(*args, **kwargs)  # type: ignore[call-arg]
                        except Exception as e:
                            func_name = getattr(func, "__name__", repr(func))
                            result_obj = create_failed(
                                f"{func_name} raised while processing job {doc_id}: {e}"
                            )

            if isinstance(result_obj, Outcome):
                # Already processed by nested decorator, just pass through
                return result_obj

            # Validate runtime type (function may not respect type annotations)
            if not isinstance(result_obj, (OutputResult, JobResult)):  # type: ignore
                # NOTE(review): message says OutputAppResult but the check is for
                # JobResult; text left unchanged in case callers/tests match on it.
                raise TypeError(
                    "The decorated function must return an OutputResult/OutputAppResult."
                )

            # Use result's job_type/collection if available, otherwise use decorator params
            actual_job_type = result_obj.job_type or job_type
            actual_collection = result_obj.collection or collection
            actual_doc_id = result_obj.doc_id

            if result_obj.is_ok:
                # Success: nothing to log for either result type.
                return result_obj.outcome

            if not isinstance(result_obj, JobResult):
                # this technically should not happen because this is a trigger
                # this is legacy code..
                actual_cause = result_obj.message
                get_app_logger().system_error(
                    error_code=result_obj.error_code or AppErrorCode.TRIGGER,
                    message=actual_cause,
                    doc_id=actual_doc_id,
                    job_type=actual_job_type,
                    collection=actual_collection,
                )

                return result_obj.outcome

            # Failed JobResult: honour its opt-out from logging.
            if not result_obj.should_log:
                value: Outcome = result_obj.outcome
                return value

            result = result_obj.app_result

            # Extract doc_id from multiple possible sources
            # we preference generated, then data, then result_obj
            if result.generated and hasattr(result.generated, "doc_id"):
                actual_doc_id = getattr(result.generated, "doc_id", NO_DOC_ID)
            elif result_obj.data is not None and "doc_id" in result_obj.data:
                actual_doc_id = result_obj.data.get("doc_id", NO_DOC_ID)

            # Extract cause - prefer explicit cause, fallback to error string
            actual_cause = result_obj.message or (
                result.formatted
                if result.is_error
                else "No additional error information provided."
            )

            get_app_logger().from_trigger(
                doc_id=actual_doc_id,
                app_result=result_obj,
                job_type=actual_job_type,
                collection=actual_collection,
                message=actual_cause,
            )

            return result_obj.outcome

        return wrapper

    return decorator