Coverage for functions \ flipdare \ request \ app_request.py: 78%

262 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12from typing import Any, NoReturn, Self, cast, override 

13 

14import flask 

15import werkzeug.datastructures as w 

16from firebase_admin import auth 

17from firebase_admin._auth_client import Client as AuthClient 

18from firebase_functions import https_fn 

19from flipdare.app_env import get_app_environment 

20from flipdare.app_log import LOG 

21from flipdare.app_types import JsonDict 

22from flipdare.constants import API_TOKEN_PREFIX, BEARER_TOKEN_PREFIX, IS_DEBUG 

23from flipdare.core.request_guard import RequestGuard 

24from flipdare.error.app_error import AppError, AuthError, CodePathError 

25from flipdare.error.error_context import ErrorContext 

26from flipdare.generated.schema.error.error_code_schema import ErrorCodeSchema 

27from flipdare.generated.schema.error.error_method_schema import ErrorMethodSchema 

28from flipdare.generated.shared.app_error_code import AppErrorCode 

29from flipdare.message.error_message import ErrorMessage 

30from flipdare.request.authentication_result import AuthenticationResult 

31from flipdare.request.request_types import AppHttpRequest, AppHttpRequestType 

32 

33type AppRequestType = AppHttpRequest | https_fn.CallableRequest[JsonDict] 

34 

35 

36class AppRequest[T: AppRequestType]: 

37 """ 

38 Factory-only request wrapper. 

39 

40 Use `AppRequest.http()` or `AppRequest.callable()` to construct instances. 

41 Direct instantiation is intentionally blocked to ensure invariants. 

42 """ 

43 

44 __slots__ = ( 

45 "_api_token_prefix", 

46 "_auth_attempted", 

47 "_auth_result", 

48 "_called_by", 

49 "_request", 

50 ) 

51 

52 _request: T 

53 _api_token_prefix: str 

54 _called_by: str | None 

55 _auth_attempted: bool 

56 _auth_result: AuthenticationResult | None 

57 

58 # constants 

59 _bearer_token_prefix: str = BEARER_TOKEN_PREFIX 

60 

61 # Private constructor — only factories call this 

62 def __init__(self, *, _internal: bool = False) -> None: 

63 if not _internal: 

64 raise RuntimeError("Use AppRequest.http() or AppRequest.callable()") 

65 

66 @classmethod 

67 def _create(cls) -> Self: 

68 return cls(_internal=True) 

69 

70 @classmethod 

71 def http( 

72 cls, 

73 req: flask.Request, 

74 req_type: AppHttpRequestType, 

75 ) -> "AppRequest[AppHttpRequest]": 

76 instance = cast("AppRequest[AppHttpRequest]", cls._create()) 

77 instance._api_token_prefix = API_TOKEN_PREFIX 

78 instance._request = AppHttpRequest(raw_request=req, http_type=req_type) 

79 instance._called_by = req_type.called_by 

80 instance._auth_attempted = False 

81 instance._auth_result = None 

82 return instance 

83 

84 @classmethod 

85 def callable( 

86 cls, 

87 req: https_fn.CallableRequest[JsonDict], 

88 ) -> "AppRequest[https_fn.CallableRequest[Any]]": 

89 instance = cast("AppRequest[https_fn.CallableRequest[Any]]", cls._create()) 

90 

91 instance._request = req 

92 instance._api_token_prefix = API_TOKEN_PREFIX 

93 instance._called_by = req.raw_request.endpoint or "unknown" 

94 instance._auth_attempted = False 

95 instance._auth_result = None 

96 return instance 

97 

98 @property 

99 def called_by(self) -> str: 

100 return self._called_by if self._called_by is not None else "unknown" 

101 

102 @property 

103 def method(self) -> str | None: # type: ignore[return] 

104 req = self._request 

105 match req: # type: ignore[exhaustive-match] 

106 case AppHttpRequest(): 

107 return req.raw_request.method 

108 case https_fn.CallableRequest(): 

109 return None # Callable requests don't have HTTP methods 

110 

111 @property 

112 def headers(self) -> w.Headers: 

113 return self.raw_request.headers 

114 

115 @property 

116 def endpoint(self) -> str: 

117 return self.raw_request.url or "unknown" 

118 

119 @property 

