Coverage for functions \ flipdare \ firestore \ backend \ run_config_group_db.py: 84%

45 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13 

14from google.cloud.firestore import Client as FirestoreClient 

15from flipdare.app_log import LOG 

16from flipdare.firestore._app_db import AppDb 

17from flipdare.firestore.core.db_query import DbQuery, FieldOp, WhereField 

18from flipdare.generated.model.backend.run_config_group_model import ( 

19 RunConfigGroupKeys, 

20 RunConfigGroupModel, 

21) 

22from flipdare.generated.shared.backend.app_job_group import AppJobGroup 

23from flipdare.generated.shared.backend.app_job_type import AppJobType 

24from flipdare.generated.shared.firestore_collections import FirestoreCollections 

25from flipdare.wrapper.backend.run_config_group_wrapper import RunConfigGroupWrapper 

26 

# Public API of this module: only the database class is exported.
27__all__ = ["RunConfigGroupDb"] 

28 

# Collection name as a plain string; passed to the query builder and to
# client.collection() by the class below.
29_RUNTIME_GROUP: str = FirestoreCollections.RUNTIME_GROUP.value 

# Short aliases for the field-key type and the comparison-operator type used
# when building WhereField filters.
30_K = RunConfigGroupKeys 

31_OP = FieldOp 

32 

33 

class RunConfigGroupDb(AppDb[RunConfigGroupWrapper, RunConfigGroupModel]):
    """Database access for run-config group documents in the RUNTIME_GROUP collection.

    Group-level documents are written under a document ID equal to the
    job group's ``value``; per-job documents are looked up by query on the
    job-group and job-type fields.
    """

    def __init__(self, client: FirestoreClient) -> None:
        """Bind the base ``AppDb`` to the RUNTIME_GROUP collection and its model/wrapper types."""
        super().__init__(
            client=client,
            collection_name=FirestoreCollections.RUNTIME_GROUP,
            model_class=RunConfigGroupModel,
            wrapper_class=RunConfigGroupWrapper,
        )

    def enable(self, job_group: AppJobGroup) -> None:
        """Mark *job_group* as enabled by upserting its group-level config."""
        LOG().info(f"Enabling group '{job_group.value}'...")
        self._set_group_config(job_group, enabled=True)

    def disable(self, job_group: AppJobGroup) -> None:
        """Mark *job_group* as disabled by upserting its group-level config."""
        LOG().info(f"Disabling group '{job_group.value}'...")
        self._set_group_config(job_group, enabled=False)

    def get_group_config(self, job_group: AppJobGroup) -> RunConfigGroupWrapper | None:
        """Fetch the group-level config stored under the group's own value as document ID.

        Returns ``None`` when ``_get`` yields no data for that ID.
        """
        stored = self._get(job_group.value)
        return None if stored is None else RunConfigGroupWrapper.from_dict(data=stored)

    def get_config(
        self,
        job_group: AppJobGroup,
        job_type: AppJobType,
    ) -> RunConfigGroupWrapper | None:
        """Look up the running-job marker for one ``(job_group, job_type)`` pair.

        Runs an AND query on the job-group and job-type fields, limited to a
        single result, and converts the first hit. Returns ``None`` when the
        query yields nothing.
        """
        LOG().debug(f"Getting running-job config for '{job_group.value}/{job_type.value}'...")

        group_filter = WhereField[_K](_K.JOB_GROUP, _OP.EQUAL, job_group.value)
        type_filter = WhereField[_K](_K.JOB_TYPE, _OP.EQUAL, job_type.value)
        lookup = DbQuery.and_([group_filter, type_filter], limit=1)

        snapshots = lookup.get_query(self.client, _RUNTIME_GROUP).get()
        if snapshots:
            return self._cvt_snap_to_model(snapshots[0])

        LOG().debug(f"No running-job config found for '{job_group.value}/{job_type.value}'")
        return None

    def _set_group_config(self, job_group: AppJobGroup, enabled: bool) -> RunConfigGroupWrapper:
        """Write the group-level config to a document whose ID is the group's value.

        The document is written with ``set`` and the stored payload is
        returned re-wrapped together with its ID.
        """
        group_id = job_group.value

        fields = RunConfigGroupModel(
            id=group_id,
            enabled=enabled,
            job_group=job_group,
        ).to_dict()
        fields["id"] = group_id  # Ensure the ID is included in the data for the wrapper

        # Round-trip through the wrapper so the stored payload is the wrapper's own dict form.
        payload = RunConfigGroupWrapper.from_dict(data=fields).to_dict()

        doc_ref = self.client.collection(_RUNTIME_GROUP).document(group_id)
        doc_ref.set(payload)

        # Keys present in the payload take precedence over the explicit "id" entry.
        result_data = {"id": group_id, **payload}
        return RunConfigGroupWrapper.from_dict(data=result_data)
96 return RunConfigGroupWrapper.from_dict(data={"id": doc_id, **payload})