Coverage for functions \ flipdare \ service \ processor \ content_processor.py: 27%
82 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
14from pathlib import Path
15from typing import TYPE_CHECKING
17from google.cloud.storage.bucket import Bucket as StorageBucket # type: ignore
18from flipdare.service.processor._processor_mixin import ProcessorMixin
19from flipdare.app_globals import truncate_string
20from flipdare.app_log import LOG
21from flipdare.app_types import ContentBridge
22from flipdare.constants import DOWNLOAD_FILE_DIR
23from flipdare.result.app_result import AppResult
24from flipdare.result.job_result import JobResult
25from flipdare.service.core.step_processor import ProcessingStep, StepProcessor
26from flipdare.firestore.db_bridge import DbBridge
27from flipdare.generated.model.content_model import ContentKeys
28from flipdare.generated.shared.app_error_code import AppErrorCode
29from flipdare.wrapper import ContentWrapper
31if TYPE_CHECKING:
33 from flipdare.backend.indexer_service import IndexerService
34 from flipdare.firestore.content_db import ContentDb
37_K = ContentKeys
class ContentProcessor(ProcessorMixin):
    """Drive the step-based processing workflow for a content document.

    The workflow registers video steps (only when the content has a video),
    a thumbnail-hash step and a search-index step, all marked as required,
    and hands them to a ``StepProcessor`` whose save handler is the content
    ``DbBridge.update``.
    """

    def __init__(
        self,
        bucket: StorageBucket,
        content_db: "ContentDb",
        indexer_service: "IndexerService",
        local_path: Path = DOWNLOAD_FILE_DIR,
    ) -> None:
        """Store the collaborators used by the processing steps.

        Args:
            bucket: Storage bucket forwarded to ``ProcessorMixin``.
            content_db: Content database; also wrapped in a ``DbBridge``
                labelled ``"Content"``.
            indexer_service: Service that receives content for indexing.
            local_path: Local directory forwarded to ``ProcessorMixin``.
        """
        super().__init__(bucket=bucket, local_path=local_path)
        self._content_db = content_db
        self._content_bridge: ContentBridge = DbBridge(content_db, "Content")
        self._local_path = local_path
        self._indexer_service = indexer_service

    def process_content(
        self,
        content: ContentWrapper,
        is_update: bool,
    ) -> JobResult[ContentWrapper]:
        """Run every processing step for ``content``.

        Args:
            content: Wrapper around the content document to process.
            is_update: Forwarded to the indexer service by the search step.

        Returns:
            A skip result when ``content.processing_complete`` is already
            set, a result built with ``JobResult.from_result`` (carrying the
            content's JSON dict) when the step processor reports an error,
            and ``JobResult.ok`` otherwise.
        """
        content_id = content.doc_id

        # Guard clause: nothing to do for content that was already processed.
        if content.processing_complete:
            skip_msg = f"Content already processed for {content_id}"
            LOG().info(skip_msg)
            return JobResult.skip_doc(doc_id=content_id, message=skip_msg)

        outcome = StepProcessor(
            wrapper=content,
            steps=self._build_steps(content, is_update),
            save_handler=lambda m: self._content_bridge.update(m),
            process_name=f"process_content_{content_id}",
        ).execute()

        if not outcome.is_error:
            return JobResult.ok(doc_id=content_id)
        return JobResult.from_result(
            outcome,
            doc_id=content_id,
            data=content.to_json_dict(),
        )

    def _build_steps(
        self,
        content: ContentWrapper,
        is_update: bool,
    ) -> list[ProcessingStep[_K, ContentWrapper]]:
        """Assemble the ordered list of steps for ``process_content``."""
        step_type = ProcessingStep[_K, ContentWrapper]

        video_steps: list[ProcessingStep[_K, ContentWrapper]] = []
        if content.video is not None:
            video_steps = [
                # NOTE(review): this step is described as "Generate thumbnail"
                # but uses the same handler as the "Optimize video" step below.
                # Looks like a copy-paste slip - confirm which handler was
                # intended before changing it.
                step_type(
                    state_key=_K.THUMBNAIL_CREATED,
                    handler=self._optimize_video_step,
                    description="Generate thumbnail",
                    required=True,
                ),
                step_type(
                    state_key=_K.OPTIMIZED_VIDEO,
                    handler=self._optimize_video_step,
                    description="Optimize video",
                    required=True,
                ),
            ]

        return [
            *video_steps,
            # Hashing and indexing apply to both image and video content.
            step_type(
                state_key=_K.HASH_CREATED,
                handler=self._create_thumbnail_hash,
                description="Generate thumbnail hash",
                required=True,
            ),
            step_type(
                state_key=_K.SEARCH_INDEXED,
                handler=lambda m: self._index_content_in_search(m, is_update=is_update),
                description="Index in search",
                required=True,
            ),
        ]

    def _create_thumbnail_hash(self, content: ContentWrapper) -> AppResult[ContentWrapper]:
        """Hash the content image and store the value as its ``blur_hash``.

        Returns a skip result when the content has no image, a result with an
        ``AppErrorCode.HASH`` error when no hash could be produced, and
        otherwise a result whose ``generated`` value is the updated content.
        """
        doc_id = content.doc_id
        assert doc_id  # narrowing
        outcome = AppResult[ContentWrapper](
            task_name=f"CreateThumbnailHash for Content {doc_id}",
        )

        picture = content.image
        if picture is None:
            # This can happen when no avatar has been set.
            return AppResult[ContentWrapper].skip(
                doc_id=doc_id,
                message=f"No avatar image for ContentModel: {doc_id}",
            )

        LOG().debug(f"Generating hash for thumbnail of ContentModel: {doc_id}")
        digest = self.get_image_hash(picture, width=picture.w, height=picture.h)
        if digest is None:
            outcome.add_error(AppErrorCode.HASH, "Failed to generate hash for thumbnail")
            return outcome

        LOG().debug(
            f"Generated thumbnail hash for ContentModel: {doc_id} hash: {truncate_string(digest)}"
        )

        # Write the hash onto the image and re-assign it to the wrapper.
        picture.blur_hash = digest
        content.image = picture
        outcome.generated = content
        return outcome

    def _optimize_video_step(self, content: ContentWrapper) -> AppResult[ContentWrapper]:
        """Adapt ``optimize_video`` to the ``AppResult`` step interface.

        Records ``MISSING_DATA`` when the model has no video and
        ``CREATE_FAILED`` when ``optimize_video`` returns ``None``; on success
        the optimised output is stored on the video's ``low`` attribute.
        """
        doc_id = content.doc_id
        assert doc_id  # narrowing
        outcome = AppResult[ContentWrapper](task_name=f"OptimizeVideo for Content {doc_id}")

        clip = content.model.video
        if clip is None:
            outcome.add_error(
                AppErrorCode.MISSING_DATA,
                f"No video for ContentModel: {doc_id}",
            )
            return outcome

        low_version = self.optimize_video(clip)
        if low_version is None:
            outcome.add_error(AppErrorCode.CREATE_FAILED, "Failed to optimize video")
            return outcome

        # Attach the optimised version and write the video back to the model.
        clip.low = low_version
        content.model.video = clip
        outcome.generated = content
        return outcome

    def _index_content_in_search(
        self,
        content: ContentWrapper,
        is_update: bool,
    ) -> AppResult[ContentWrapper]:
        """Add content to search index (or remove if private)."""
        outcome = AppResult[ContentWrapper](task_name="IndexContentInSearch")

        # NOTE: the content is always handed to the indexer, even when the
        # NOTE: dare is private, because its visibility may have flipped
        # NOTE: between public and private and the entry may need deleting.
        indexed = self._indexer_service.process_content(content, is_update=is_update)
        if indexed.is_error:
            outcome.merge(indexed)
        else:
            outcome.generated = content
        return outcome