Coverage for functions \ flipdare \ task \ report \ flag_reporter.py: 93%

28 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14 

15from typing import TYPE_CHECKING 

16from flipdare.service._service_provider import ServiceProvider 

17from flipdare.task.report.core.query_report import QueryReport 

18from flipdare.core.app_backend_link import AppBackendLink 

19from flipdare.result.output_result import OutputResult 

20from flipdare.generated.schema.report.flag_report_schema import FlagReportSchema 

21from flipdare.generated.shared.backend.app_job_type import AppJobType 

22from flipdare.util.time_util import FirestoreTime 

23from flipdare.wrapper import FlagWrapper 

24 

25if TYPE_CHECKING: 

26 from flipdare.manager.db_manager import DbManager 

27 from flipdare.manager.backend_manager import BackendManager 

28 

29__all__ = ["FlagReporter"] 

30 

31 

32class FlagReporter(ServiceProvider): 

33 def __init__( 

34 self, 

35 db_manager: DbManager, 

36 backend_manager: BackendManager, 

37 ) -> None: 

38 super().__init__( 

39 db_manager=db_manager, 

40 backend_manager=backend_manager, 

41 ) 

42 

43 def unacknowledged(self) -> OutputResult: 

44 report = QueryReport[FlagReportSchema]( 

45 job_type=AppJobType.REPORT_FLAG_UNACKNOWLEDGED, 

46 schema_cls=FlagReportSchema, 

47 query_fn=lambda: self.flag_db.get_recent_major_unacknowledged(), 

48 process_fn=self._flag_entry_processor, 

49 app_logger=self.app_logger, 

50 mailer=self.admin_mailer, 

51 ) 

52 

53 return report.create_and_send() 

54 

55 def waiting_disputed(self) -> OutputResult: 

56 report = QueryReport[FlagReportSchema]( 

57 job_type=AppJobType.REPORT_FLAG_DISPUTED_WAITING_ADMIN, 

58 schema_cls=FlagReportSchema, 

59 query_fn=lambda: self.flag_db.get_recent_waiting_disputed_service(), 

60 process_fn=self._flag_entry_processor, 

61 app_logger=self.app_logger, 

62 mailer=self.admin_mailer, 

63 ) 

64 

65 return report.create_and_send() 

66 

67 def _flag_entry_processor(self, flag: FlagWrapper) -> FlagReportSchema: 

68 flag_id = flag.doc_id 

69 backend_link = AppBackendLink.FLAG.link(flag_id) if flag_id else "N/A" 

70 created_at = FirestoreTime.internal_str(flag.created_at_db) 

71 updated_at = FirestoreTime.internal_str(flag.updated_at_db) 

72 backend_obj_link = AppBackendLink.FLAG.link(flag.obj_id) if flag.obj_id else "N/A" 

73 disputed_progress = flag.disputed_progress.value if flag.disputed_progress else "N/A" 

74 

75 return FlagReportSchema( 

76 { 

77 "from_uid": flag.from_uid, 

78 "to_uid": flag.to_uid, 

79 "in_danger": str(flag.user_in_danger), 

80 "flag_type": flag.flag_type.value, 

81 "progress": flag.progress.value, 

82 "disputed_progress": disputed_progress, 

83 "description": flag.description, 

84 "backend_link": backend_link, 

85 "obj_id": flag.obj_id, 

86 "backend_obj_link": backend_obj_link, 

87 "created_at": created_at, 

88 "updated_at": updated_at, 

89 }, 

90 )