Coverage for functions \ flipdare \ firestore \ content_db.py: 91%

33 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

2# 

3# This file is part of Flipdare's proprietary software and contains 

4# confidential and copyrighted material. Unauthorised copying, 

5# modification, distribution, or use of this file is strictly 

6# prohibited without prior written permission from Flipdare Pty Ltd. 

7# 

8# This software includes third-party components licensed under MIT, 

9# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

10# 

11 

12from google.cloud.firestore import Client as FirestoreClient 

13from flipdare.app_log import LOG 

14from flipdare.error.app_error import DatabaseError 

15from flipdare.firestore._app_db import AppDb 

16from flipdare.firestore.core.db_query import DbQuery, FieldOp, WhereField 

17from flipdare.generated.model.content_model import ContentInternalKeys, ContentModel 

18from flipdare.generated.shared.app_error_code import AppErrorCode 

19from flipdare.generated.shared.firestore_collections import FirestoreCollections 

20from flipdare.util.time_util import TimeUtil 

21from flipdare.wrapper import ContentWrapper 

22 

# Public API of this module: only the database class is exported.
__all__ = ["ContentDb"]

# Value of the CONTENT collection enum member; used below when building the
# query and when reporting the collection on a DatabaseError.
_CONTENT = FirestoreCollections.CONTENT.value

# Short aliases to keep the WhereField filter definitions readable.
_I = ContentInternalKeys
_OP = FieldOp

29 

30 

class ContentDb(AppDb[ContentWrapper, ContentModel]):
    """Database access for the collection named by ``FirestoreCollections.CONTENT``.

    Specialises ``AppDb`` with ``ContentWrapper`` as the wrapper type and
    ``ContentModel`` as the model type.
    """

    def __init__(self, client: FirestoreClient) -> None:
        """Bind this database helper to the CONTENT collection.

        Args:
            client: Firestore client forwarded to the ``AppDb`` base class.
        """
        super().__init__(
            client=client,
            collection_name=FirestoreCollections.CONTENT,
            wrapper_class=ContentWrapper,
            model_class=ContentModel,
        )

    def get_recent_unprocessed(self, hours: int | None = None) -> list[ContentWrapper]:
        """Return recently created content documents that are not yet processed.

        A document matches when its ``PROCESSED`` field equals ``False`` and
        its ``CREATED_AT`` field is at or after the cutoff returned by
        ``TimeUtil.get_utc_time_hours_ago(hours)``.

        Args:
            hours: Size of the look-back window in hours. When ``None``,
                ``self.def_window_hours`` is used instead.

        Returns:
            One ``ContentWrapper`` per matching document. Documents for which
            ``self._cvt_snap_to_data`` returns ``None`` are skipped.

        Raises:
            DatabaseError: If building the query, streaming the results or
                wrapping a document fails. The original exception is chained.
        """
        if hours is None:
            hours = self.def_window_hours

        hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
        # Log the window size and the cutoff separately; the cutoff is a
        # point in time, not a number of hours.
        msg = (
            f"Getting unprocessed content within the last {hours} hours "
            f"(created at or after {TimeUtil.formatted_dt(hours_ago)})"
        )
        LOG().debug(msg)

        try:
            and_fields = [
                WhereField[_I](_I.PROCESSED, _OP.EQUAL, False),
                WhereField[_I](_I.CREATED_AT, _OP.GREATER_THAN_OR_EQUAL, hours_ago),
            ]
            query = DbQuery.and_(and_fields)

            content_docs = query.get_query(self.client, _CONTENT).stream()
            # The walrus binding lets each snapshot be converted once and
            # dropped when the conversion yields None.
            contents = [
                ContentWrapper.from_dict(data=data)
                for doc in content_docs
                if (data := self._cvt_snap_to_data(doc)) is not None
            ]

            # NOTE(review): the "waiting for admin review" wording below and in
            # the error message does not obviously match an "unprocessed" query;
            # left unchanged in case callers or log searches rely on it - confirm.
            LOG().debug(f"Retrieved {len(contents)} content items waiting for admin review.")
            return contents
        except Exception as e:
            msg = f"Failed to get content items waiting for admin review: {e}"
            raise DatabaseError(
                msg,
                error_code=AppErrorCode.DATABASE,
                collection_name=_CONTENT,
            ) from e