Coverage for functions \ flipdare \ service \ user_service.py: 44%
165 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from __future__ import annotations
14from typing import TYPE_CHECKING, Any
16from flipdare.service._service_provider import ServiceProvider
17from flipdare.service.processor.user_processor import UserProcessor
18from flipdare.app_log import LOG
19from flipdare.constants import IS_DEBUG
20from flipdare.result.app_result import AppResult
21from flipdare.core.cron_decorator import cron_decorator
22from flipdare.core.job_type_decorator import job_type_decorator
23from flipdare.result.job_result import JobResult
24from flipdare.core.trigger_decorator import trigger_decorator
25from flipdare.generated import AppErrorCode, AppJobType
26from flipdare.result.outcome import Outcome
27from flipdare.generated.shared.firestore_collections import FirestoreCollections
28from flipdare.wrapper import AppJobWrapper, UserWrapper
29from flipdare.mailer.user.user_summary_email import UserSummaryEmail
30from flipdare.generated.model.backend.metric.count_metric import CountMetric
31from flipdare.service.core.cron_processor import CronConfig, CronProcessor
32from flipdare.app_types import CronResult
33from flipdare.util.time_util import TimeUtil
34from flipdare.wrapper.backend.user_summary_wrapper import UserSummaryWrapper
36if TYPE_CHECKING:
37 from flipdare.manager.db_manager import DbManager
38 from flipdare.manager.backend_manager import BackendManager
40__all__ = ["UserService"]
43class UserService(ServiceProvider):
44 def __init__(
45 self,
46 db_manager: DbManager | None = None,
47 backend_manager: BackendManager | None = None,
48 ) -> None:
49 super().__init__(
50 backend_manager=backend_manager,
51 db_manager=db_manager,
52 )
53 self._user_processor: UserProcessor | None = None
55 @property
56 def user_processor(self) -> UserProcessor:
57 if self._user_processor is None:
58 self._user_processor = UserProcessor(
59 bucket=self.storage_bucket,
60 user_bridge=self.user_bridge,
61 indexer_service=self.indexer,
62 invite_db=self.invite_db,
63 )
64 return self._user_processor
66 @cron_decorator(job_type=AppJobType.CR_USER_UNPROCESSED)
67 def cron_user_unprocessed(self) -> CronResult:
68 user_service = self.user_service
69 user_db = self.user_db
71 config = CronConfig(
72 job_type=AppJobType.CR_USER_UNPROCESSED,
73 job_name="cron_user_unprocessed",
74 query_fn=lambda: [
75 u for u in user_db.get_recent_unprocessed() if not u.processing_complete
76 ],
77 process_fn=lambda user: user_service.user_processor.process_user(
78 user, is_update=False
79 ),
80 )
81 return CronProcessor(config).process_result()
83 @cron_decorator(job_type=AppJobType.CR_USER_DAILY_SUMMARY)
84 def cron_user_summary(self) -> CronResult:
85 summary_db = self.summary_db
86 updated_summaries: list[UserSummaryWrapper] = []
88 main_result = AppResult(task_name="cron_user_summary")
89 failed_ct = 0
90 success_ct = 0
91 start = TimeUtil.get_current_utc_dt()
93 summaries = summary_db.get_user_reports()
94 for summary in summaries:
95 summary_id = summary.doc_id
96 user_id = summary.uid
97 user_result = self.user_bridge.get(user_id)
98 if user_result.is_error:
99 msg = f"No email for userID {user_id},summaryID {summary_id}"
100 main_result.add_error(
101 AppErrorCode.NOT_FOUND,
102 msg,
103 extra=summary.to_json_dict(),
104 )
105 failed_ct += 1
106 continue
108 user = user_result.generated
109 assert user is not None # narrowing
111 debug_str = f"user={user_id}, summary_id={summary_id}"
112 LOG().info(f"Processing daily report for {debug_str}.")
114 result = self._send_daily_summary(summary=summary, user=user)
115 if result.is_ok:
116 success_ct += 1
117 summary.summary_sent = True
118 updated_summaries.append(summary)
119 else:
120 failed_ct += 1
121 continue
123 msg = (
124 f"Succeeded summaries: {success_ct}, Failed summaries: {failed_ct} "
125 f"updates={len(updated_summaries)}"
126 )
127 LOG().info(msg)
128 ct = summary_db.batch_update(updated_summaries)
130 if ct != len(updated_summaries):
131 cause = f"Failed to resolve user summaries:\n{msg}"
132 main_result.add_error(AppErrorCode.USER_SUMMARY, cause)
134 end = TimeUtil.get_current_utc_dt()
135 duration = TimeUtil.duration_in_seconds(start, end)
137 if main_result.is_error:
138 return JobResult.from_result(
139 result=main_result,
140 duration=duration,
141 )
143 return CountMetric(
144 success_ct=success_ct,
145 failed_ct=failed_ct,
146 skipped_ct=0,
147 duration=duration,
148 )
150 def _send_daily_summary(self, summary: UserSummaryWrapper, user: UserWrapper) -> AppResult:
151 summary_db = self.summary_db
152 storage_util = self.storage_util
153 summary_id = summary.doc_id
154 assert summary_id # narrowing
156 user_email = user.email
157 user_tz_str = user.tz_str or "UTC"
159 debug_str = f"user={user_email}, summary_id={summary_id}"
160 result: AppResult = AppResult(
161 doc_id=summary_id,
162 task_name=f"SendDailySummary for {debug_str}",
163 )
165 summary_entries = summary_db.get_report_entries(parent_id=summary_id)
166 if not summary_entries or len(summary_entries) == 0:
167 LOG().info(f"No summary entries found for {debug_str}, resolving.")
168 return result
170 daily_summary = UserSummaryEmail(
171 summary=summary,
172 summaries=UserSummaryEmail.build_summaries(summary_entries),
173 storage_util=storage_util,
174 user_tz_str=user_tz_str,
175 )
177 try:
178 daily_summary.validate()
179 except Exception as ex:
180 msg = f"Daily summary email for {debug_str} is invalid: {ex}"
181 result.add_error(AppErrorCode.EMAIL_TEMPLATE, msg, extra={"exception": str(ex)})
182 return result
184 try:
185 self.user_mailer._send_raw(
186 email=user_email,
187 subject=daily_summary.subject,
188 html_str=daily_summary.render_html(),
189 text_str=daily_summary.render_text(),
190 should_minify=True, # should minify user emails, faster!
191 no_reply=True,
192 )
193 LOG().info(f"Sent daily summary email for {debug_str}.")
194 return result
195 except Exception as ex:
196 msg = f"Failed to send daily summary email to {debug_str}: {ex}"
197 result.add_error(AppErrorCode.SERVER_EX, msg, extra={"exception": str(ex)})
198 return result
200 @job_type_decorator(AppJobType.TR_USER)
201 @trigger_decorator(job_type=AppJobType.TR_USER, collection=FirestoreCollections.USER)
202 def trigger_user(self, job: AppJobWrapper) -> JobResult[Any]:
203 user_id = job.obj_id
204 job_id = job.doc_id
205 is_update = job.has_changes
207 main_result = AppResult[UserWrapper](doc_id=user_id)
208 try:
209 if IS_DEBUG:
210 LOG().debug(f"Processing new user job {job_id} for user ID: {user_id}")
212 result = self.user_bridge.get(user_id)
213 if result.is_error:
214 main_result.merge(result)
215 return JobResult.from_result(
216 result=main_result,
217 doc_id=user_id,
218 data=job.to_json_dict(),
219 )
221 user = result.generated
222 assert user is not None # NO TE: type narrowing
224 invite_result = self._check_user_invited(user, is_update)
225 if invite_result.is_error:
226 # just log a warning and continue
227 cause = f"Failed to process invite for user {user_id}: {invite_result.formatted}"
228 LOG().warning(cause)
229 main_result.add_warning(cause)
231 process_ok = self.user_processor.process_user(user, is_update=is_update)
232 if process_ok.is_error:
233 main_result.add_error(
234 AppErrorCode.CONTENT,
235 f"User processing failed for user ID {user_id}.",
236 extra=user.to_json_dict(),
237 )
238 return JobResult.from_result(
239 result=main_result,
240 doc_id=user_id,
241 data=job.to_json_dict(),
242 )
243 if process_ok.is_skipped:
244 msg = f"User processing skipped for user ID {user_id}."
245 LOG().info(msg)
246 return JobResult.skip_doc(doc_id=user_id, message=msg)
248 return JobResult.ok(
249 message=f"User {user_id} processed successfully.",
250 doc_id=user_id,
251 )
253 except Exception as error:
254 cause = f"Error processing new dare job ID {job_id}: {error}"
255 main_result.add_error(AppErrorCode.UNEXPECTED_CODE_PATH, cause)
256 return JobResult.from_result(
257 result=main_result,
258 doc_id=user_id,
259 data=job.to_json_dict(),
260 )
262 def _check_user_invited(
263 self,
264 user: UserWrapper,
265 is_update: bool,
266 ) -> AppResult[UserWrapper]:
267 user_id = user.doc_id
268 main_result: AppResult[UserWrapper] = AppResult(doc_id=user_id)
270 if user.invite_id is None:
271 msg = f"User ID {user_id} has no inviteId, skipping invite processing."
273 if IS_DEBUG:
274 LOG().debug(msg)
276 return AppResult[UserWrapper].skip(doc_id=user_id, message=msg)
277 if is_update:
278 msg = f"User ID {user_id} is an update, skipping invite processing."
280 if IS_DEBUG:
281 LOG().debug(msg)
283 return AppResult[UserWrapper].ok(doc_id=user_id, message=msg)
285 invite_result = self.invite_bridge.get(user.invite_id)
286 if invite_result.is_error:
287 main_result.merge(invite_result)
288 return main_result
290 invite = invite_result.generated
291 assert invite # narrowing
292 result_value = self.friend_service.trigger_invite_signup(invite)
293 if result_value != Outcome.OK:
294 cause = (
295 f"Failed to process invited user signup for user ID {user_id} "
296 f"with invite ID {user.invite_id}"
297 )
298 main_result.add_error(AppErrorCode.UNEXPECTED_CODE_PATH, cause)
300 if main_result.is_error:
301 return main_result
303 return AppResult[UserWrapper].ok(
304 doc_id=user_id,
305 message="User invite processed successfully.",
306 )