Coverage for functions \ flipdare \ firestore \ backend \ run_config_job_db.py: 93%
42 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
14from google.cloud.firestore import Client as FirestoreClient
15from flipdare.app_log import LOG
16from flipdare.firestore._app_db import AppDb
17from flipdare.firestore.core.db_query import DbQuery, FieldOp, WhereField
18from flipdare.generated.model.backend.run_config_job_model import (
19 RunConfigJobKeys,
20 RunConfigJobModel,
21)
22from flipdare.generated.shared.backend.app_job_type import AppJobType
23from flipdare.generated.shared.firestore_collections import FirestoreCollections
24from flipdare.util.time_util import TimeUtil
25from flipdare.wrapper.backend.run_config_job_wrapper import RunConfigJobWrapper
27_RUNTIME_CONFIG: str = FirestoreCollections.RUNTIME_JOB.value
29__all__ = ["RunConfigJobDb"]
31_K = RunConfigJobKeys
32_OP = FieldOp
35class RunConfigJobDb(AppDb[RunConfigJobWrapper, RunConfigJobModel]):
36 def __init__(self, client: FirestoreClient) -> None:
37 super().__init__(
38 client=client,
39 collection_name=FirestoreCollections.RUNTIME_JOB,
40 model_class=RunConfigJobModel,
41 wrapper_class=RunConfigJobWrapper,
42 )
44 def start_job(
45 self,
46 job_type: AppJobType,
47 ) -> RunConfigJobWrapper | None:
48 """Create a running-job marker so other instances know the job is active."""
49 cfg = RunConfigJobModel(
50 id=None,
51 job_group=job_type.job_group,
52 job_type=job_type,
53 started_at=TimeUtil.get_current_utc_float_time(),
54 )
56 try:
57 created_data = self._create(cfg.to_dict())
58 LOG().info(f"Started job '{job_type.job_group.value}/{job_type.value}'...")
59 return RunConfigJobWrapper.from_dict(data=created_data)
60 except Exception as e:
61 self.log_error(
62 message=f"Failed to start job '{job_type.job_group.value}/{job_type.value}': {e}",
63 job_type=job_type,
64 )
65 return None
67 def is_job_running(self, job_type: AppJobType) -> bool:
68 """Return ``True`` if a running-job marker exists for this job."""
69 return self.get_job(job_type) is not None
71 def cancel_job(self, job_type: AppJobType) -> None:
72 """Delete the running-job marker when a job finishes."""
73 LOG().info(f"Cancelling job '{job_type.job_group.value}/{job_type.value}'...")
74 query = DbQuery.and_(
75 [
76 WhereField[_K](_K.JOB_GROUP, _OP.EQUAL, job_type.job_group.value),
77 WhereField[_K](_K.JOB_TYPE, _OP.EQUAL, job_type.value),
78 ],
79 limit=1,
80 )
81 results = query.get_query(self.client, _RUNTIME_CONFIG).get()
82 if results:
83 self.delete(results[0].id)
85 def get_job(
86 self,
87 job_type: AppJobType,
88 ) -> RunConfigJobWrapper | None:
89 """Return the running-job marker for a specific ``(job_group, job_type)`` pair."""
90 job_group = job_type.job_group
91 LOG().debug(f"Getting running-job config for '{job_group.value}/{job_type.value}'...")
92 query = DbQuery.and_(
93 [
94 WhereField[_K](_K.JOB_GROUP, _OP.EQUAL, job_group.value),
95 WhereField[_K](_K.JOB_TYPE, _OP.EQUAL, job_type.value),
96 ],
97 limit=1,
98 )
99 results = query.get_query(self.client, _RUNTIME_CONFIG).get()
100 if not results:
101 LOG().debug(f"No running-job config found for '{job_group.value}/{job_type.value}'")
102 return None
103 return self._cvt_snap_to_model(results[0])