Coverage for functions \ flipdare \ request \ app_request.py: 78%
262 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
12from typing import Any, NoReturn, Self, cast, override
14import flask
15import werkzeug.datastructures as w
16from firebase_admin import auth
17from firebase_admin._auth_client import Client as AuthClient
18from firebase_functions import https_fn
19from flipdare.app_env import get_app_environment
20from flipdare.app_log import LOG
21from flipdare.app_types import JsonDict
22from flipdare.constants import API_TOKEN_PREFIX, BEARER_TOKEN_PREFIX, IS_DEBUG
23from flipdare.core.request_guard import RequestGuard
24from flipdare.error.app_error import AppError, AuthError, CodePathError
25from flipdare.error.error_context import ErrorContext
26from flipdare.generated.schema.error.error_code_schema import ErrorCodeSchema
27from flipdare.generated.schema.error.error_method_schema import ErrorMethodSchema
28from flipdare.generated.shared.app_error_code import AppErrorCode
29from flipdare.message.error_message import ErrorMessage
30from flipdare.request.authentication_result import AuthenticationResult
31from flipdare.request.request_types import AppHttpRequest, AppHttpRequestType
# Union of the request kinds that AppRequest can wrap: the project's
# AppHttpRequest wrapper, or a Firebase callable request with a JsonDict payload.
type AppRequestType = AppHttpRequest | https_fn.CallableRequest[JsonDict]
36class AppRequest[T: AppRequestType]:
37 """
38 Factory-only request wrapper.
40 Use `AppRequest.http()` or `AppRequest.callable()` to construct instances.
41 Direct instantiation is intentionally blocked to ensure invariants.
42 """
44 __slots__ = (
45 "_api_token_prefix",
46 "_auth_attempted",
47 "_auth_result",
48 "_called_by",
49 "_request",
50 )
52 _request: T
53 _api_token_prefix: str
54 _called_by: str | None
55 _auth_attempted: bool
56 _auth_result: AuthenticationResult | None
58 # constants
59 _bearer_token_prefix: str = BEARER_TOKEN_PREFIX
61 # Private constructor — only factories call this
62 def __init__(self, *, _internal: bool = False) -> None:
63 if not _internal:
64 raise RuntimeError("Use AppRequest.http() or AppRequest.callable()")
66 @classmethod
67 def _create(cls) -> Self:
68 return cls(_internal=True)
70 @classmethod
71 def http(
72 cls,
73 req: flask.Request,
74 req_type: AppHttpRequestType,
75 ) -> "AppRequest[AppHttpRequest]":
76 instance = cast("AppRequest[AppHttpRequest]", cls._create())
77 instance._api_token_prefix = API_TOKEN_PREFIX
78 instance._request = AppHttpRequest(raw_request=req, http_type=req_type)
79 instance._called_by = req_type.called_by
80 instance._auth_attempted = False
81 instance._auth_result = None
82 return instance
84 @classmethod
85 def callable(
86 cls,
87 req: https_fn.CallableRequest[JsonDict],
88 ) -> "AppRequest[https_fn.CallableRequest[Any]]":
89 instance = cast("AppRequest[https_fn.CallableRequest[Any]]", cls._create())
91 instance._request = req
92 instance._api_token_prefix = API_TOKEN_PREFIX
93 instance._called_by = req.raw_request.endpoint or "unknown"
94 instance._auth_attempted = False
95 instance._auth_result = None
96 return instance
98 @property
99 def called_by(self) -> str:
100 return self._called_by if self._called_by is not None else "unknown"
102 @property
103 def method(self) -> str | None: # type: ignore[return]
104 req = self._request
105 match req: # type: ignore[exhaustive-match]
106 case AppHttpRequest():
107 return req.raw_request.method
108 case https_fn.CallableRequest():
109 return None # Callable requests don't have HTTP methods
111 @property
112 def headers(self) -> w.Headers:
113 return self.raw_request.headers
115 @property
116 def endpoint(self) -> str:
117 return self.raw_request.url or "unknown"
119 @property
120 def ip_address(self) -> str:
121 # this handles both Gen-1 and Gen-2 firebase functions.
122 req_headers = self.headers
124 # 1. Check for Fastly (if using Firebase Hosting rewrites)
125 fastly_ip = req_headers.get("Fastly-Client-Ip")
126 if fastly_ip:
127 return fastly_ip
129 # 2. Check X-Forwarded-For
130 xff = req_headers.get("X-Forwarded-For")
131 if xff:
132 # Take the last one to avoid client-side spoofing in Cloud Run
133 return xff.split(",")[-1].strip()
135 # 3. Fallback to remote_addr (Standard Flask/Functions fallback)
136 return self.raw_request.remote_addr or "0.0.0.0" # noqa: S104
138 @property
139 def remote_addr(self) -> str | None:
140 # AppHttpRequest functions similar to callable request,
141 # in that it wraps the request and labels it raw_request
142 # so when a http request we are actually returning req.remote_addr
143 # while for a callable request we are returning req.raw_request.remote_addr
144 return self._request.raw_request.remote_addr
146 @property
147 def auth_result(self) -> AuthenticationResult | None:
148 return self._auth_result
150 @property
151 def is_api_request(self) -> bool: # type: ignore[return]
152 match self._request: # type: ignore[exhaustive-match]
153 case AppHttpRequest():
154 return self._request.http_type.is_api_auth
155 case https_fn.CallableRequest():
156 return False # Callable requests are not API requests
158 @property
159 def is_backend_request(self) -> bool: # type: ignore[return]
160 match self._request: # type: ignore[exhaustive-match]
161 case AppHttpRequest():
162 return self._request.http_type.is_backend_auth
163 case https_fn.CallableRequest():
164 return False # Callable requests are not API requests
166 @property
167 def user_id(self) -> str | None:
168 if not self._auth_attempted:
169 # ensure authentication is attempted before accessing user_id
170 self.is_authenticated()
172 if self._auth_result is not None:
173 return self._auth_result.user_id
175 raise AppError.from_context(
176 ErrorContext.unauthorized(
177 self.endpoint,
178 error_code=AppErrorCode.PERMISSION_DENIED,
179 message="Unauthorized access to user_id",
180 ),
181 )
183 @property
184 def request_type(self) -> T:
185 return self._request
187 @property
188 def raw_request(self) -> flask.Request:
189 request: T = self._request
190 match request:
191 case https_fn.CallableRequest():
192 return request.raw_request
193 case AppHttpRequest():
194 return request.raw_request
196 @property
197 def auth_client(self) -> AuthClient:
198 from flipdare.services import get_auth_client
200 return get_auth_client()
202 def _update_auth_result(self, auth_result: AuthenticationResult | None) -> None:
203 if auth_result is None:
204 return
206 old_result = self._auth_result
207 if old_result is None:
208 self._auth_result = auth_result
209 else:
210 self._auth_result = old_result.merge(auth_result)
212 def is_valid_http_method(self) -> None:
213 # NOTE: we currently only support POST !
214 called_by = self.called_by
215 req = self._request
216 if not RequestGuard.is_request(req):
217 LOG().warning(f"Method check called on non-HTTP request for {called_by}")
218 raise CodePathError("Method check called on non-HTTP request", True)
220 raw_request = req.raw_request
221 request_type = req.http_type
222 assert request_type is not None # this will pass, since a http request
224 if raw_request.method.lower() == request_type.method.lower():
225 return # valid method, not an error
227 msg = ErrorMessage.INVALID_METHOD.formatted(
228 ErrorMethodSchema(method=raw_request.method, fn_name=called_by),
229 )
230 LOG().warning(f"Invalid HTTP method: {raw_request.method} for {called_by}")
231 raise AppError.from_context(ErrorContext.forbidden(raw_request.url, message=msg))
233 def is_authenticated(self) -> AuthenticationResult | None:
234 if IS_DEBUG:
235 msg = (
236 f"Authentication check for {self.called_by}: "
237 f"attempted={self._auth_attempted}, result={self._auth_result}"
238 )
239 LOG().debug(msg)
241 if self._auth_attempted:
242 if self._auth_result is not None:
243 return self._auth_result
244 return None
246 self._auth_attempted = True
247 match self._request:
248 case https_fn.CallableRequest():
249 # callable request can only be authenticated via Firebase Auth, so ignore isFirebaseAuthCheck flag
250 self.is_callable_authenticated()
251 case AppHttpRequest():
252 if self.is_api_request:
253 self.is_custom_authenticated()
254 else:
255 self.is_firebase_authenticated()
257 if IS_DEBUG:
258 msg = f"Authentication result for {self.called_by}: {self._auth_result}"
259 LOG().debug(msg)
261 return self._auth_result
263 def is_callable_authenticated(self) -> None:
264 if not RequestGuard.is_callable(self._request):
265 msg = "Callable authentication check called on non-callable request"
266 LOG().error(msg)
267 raise CodePathError(msg, True)
269 req = self._request
270 endpoint = req.raw_request.endpoint or "unknown"
271 uid: str | None = None
273 if IS_DEBUG:
274 msg = f"Authenticating callable request to {endpoint} with auth: {req.auth}"
275 LOG().debug(msg)
277 if get_app_environment().use_uid_override:
278 uid = get_app_environment().uid_override
279 msg = f"UID override enabled, using test UID: {uid} for callable request to {endpoint}"
280 LOG().info(msg)
281 self._update_auth_result(
282 AuthenticationResult(
283 is_authenticated=True,
284 is_allowed=True,
285 user_id=uid,
286 ),
287 )
288 return
290 uid = req.auth.uid if req.auth is not None else None
291 if uid is None:
292 msg = f"Unauthenticated callable request: {endpoint}"
293 LOG().warning(msg)
294 raise AppError.from_context(
295 ErrorContext.unauthorized(
296 endpoint,
297 message=ErrorMessage.UNAUTHORIZED_ERROR,
298 ),
299 )
301 LOG().info(f"search called with ({uid}): {endpoint}")
302 self._update_auth_result(
303 AuthenticationResult(
304 is_authenticated=True,
305 is_allowed=True,
306 user_id=uid,
307 ),
308 )
310 def is_custom_authenticated(self) -> None:
311 request = self._request
312 if not RequestGuard.is_request(request):
313 raise CodePathError("API authentication check called on non-HTTP request", True)
315 raw_request = request.raw_request
316 try:
317 LOG().info(f"Authenticating request {raw_request.url}")
318 token = self._get_token(self._api_token_prefix, raw_request)
320 LOG().debug(f"Verifying Basic token: {token}")
321 self._confirm_api_token(token, raw_request)
323 LOG().info(f"API authentication successful for request {raw_request.url}")
324 self._update_auth_result(
325 AuthenticationResult(
326 is_authenticated=True,
327 is_allowed=True,
328 ),
329 )
330 except AppError:
331 raise # re-raise known AppErrors without modification
332 except (ValueError, IndexError, auth.InvalidIdTokenError) as e:
333 # Handle cases where the header is malformed or the token is invalid
334 LOG().warning(f"Authentication failed: {e}")
335 raise AuthError.from_context(
336 ErrorContext.unauthorized(
337 raw_request.url,
338 message=ErrorMessage.INVALID_TOKEN,
339 error_code=AppErrorCode.INVALID_AUTH_TOKEN,
340 ),
341 ) from e
342 except Exception as e:
343 LOG().warning(f"Authentication error: {e}")
344 raise AppError.from_context(
345 ErrorContext.unauthorized(
346 raw_request.url,
347 error_code=AppErrorCode.AUTH,
348 ),
349 ) from e
351 def is_firebase_authenticated(self) -> None:
352 """
353 Check if request is authenticated via Firebase Auth.
354 If error, returns a Response and no user ID.
355 If ok, return no response and a user id
356 """
357 req = self._request
358 if not RequestGuard.is_request(req):
359 raise ValueError("Token extraction called on non-HTTP request")
361 raw_request = req.raw_request
363 try:
364 token_value = self._get_token(self._bearer_token_prefix, raw_request)
366 LOG().debug(f"Verifying Firebase Auth token: {token_value}")
367 auth = self.auth_client
368 decoded_token = auth.verify_id_token(token_value) # type: ignore
369 uid = decoded_token["uid"]
370 self._update_auth_result(
371 AuthenticationResult(
372 is_authenticated=True,
373 is_allowed=True,
374 user_id=uid,
375 ),
376 )
377 except Exception as e:
378 LOG().error(f"Firebase Auth error: {e}")
379 raise AppError.from_context(
380 ErrorContext.server_error(raw_request.url, message=ErrorMessage.AUTH_ERROR),
381 ) from e
383 def _get_token(self, token_key: str, raw_request: flask.Request) -> str:
384 req = self._request
386 if not RequestGuard.is_request(req):
387 msg = f"Invalid request {type(req)}, cant get token.."
388 self._raise_auth_error(msg, raw_request, AppErrorCode.WRONG_AUTH_REQUEST_TYPE)
390 raw_request = req.raw_request
391 LOG().info(f"Retrieving token {token_key} for request {raw_request.url}")
393 auth_header = raw_request.headers.get("Authorization", None)
394 if auth_header is None:
395 msg = f"Auth header missing: {auth_header}"
396 self._raise_auth_error(msg, raw_request, AppErrorCode.MISSING_AUTH_HEADER)
398 # convert multiple spaces to single and split
399 auth_header = " ".join(auth_header.split())
400 fields = auth_header.split(f"{token_key} ")
401 if len(fields) != 2: # noqa: PLR2004
402 fields = auth_header.split(" ")
403 if len(fields) != 2: # noqa: PLR2004
404 msg = f"Auth header malformed: {auth_header}"
405 self._raise_auth_error(msg, raw_request, AppErrorCode.MALFORMED_AUTH_HEADER)
407 id_token = fields[1].strip()
408 if id_token == "":
409 msg = f"Auth token is empty in header: {auth_header}"
410 self._raise_auth_error(msg, raw_request, AppErrorCode.EMPTY_AUTH_HEADER)
412 LOG().debug(f"Extracted token: {id_token} from header {auth_header}")
413 return id_token
415 def _confirm_api_token(self, actual_token: str, raw_request: flask.Request) -> None:
417 from flipdare.app_config import get_app_config
419 expected_token = get_app_config().api_key(self.is_backend_request)
420 if actual_token == expected_token:
421 return
423 msg = f"Invalid API token: expected {expected_token}, got {actual_token}"
424 self._raise_auth_error(msg, raw_request, AppErrorCode.INVALID_API_TOKEN)
426 def _raise_auth_error(
427 self,
428 msg: str,
429 raw_request: flask.Request,
430 code: AppErrorCode,
431 ) -> NoReturn:
432 LOG().error(msg)
433 raise AppError.from_context(
434 ErrorContext.malformed(
435 raw_request.url,
436 message=ErrorMessage.INVALID_REQUEST.formatted(ErrorCodeSchema(code=code)),
437 ),
438 )
440 @override
441 def __str__(self) -> str:
442 return (
443 f"[AppRequest(called_by={self.called_by}, url={self.endpoint} method={self.method}, "
444 f"remote_addr={self.remote_addr} (api={self.is_api_request}, backend={self.is_backend_request}))]"
445 )
447 @override
448 def __repr__(self) -> str:
449 attrs = self.__str__()
450 return f"{self.__class__.__name__}({attrs})"