Coverage for functions \ flipdare \ service \ processor \ group_member_processor.py: 32%

82 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13 

14from pathlib import Path 

15from typing import TYPE_CHECKING, Any 

16 

17from google.cloud.storage.bucket import Bucket as StorageBucket 

18from flipdare.service.notification_service import NotificationService 

19from flipdare.service.processor._processor_mixin import ProcessorMixin 

20from flipdare.app_log import LOG 

21from flipdare.constants import DOWNLOAD_FILE_DIR, NO_DOC_ID 

22from flipdare.result.app_result import AppResult 

23from flipdare.service.core.step_processor import ProcessingStep, StepProcessor 

24from flipdare.firestore.context.group_member_context import ( 

25 GroupMemberContext, 

26 GroupMemberContextFactory, 

27) 

28from flipdare.firestore.group_db import GroupDb 

29from flipdare.generated import AppErrorCode 

30from flipdare.generated.model.group_member_model import GroupMemberKeys 

31from flipdare.wrapper import GroupMemberWrapper 

32 

33if TYPE_CHECKING: 

34 

35 from flipdare.backend.indexer_service import IndexerService 

36 

37_M = GroupMemberKeys 

38 

39 

40class GroupMemberProcessor(ProcessorMixin): 

41 

42 def __init__( 

43 self, 

44 bucket: StorageBucket, 

45 group_db: GroupDb, 

46 indexer_service: "IndexerService", 

47 notification_service: NotificationService, 

48 local_path: Path = DOWNLOAD_FILE_DIR, 

49 ) -> None: 

50 super().__init__(bucket=bucket, local_path=local_path) 

51 self._indexer_service = indexer_service 

52 self._group_db = group_db 

53 self._notification_service = notification_service 

54 

55 @property 

56 def indexer_service(self) -> "IndexerService": 

57 return self._indexer_service 

58 

59 @property 

60 def group_db(self) -> GroupDb: 

61 return self._group_db 

62 

63 def _update_member( 

64 self, 

65 parent_id: str, 

66 member: GroupMemberWrapper, 

67 ) -> AppResult[GroupMemberWrapper]: 

68 """Save updated member to the database.""" 

69 main_result = AppResult[GroupMemberWrapper]( 

70 task_name=f"UpdateGroupMember for {member.doc_id or NO_DOC_ID}", 

71 ) 

72 

73 try: 

74 self.group_db.update_member( 

75 group_id=parent_id, 

76 member_id=member.doc_id, 

77 member_data=member, 

78 ) 

79 main_result.generated = member 

80 except Exception as e: 

81 msg = f"Exception updating GroupMember {member.doc_id}: {e}" 

82 main_result.add_error(AppErrorCode.DATABASE_EX, msg, extra=member.to_json_dict()) 

83 

84 return main_result 

85 

86 def process_group_member( 

87 self, 

88 group_member: GroupMemberWrapper, 

89 ) -> AppResult[GroupMemberWrapper]: 

90 group_member_id = group_member.doc_id 

91 main_result = AppResult[GroupMemberWrapper]( 

92 task_name=f"ProcessGroupMember for {group_member_id or NO_DOC_ID}", 

93 ) 

94 

95 # Check if already complete 

96 if group_member.processing_complete: 

97 msg = f"GroupMember already processed for {group_member_id}" 

98 LOG().info(msg) 

99 return AppResult[GroupMemberWrapper].ok(doc_id=group_member_id, message=msg) 

100 

101 # Use StepProcessor for the workflow 

102 processor = self._member_processor(group_member) 

103 if processor is None: 

104 main_result.add_error( 

105 AppErrorCode.PROCESSING_STEP, 

106 f"Failed to build processor for GroupMember {group_member_id}", 

107 ) 

108 return main_result 

109 

110 result = processor.execute() 

111 if result.is_error: 

112 main_result.merge(result) 

113 

114 return main_result 

115 

116 def _member_processor( 

117 self, 

118 member: GroupMemberWrapper, 

119 ) -> StepProcessor[GroupMemberWrapper] | None: 

120 member_id = member.doc_id 

121 steps = [ 

122 ProcessingStep[_M, GroupMemberWrapper]( 

123 state_key=_M.REQUEST_NOTIFICATION_SENT, 

124 handler=lambda m: self._send_notif_step(m, is_request=True), 

125 description="Add request notification", 

126 required=True, 

127 ), 

128 ProcessingStep[_M, GroupMemberWrapper]( 

129 state_key=_M.STATUS_NOTIFICATION_SENT, 

130 handler=lambda m: self._send_notif_step(m, is_request=False), 

131 description="Add status notification", 

132 required=True, 

133 ), 

134 ] 

135 

136 return StepProcessor( 

137 wrapper=member, 

138 steps=steps, 

139 save_handler=lambda m: self._update_member(parent_id=member_id, member=m), 

140 process_name=f"process_member_{member_id}", 

141 ) 

142 

143 def _send_notif_step( 

144 self, 

145 group_member: GroupMemberWrapper, 

146 is_request: bool, 

147 ) -> AppResult[Any]: 

148 """Retrieve context information for a group member.""" 

149 main_result = AppResult[Any]( 

150 task_name=f"GetGroupContext for {group_member.doc_id or NO_DOC_ID}", 

151 ) 

152 

153 context_result = self._get_member_context(group_member) 

154 if context_result.is_error: 

155 main_result.merge(context_result) 

156 return main_result 

157 

158 context = context_result.generated 

159 assert context 

160 

161 try: 

162 self._notification_service.send_group_notif(context, is_request=is_request) 

163 except Exception as e: 

164 msg = f"Exception sending notification for member {group_member.doc_id}: {e}" 

165 main_result.add_error(AppErrorCode.NOTIFICATION, msg) 

166 

167 return main_result 

168 

169 def _get_member_context(self, member: GroupMemberWrapper) -> AppResult[GroupMemberContext]: 

170 """Create FriendContext from FriendModel.""" 

171 doc_id = member.doc_id 

172 main_result = AppResult[GroupMemberContext](doc_id=doc_id) 

173 

174 try: 

175 member_context = GroupMemberContextFactory().create(member) 

176 if member_context is not None: 

177 main_result.generated = member_context 

178 else: 

179 cause = f"Failed to build GroupMemberContext for {member.doc_id}" 

180 main_result.add_error( 

181 AppErrorCode.CONTEXT, 

182 cause, 

183 extra=member.to_json_dict(), 

184 ) 

185 except Exception as e: 

186 cause = f"Exception creating GroupMemberContext for {member.doc_id}: {e}" 

187 main_result.add_error(AppErrorCode.CONTEXT, cause, extra=member.to_json_dict()) 

188 

189 return main_result