Coverage for functions \ flipdare \ service \ processor \ content_processor.py: 27%

82 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13 

14from pathlib import Path 

15from typing import TYPE_CHECKING 

16 

17from google.cloud.storage.bucket import Bucket as StorageBucket # type: ignore 

18from flipdare.service.processor._processor_mixin import ProcessorMixin 

19from flipdare.app_globals import truncate_string 

20from flipdare.app_log import LOG 

21from flipdare.app_types import ContentBridge 

22from flipdare.constants import DOWNLOAD_FILE_DIR 

23from flipdare.result.app_result import AppResult 

24from flipdare.result.job_result import JobResult 

25from flipdare.service.core.step_processor import ProcessingStep, StepProcessor 

26from flipdare.firestore.db_bridge import DbBridge 

27from flipdare.generated.model.content_model import ContentKeys 

28from flipdare.generated.shared.app_error_code import AppErrorCode 

29from flipdare.wrapper import ContentWrapper 

30 

31if TYPE_CHECKING: 

32 

33 from flipdare.backend.indexer_service import IndexerService 

34 from flipdare.firestore.content_db import ContentDb 

35 

36 

# Short alias for the generated ContentModel field-key enum; used as the
# state-key type parameter for ProcessingStep throughout this module.
_K = ContentKeys

39 