120 def ip_address(self) -> str: 

121 # this handles both Gen-1 and Gen-2 firebase functions. 

122 req_headers = self.headers 

123 

124 # 1. Check for Fastly (if using Firebase Hosting rewrites) 

125 fastly_ip = req_headers.get("Fastly-Client-Ip") 

126 if fastly_ip: 

127 return fastly_ip 

128 

129 # 2. Check X-Forwarded-For 

130 xff = req_headers.get("X-Forwarded-For") 

131 if xff: 

132 # Take the last one to avoid client-side spoofing in Cloud Run 

133 return xff.split(",")[-1].strip() 

134 

135 # 3. Fallback to remote_addr (Standard Flask/Functions fallback) 

136 return self.raw_request.remote_addr or "0.0.0.0" # noqa: S104 

137 

138 @property 

139 def remote_addr(self) -> str | None: 

140 # AppHttpRequest functions similar to callable request, 

141 # in that it wraps the request and labels it raw_request 

142 # so when a http request we are actually returning req.remote_addr 

143 # while for a callable request we are returning req.raw_request.remote_addr 

144 return self._request.raw_request.remote_addr 

145 

146 @property 

147 def auth_result(self) -> AuthenticationResult | None: 

148 return self._auth_result 

149 

150 @property 

151 def is_api_request(self) -> bool: # type: ignore[return] 

152 match self._request: # type: ignore[exhaustive-match] 

153 case AppHttpRequest(): 

154 return self._request.http_type.is_api_auth 

155 case https_fn.CallableRequest(): 

156 return False # Callable requests are not API requests 

157 

158 @property 

159 def is_backend_request(self) -> bool: # type: ignore[return] 

160 match self._request: # type: ignore[exhaustive-match] 

161 case AppHttpRequest(): 

162 return self._request.http_type.is_backend_auth 

163 case https_fn.CallableRequest(): 

164 return False # Callable requests are not API requests 

165 

166 @property 

167 def user_id(self) -> str | None: 

168 if not self._auth_attempted: 

169 # ensure authentication is attempted before accessing user_id 

170 self.is_authenticated() 

171 

172 if self._auth_result is not None: 

173 return self._auth_result.user_id 

174 

175 raise AppError.from_context( 

176 ErrorContext.unauthorized( 

177 self.endpoint, 

178 error_code=AppErrorCode.PERMISSION_DENIED, 

179 message="Unauthorized access to user_id", 

180 ), 

181 ) 

182 

183 @property 

184 def request_type(self) -> T: 

185 return self._request 

186 

187 @property 

188 def raw_request(self) -> flask.Request: 

189 request: T = self._request 

190 match request: 

191 case https_fn.CallableRequest(): 

192 return request.raw_request 

193 case AppHttpRequest(): 

194 return request.raw_request 

195 

196 @property 

197 def auth_client(self) -> AuthClient: 

198 from flipdare.services import get_auth_client 

199 

200 return get_auth_client() 

201 

202 def _update_auth_result(self, auth_result: AuthenticationResult | None) -> None: 

203 if auth_result is None: 

204 return 

205 

206 old_result = self._auth_result 

207 if old_result is None: 

208 self._auth_result = auth_result 

209 else: 

210 self._auth_result = old_result.merge(auth_result) 

211 

212 def is_valid_http_method(self) -> None: 

213 # NOTE: we currently only support POST ! 

214 called_by = self.called_by 

215 req = self._request 

216 if not RequestGuard.is_request(req): 

217 LOG().warning(f"Method check called on non-HTTP request for {called_by}") 

218 raise CodePathError("Method check called on non-HTTP request", True) 

219 

220 raw_request = req.raw_request 

221 request_type = req.http_type 

222 assert request_type is not None # this will pass, since a http request 

223 

224 if raw_request.method.lower() == request_type.method.lower(): 

225 return # valid method, not an error 

226 

227 msg = ErrorMessage.INVALID_METHOD.formatted( 

228 ErrorMethodSchema(method=raw_request.method, fn_name=called_by), 

229 ) 

230 LOG().warning(f"Invalid HTTP method: {raw_request.method} for {called_by}") 

231 raise AppError.from_context(ErrorContext.forbidden(raw_request.url, message=msg)) 

232 

