Coverage for functions \ flipdare \ backend \ exchange_rate_monitor.py: 76%
177 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from __future__ import annotations
15import json
16import requests
17from requests.exceptions import Timeout
19from flipdare.app_log import LOG
20from flipdare.app_types import ExchangeRateDict, JsonDict
21from flipdare.constants import (
22 CURRENCY_API_TIMEOUT,
23 EXCHANGE_RATE_BASE_CURRENCY,
24 EXCHANGE_RATE_HEADERS,
25 EXCHANGE_RATE_URL,
26 IS_DEBUG,
27 IS_TRACE,
28)
29from flipdare.core import Singleton
30from flipdare.generated.shared.app_log_category import AppLogCategory
31from flipdare.result.app_result import AppResult
32from flipdare.result.job_result import JobResult
33from flipdare.result.output_result import OutputResult
34from flipdare.mailer.admin_mailer import AdminMailer
35from flipdare.firestore.backend.exchange_rate_db import ExchangeRateDb
36from flipdare.generated import StripeCurrencyCode
37from flipdare.generated.model.internal.exchange_rate_model import ExchangeRateModel
38from flipdare.generated.shared.app_error_code import AppErrorCode
39from flipdare.generated.shared.app_http_code import AppHttpCode
40from flipdare.generated.shared.backend.app_job_type import AppJobType
41from flipdare.util.time_util import TimeUtil
42from flipdare.wrapper.internal.exchange_rate_wrapper import ExchangeRateWrapper
44__all__ = ["ExchangeRateMonitor"]
46#
47# Example response from Open Exchange Rates API
48# {
49# disclaimer: "https://openexchangerates.org/terms/",
50# license: "https://openexchangerates.org/license/",
51# timestamp: 1449877801,
52# base: "USD",
53# rates: {
54# AED: 3.672538,
55# AFN: 66.809999,
56# ALL: 125.716501,
57# AMD: 484.902502,
58# ANG: 1.788575,
59# AOA: 135.295998,
60# ARS: 9.750101,
61# AUD: 1.390866,
62# /* ... */
63# }
64# }
65#
68class ExchangeRateMonitor(Singleton):
70 __slots__ = (
71 "_api_headers",
72 "_api_key",
73 "_base_currency",
74 "_currency_db",
75 "_exchange_rate_url",
76 )
78 def __init__(
79 self,
80 currency_db: ExchangeRateDb | None = None,
81 admin_mailer: AdminMailer | None = None,
82 api_key: str | None = None,
83 api_headers: dict[str, str] = EXCHANGE_RATE_HEADERS,
84 base_currency: StripeCurrencyCode = EXCHANGE_RATE_BASE_CURRENCY,
85 ) -> None:
86 from flipdare.app_config import get_app_config
88 if api_key is None:
89 api_key = get_app_config().api_key(is_backend=False)
91 self._currency_db = currency_db
92 self._admin_mailer = admin_mailer
93 self._api_key = api_key
94 self._api_headers = api_headers
95 self._exchange_rate_url = EXCHANGE_RATE_URL.replace("__API_KEY__", api_key)
96 self._base_currency = base_currency
98 @property
99 def currency_db(self) -> ExchangeRateDb:
100 if self._currency_db is None:
102 from flipdare.services import get_exchange_rate_db
104 self._currency_db = get_exchange_rate_db()
105 return self._currency_db
107 @property
108 def admin_mailer(self) -> AdminMailer:
109 if self._admin_mailer is None:
110 from flipdare.services import get_admin_mailer
112 self._admin_mailer = get_admin_mailer()
113 return self._admin_mailer
115 @property
116 def api_key(self) -> str:
117 return self._api_key
119 @property
120 def exchange_rate_url(self) -> str:
121 return self._exchange_rate_url
123 @property
124 def api_headers(self) -> dict[str, str]:
125 return self._api_headers
127 def convert_cents_to_usd_cents(self, units: int, currency: StripeCurrencyCode) -> int | None:
128 """
129 Convert given currency cents to USD cents using exchange rates stored in the database.
130 If currency is already USD, returns the same value.
131 If no exchange rate is found, returns None.
132 For zero decimal currencies, units are treated as whole units.
133 """
134 if currency == StripeCurrencyCode.USD:
135 # already in cents USD, just return cents
136 return units
138 db = self.currency_db
139 rate_id = db.get_currency_id(self._base_currency, currency)
140 if rate_id is None:
141 LOG().error(f"No exchange rate found for currency: {currency}")
142 return None
144 if IS_TRACE:
145 LOG().trace(f"Found exchange rate ID {rate_id} for currency: {currency}")
147 rate_model = db.get(rate_id)
148 if rate_model is None:
149 LOG().error(f"No exchange rate found for currency: {currency}")
150 return None
152 if IS_TRACE:
153 msg = f"Exchange rate model retrieved: {rate_model} with rate {rate_model.rate}"
154 LOG().trace(msg)
156 exchange_rate = rate_model.rate
157 if currency.is_zero_decimal:
158 # For zero decimal currencies, input units are whole currency units (not cents)
159 # Example: 1111 JPY units = 1111 JPY
160 # Rate is "1 JPY = X USD", so multiply to get USD dollars
161 # Convert to USD: 1111 * rate = USD dollars
162 # Convert to cents: USD dollars * 100
163 if IS_TRACE:
164 msg = f"Converting zero-decimal currency {currency} whole units to USD cents"
165 LOG().trace(msg)
167 usd_dollars = float(units) * exchange_rate
168 usd_cents = usd_dollars * 100.0
169 else:
170 # For decimal currencies, input units are cents
171 # Example: 1100 EUR cents = 11.00 EUR
172 # Rate is "1 EUR = X USD", but we have cents
173 # Convert to USD: 1100 cents / rate = USD cents
174 usd_cents = float(units) / exchange_rate
176 if IS_TRACE:
177 msg = (
178 f"Converting {units} {currency} to USD at rate {exchange_rate}:"
179 f" {usd_cents} USD cents before rounding"
180 )
181 LOG().trace(msg)
183 if usd_cents <= 0:
184 usd_cents = 0.0
186 usd_cents = round(usd_cents, 0)
187 return int(usd_cents)
189 def update_exchange_rate(self, rates: ExchangeRateDict | None = None) -> OutputResult:
190 result = AppResult[ExchangeRateWrapper]()
191 start = TimeUtil.get_current_utc_dt()
193 if rates is None:
194 rates = self.get_latest_exchange_rates()
196 if rates is None:
197 msg = "No exchange rates available to update."
198 LOG().error(msg)
199 result.add_error(AppErrorCode.CURRENCY_API, msg)
200 return JobResult.from_result(result, data={"rates": None})
202 db = self.currency_db
203 missing_codes: list[str] = []
205 passed_ct = 0
206 failed_ct = 0
208 if IS_DEBUG:
209 msg = f"Starting exchange rate update with {len(rates)} rates fetched from API\n{rates}\n"
210 LOG().debug(msg)
212 for currency in StripeCurrencyCode:
213 code = currency.code.upper()
214 if code not in rates:
215 LOG().warning(f"Missing exchange rate for currency: {code}")
216 missing_codes.append(code)
217 failed_ct += 1
218 continue
220 rate = rates[code]
221 base_currency = self._base_currency
223 if IS_TRACE:
224 LOG().trace(f"Exchange rate for {currency}[{base_currency}]: {rate}")
226 model = ExchangeRateModel(
227 id=None,
228 base_currency=base_currency,
229 target_currency=currency,
230 rate=rate,
231 )
233 doc_id = db.get_currency_id(base_currency, currency)
234 passed_ct += 1
235 if doc_id is None:
236 db.create(model)
237 if IS_TRACE:
238 msg = f"Created exchange rate for {currency}[{base_currency}] in database."
239 LOG().trace(msg)
240 else:
241 json_data = model.to_dict()
242 db.update(doc_id, json_data)
243 if IS_TRACE:
244 msg = f"Updated exchange rate for {currency}[{base_currency}] in database."
245 LOG().trace(msg)
247 end = TimeUtil.get_current_utc_dt()
248 duration = TimeUtil.duration_in_seconds(start, end)
250 if len(missing_codes) <= 0:
251 msg = f"All required exchange rates updated: passed_ct={passed_ct}, failed_ct={failed_ct}"
252 LOG().info(msg)
253 return OutputResult.ok(
254 job_type=AppJobType.COMMAND_UPDATE_EXCHANGE_RATE,
255 message=msg,
256 duration=duration,
257 )
259 msg = f"Missing exchange rates for currencies: {', '.join(missing_codes)}"
260 self._send_error(msg)
261 result.add_error(AppErrorCode.CURRENCY_API, msg)
262 return JobResult.from_result(result, data={"missing_codes": missing_codes})
264 def get_latest_exchange_rates(self) -> ExchangeRateDict | None:
265 url = self.exchange_rate_url
266 headers = self.api_headers
268 response: requests.Response | None = None
269 error_msg: str | None = None
270 try:
271 response = requests.get(url, headers=headers, timeout=CURRENCY_API_TIMEOUT)
272 except Timeout:
273 error_msg = f"Timeout ({CURRENCY_API_TIMEOUT} secs) fetching exchange rates."
274 except Exception as e:
275 error_msg = f"Error while fetching exchange rates from API: {e}"
277 if response is None:
278 msg = "No response received from exchange rate API"
279 error_msg = f"{msg}. Error: {error_msg}" if error_msg else msg
281 if error_msg is not None:
282 LOG().error(error_msg)
283 self._send_error(error_msg)
284 return None
286 assert response is not None # narrowing
287 if response.status_code != AppHttpCode.OK.status:
288 msg = f"Failed to fetch exchange rates: {response.status_code} - {response.text}"
289 self._send_error(msg)
290 return None
292 data: JsonDict
293 try:
294 data = json.loads(response.text)
295 except Exception as e:
296 msg = f"Error parsing exchange rates response: {e}"
297 self._send_error(msg)
298 return None
300 rates: ExchangeRateDict = data.get("rates", {})
301 if len(rates) == 0:
302 msg = f"No exchange rates found in response (code={response.status_code}): {response.text}"
303 self._send_error(msg)
304 return None
306 LOG().debug(f"Fetched {len(rates)} exchange rates from API.")
307 return rates
309 def _send_error(self, msg: str) -> None:
310 LOG().error(msg)
311 self.admin_mailer.send_error(
312 error_code=AppErrorCode.CURRENCY_API,
313 category=AppLogCategory.API,
314 message=msg,
315 include_stack=False,
316 )