class ContentProcessor(ProcessorMixin):
    """Runs the post-upload processing workflow for a single content document.

    The workflow is expressed as a list of ``ProcessingStep``s driven by
    ``StepProcessor``: (video only) thumbnail creation and video optimization,
    then thumbnail blur-hash generation, then search indexing. Each step's
    completion is tracked against a ``ContentKeys`` state key and progress is
    persisted via the content DB bridge.
    """

    def __init__(
        self,
        bucket: StorageBucket,
        content_db: "ContentDb",
        indexer_service: "IndexerService",
        local_path: Path = DOWNLOAD_FILE_DIR,
    ) -> None:
        """Wire up storage, database, and search-indexer collaborators.

        Args:
            bucket: Cloud Storage bucket holding the media files.
            content_db: Firestore accessor for content documents.
            indexer_service: Backend service that maintains the search index.
            local_path: Scratch directory for downloaded media files.
        """
        super().__init__(bucket=bucket, local_path=local_path)
        self._content_db = content_db
        self._content_bridge: ContentBridge = DbBridge(content_db, "Content")
        # Kept in addition to the mixin's copy so this class can reference it
        # directly without depending on the mixin's attribute name.
        self._local_path = local_path
        self._indexer_service = indexer_service

    def process_content(
        self, content: ContentWrapper, is_update: bool
    ) -> JobResult[ContentWrapper]:
        """Execute the full processing pipeline for ``content``.

        Skips immediately when the document is already marked complete.
        Video documents get thumbnail + optimization steps prepended; all
        documents get the hash and search-index steps.

        Args:
            content: Wrapper around the content document to process.
            is_update: Whether this is an update of an existing document
                (forwarded to the search indexer).

        Returns:
            A ``JobResult`` — skip, error (with the document payload), or ok.
        """
        content_id = content.doc_id

        # Short-circuit: nothing to do for already-processed documents.
        if content.processing_complete:
            msg = f"Content already processed for {content_id}"
            LOG().info(msg)
            return JobResult.skip_doc(doc_id=content_id, message=msg)

        # Build the step list; StepProcessor handles ordering, state
        # tracking, and persistence between steps.
        steps: list[ProcessingStep[_K, ContentWrapper]] = []
        if content.video is not None:
            steps.extend(
                [
                    ProcessingStep[_K, ContentWrapper](
                        state_key=_K.THUMBNAIL_CREATED,
                        # NOTE(review): this "Generate thumbnail" step runs
                        # _optimize_video_step — the exact same handler as the
                        # OPTIMIZED_VIDEO step below. That looks like a
                        # copy-paste error; presumably a thumbnail-creation
                        # handler was intended. Behavior preserved here —
                        # confirm and fix with the handler's owner.
                        handler=self._optimize_video_step,
                        description="Generate thumbnail",
                        required=True,
                    ),
                    ProcessingStep[_K, ContentWrapper](
                        state_key=_K.OPTIMIZED_VIDEO,
                        handler=self._optimize_video_step,
                        description="Optimize video",
                        required=True,
                    ),
                ],
            )

        # this needs to be done for both image/video content
        steps.append(
            ProcessingStep[_K, ContentWrapper](
                state_key=_K.HASH_CREATED,
                handler=self._create_thumbnail_hash,
                description="Generate thumbnail hash",
                required=True,
            ),
        )

        steps.append(
            ProcessingStep[_K, ContentWrapper](
                state_key=_K.SEARCH_INDEXED,
                # lambda needed here to close over is_update.
                handler=lambda m: self._index_content_in_search(m, is_update=is_update),
                description="Index in search",
                required=True,
            ),
        )

        processor = StepProcessor(
            wrapper=content,
            steps=steps,
            save_handler=self._content_bridge.update,
            process_name=f"process_content_{content_id}",
        )

        result = processor.execute()
        if result.is_error:
            # Attach the document payload so the failed job can be inspected.
            return JobResult.from_result(
                result,
                doc_id=content_id,
                data=content.to_json_dict(),
            )
        return JobResult.ok(doc_id=content_id)

    def _create_thumbnail_hash(self, content: ContentWrapper) -> AppResult[ContentWrapper]:
        """Generate and store a blur hash for the content's avatar image.

        Skips (not an error) when the document has no image; errors when hash
        generation fails. On success the hash is written back onto the image
        model and the updated wrapper is returned via ``result.generated``.
        """
        doc_id = content.doc_id
        assert doc_id  # narrowing
        main_result = AppResult[ContentWrapper](
            task_name=f"CreateThumbnailHash for Content {doc_id}",
        )

        # NOTE(review): accessed as content.image here but content.model.video
        # in _optimize_video_step — confirm both spellings are equivalent.
        image = content.image
        if image is None:
            # this can happen if no avatar set
            msg = f"No avatar image for ContentModel: {doc_id}"
            return AppResult[ContentWrapper].skip(doc_id=doc_id, message=msg)

        # Generate hash
        LOG().debug(f"Generating hash for thumbnail of ContentModel: {doc_id}")
        hash_code = self.get_image_hash(image, width=image.w, height=image.h)
        if hash_code is None:
            main_result.add_error(AppErrorCode.HASH, "Failed to generate hash for thumbnail")
            return main_result

        msg = f"Generated thumbnail hash for ContentModel: {doc_id} hash: {truncate_string(hash_code)}"
        LOG().debug(msg)

        image.blur_hash = hash_code
        content.image = image
        main_result.generated = content
        return main_result

    def _optimize_video_step(self, content: ContentWrapper) -> AppResult[ContentWrapper]:
        """Wrapper for optimize_video to return AppResult.

        Errors when the document has no video or optimization fails; on
        success stores the optimized rendition as ``video.low`` and returns
        the updated wrapper via ``result.generated``.
        """
        doc_id = content.doc_id
        assert doc_id  # narrowing
        main_result = AppResult[ContentWrapper](task_name=f"OptimizeVideo for Content {doc_id}")

        video = content.model.video
        if video is None:
            # Unlike the missing-image case above, a missing video here is an
            # error: this step is only scheduled for video content.
            main_result.add_error(
                AppErrorCode.MISSING_DATA,
                f"No video for ContentModel: {doc_id}",
            )
            return main_result

        optimized = self.optimize_video(video)
        if optimized is None:
            main_result.add_error(AppErrorCode.CREATE_FAILED, "Failed to optimize video")
            return main_result

        # Update the video model with optimized version
        video.low = optimized
        content.model.video = video
        main_result.generated = content
        return main_result

    def _index_content_in_search(
        self,
        content: ContentWrapper,
        is_update: bool,
    ) -> AppResult[ContentWrapper]:
        """Add content to search index (or remove if private)."""
        main_result = AppResult[ContentWrapper](task_name="IndexContentInSearch")

        # NOTE: we pass to index irrespective of whether the dare is private,
        # NOTE: because it may have changed from public to private or vice versa.
        # NOTE: and will need to be deleted.
        index_result = self._indexer_service.process_content(content, is_update=is_update)
        if index_result.is_error:
            main_result.merge(index_result)
            return main_result

        main_result.generated = content
        return main_result