233 def is_authenticated(self) -> AuthenticationResult | None: 

234 if IS_DEBUG: 

235 msg = ( 

236 f"Authentication check for {self.called_by}: " 

237 f"attempted={self._auth_attempted}, result={self._auth_result}" 

238 ) 

239 LOG().debug(msg) 

240 

241 if self._auth_attempted: 

242 if self._auth_result is not None: 

243 return self._auth_result 

244 return None 

245 

246 self._auth_attempted = True 

247 match self._request: 

248 case https_fn.CallableRequest(): 

249 # callable request can only be authenticated via Firebase Auth, so ignore isFirebaseAuthCheck flag 

250 self.is_callable_authenticated() 

251 case AppHttpRequest(): 

252 if self.is_api_request: 

253 self.is_custom_authenticated() 

254 else: 

255 self.is_firebase_authenticated() 

256 

257 if IS_DEBUG: 

258 msg = f"Authentication result for {self.called_by}: {self._auth_result}" 

259 LOG().debug(msg) 

260 

261 return self._auth_result 

262 

263 def is_callable_authenticated(self) -> None: 

264 if not RequestGuard.is_callable(self._request): 

265 msg = "Callable authentication check called on non-callable request" 

266 LOG().error(msg) 

267 raise CodePathError(msg, True) 

268 

269 req = self._request 

270 endpoint = req.raw_request.endpoint or "unknown" 

271 uid: str | None = None 

272 

273 if IS_DEBUG: 

274 msg = f"Authenticating callable request to {endpoint} with auth: {req.auth}" 

275 LOG().debug(msg) 

276 

277 if get_app_environment().use_uid_override: 

278 uid = get_app_environment().uid_override 

279 msg = f"UID override enabled, using test UID: {uid} for callable request to {endpoint}" 

280 LOG().info(msg) 

281 self._update_auth_result( 

282 AuthenticationResult( 

283 is_authenticated=True, 

284 is_allowed=True, 

285 user_id=uid, 

286 ), 

287 ) 

288 return 

289 

290 uid = req.auth.uid if req.auth is not None else None 

291 if uid is None: 

292 msg = f"Unauthenticated callable request: {endpoint}" 

293 LOG().warning(msg) 

294 raise AppError.from_context( 

295 ErrorContext.unauthorized( 

296 endpoint, 

297 message=ErrorMessage.UNAUTHORIZED_ERROR, 

298 ), 

299 ) 

300 

301 LOG().info(f"search called with ({uid}): {endpoint}") 

302 self._update_auth_result( 

303 AuthenticationResult( 

304 is_authenticated=True, 

305 is_allowed=True, 

306 user_id=uid, 

307 ), 

308 ) 

309 

310 def is_custom_authenticated(self) -> None: 

311 request = self._request 

312 if not RequestGuard.is_request(request): 

313 raise CodePathError("API authentication check called on non-HTTP request", True) 

314 

315 raw_request = request.raw_request 

316 try: 

317 LOG().info(f"Authenticating request {raw_request.url}") 

318 token = self._get_token(self._api_token_prefix, raw_request) 

319 

320 LOG().debug(f"Verifying Basic token: {token}") 

321 self._confirm_api_token(token, raw_request) 

322 

323 LOG().info(f"API authentication successful for request {raw_request.url}") 

324 self._update_auth_result( 

325 AuthenticationResult( 

326 is_authenticated=True, 

327 is_allowed=True, 

328 ), 

329 ) 

330 except AppError: 

331 raise # re-raise known AppErrors without modification 

332 except (ValueError, IndexError, auth.InvalidIdTokenError) as e: 

333 # Handle cases where the header is malformed or the token is invalid 

334 LOG().warning(f"Authentication failed: {e}") 

335 raise AuthError.from_context( 

336 ErrorContext.unauthorized( 

337 raw_request.url, 

338 message=ErrorMessage.INVALID_TOKEN, 

339 error_code=AppErrorCode.INVALID_AUTH_TOKEN, 

340 ), 

341 ) from e 

342 except Exception as e: 

343 LOG().warning(f"Authentication error: {e}") 

344 raise AppError.from_context( 

345 ErrorContext.unauthorized( 

346 raw_request.url, 

347 error_code=AppErrorCode.AUTH, 

348 ), 

349 ) from e 

350 

351 def is_firebase_authenticated(self) -> None: 

352 """ 

353 Check if request is authenticated via Firebase Auth. 

354 If error, returns a Response and no user ID. 

355 If ok, return no response and a user id 

356 """ 

357 req = self._request 

358 if not RequestGuard.is_request(req): 

359 raise ValueError("Token extraction called on non-HTTP request") 

360 

361 raw_request = req.raw_request 

362 

363 try: 

364 token_value = self._get_token(self._bearer_token_prefix, raw_request) 

365 

366 LOG().debug(f"Verifying Firebase Auth token: {token_value}") 

367 auth = self.auth_client 

368 decoded_token = auth.verify_id_token(token_value) # type: ignore 

369 uid = decoded_token["uid"] 

370 self._update_auth_result( 

371 AuthenticationResult( 

372 is_authenticated=True, 

373 is_allowed=True, 

374 user_id=uid, 

375 ), 

376 ) 

377 except Exception as e: 

378 LOG().error(f"Firebase Auth error: {e}") 

379 raise AppError.from_context( 

380 ErrorContext.server_error(raw_request.url, message=ErrorMessage.AUTH_ERROR), 

381 ) from e 

382 

383 def _get_token(self, token_key: str, raw_request: flask.Request) -> str: 

384 req = self._request 

385 

386 if not RequestGuard.is_request(req): 

387 msg = f"Invalid request {type(req)}, cant get token.." 

388 self._raise_auth_error(msg, raw_request, AppErrorCode.WRONG_AUTH_REQUEST_TYPE) 

389 

390 raw_request = req.raw_request 

391 LOG().info(f"Retrieving token {token_key} for request {raw_request.url}") 

392 

393 auth_header = raw_request.headers.get("Authorization", None) 

394 if auth_header is None: 

395 msg = f"Auth header missing: {auth_header}" 

396 self._raise_auth_error(msg, raw_request, AppErrorCode.MISSING_AUTH_HEADER) 

397 

398 # convert multiple spaces to single and split 

399 auth_header = " ".join(auth_header.split()) 

400 fields = auth_header.split(f"{token_key} ") 

401 if len(fields) != 2: # noqa: PLR2004 

402 fields = auth_header.split(" ") 

403 if len(fields) != 2: # noqa: PLR2004 

404 msg = f"Auth header malformed: {auth_header}" 

405 self._raise_auth_error(msg, raw_request, AppErrorCode.MALFORMED_AUTH_HEADER) 

406 

407 id_token = fields[1].strip() 

408 if id_token == "": 

409 msg = f"Auth token is empty in header: {auth_header}" 

410 self._raise_auth_error(msg, raw_request, AppErrorCode.EMPTY_AUTH_HEADER) 

411 

412 LOG().debug(f"Extracted token: {id_token} from header {auth_header}") 

413 return id_token 

414 

415 def _confirm_api_token(self, actual_token: str, raw_request: flask.Request) -> None: 

416 

417 from flipdare.app_config import get_app_config 

418 

419 expected_token = get_app_config().api_key(self.is_backend_request) 

420 if actual_token == expected_token: 

421 return 

422 

423 msg = f"Invalid API token: expected {expected_token}, got {actual_token}" 

424 self._raise_auth_error(msg, raw_request, AppErrorCode.INVALID_API_TOKEN) 

425 

426 def _raise_auth_error( 

427 self, 

428 msg: str, 

429 raw_request: flask.Request, 

430 code: AppErrorCode, 

431 ) -> NoReturn: 

432 LOG().error(msg) 

433 raise AppError.from_context( 

434 ErrorContext.malformed( 

435 raw_request.url, 

436 message=ErrorMessage.INVALID_REQUEST.formatted(ErrorCodeSchema(code=code)), 

437 ), 

438 ) 

439 

440 @override 

441 def __str__(self) -> str: 

442 return ( 

443 f"[AppRequest(called_by={self.called_by}, url={self.endpoint} method={self.method}, " 

444 f"remote_addr={self.remote_addr} (api={self.is_api_request}, backend={self.is_backend_request}))]" 

445 ) 

446 

447 @override 

448 def __repr__(self) -> str: 

449 attrs = self.__str__() 

450 return f"{self.__class__.__name__}({attrs})"