Coverage for functions \ flipdare \ analysis \ data \ nested \ time_series_job_data.py: 98%

43 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from typing import override 

14from datetime import datetime 

15from dataclasses import dataclass 

16from flipdare.analysis.data._time_series_protocol import TimeSeriesPlotInfo 

17from flipdare.analysis.data.nested._time_series_nested_data import TimeSeriesNestedData 

18from flipdare.app_types import AnalysisDataType, ReportListType 

19from flipdare.generated.shared.backend.app_job_type import AppJobType 

20from flipdare.util.time_util import TimeUtil 

21 

22__all__ = ["JobStat", "TimeSeriesJobData"] 

23 

24 

25@dataclass(frozen=True, slots=True) 

26class JobStat: 

27 count: int 

28 error_count: int 

29 

30 def accumulate(self, other: "JobStat") -> "JobStat": 

31 return JobStat( 

32 count=self.count + other.count, 

33 error_count=self.error_count + other.error_count, 

34 ) 

35 

36 

37_EMPTY_STAT = JobStat(0, 0) 

38 

39 

40@dataclass 

41class TimeSeriesJobData(TimeSeriesNestedData[AppJobType, JobStat]): 

42 @property 

43 @override 

44 def headers(self) -> list[str]: 

45 return ["Date", "JobType", "Count", "ErrorCount"] 

46 

47 def add(self, dt: datetime, job_type: AppJobType, stat: JobStat) -> None: 

48 """Accumulate a JobStat for a given date and job type.""" 

49 day = self._to_day(dt) 

50 existing = self._rows[day].get(job_type, _EMPTY_STAT) 

51 self._rows[day][job_type] = existing.accumulate(stat) 

52 

53 @override 

54 def table_data(self) -> ReportListType: 

55 result: ReportListType = [] 

56 for dt in self.dates: 

57 date_str = TimeUtil.formatted_short(dt) 

58 for job_type, stat in sorted(self._rows[dt].items(), key=lambda x: str(x[0])): 

59 result.append([date_str, str(job_type), stat.count, stat.error_count]) 

60 return result 

61 

62 @override 

63 def plot_info(self) -> list[TimeSeriesPlotInfo]: 

64 """Two plots: count per job type and error_count per job type. Each series = one job type.""" 

65 dates = self.dates 

66 date_labels = [TimeUtil.formatted_short(dt) for dt in dates] 

67 job_types: list[AppJobType] = sorted( 

68 {jt for row in self._rows.values() for jt in row}, 

69 key=str, 

70 ) 

71 legend_labels = [str(jt) for jt in job_types] 

72 count_data: AnalysisDataType = [ 

73 [self._rows[dt].get(jt, _EMPTY_STAT).count for dt in dates] for jt in job_types 

74 ] 

75 error_data: AnalysisDataType = [ 

76 [self._rows[dt].get(jt, _EMPTY_STAT).error_count for dt in dates] for jt in job_types 

77 ] 

78 return [ 

79 TimeSeriesPlotInfo( 

80 label="Count", 

81 x_title="Date", 

82 y_title="Count", 

83 x_labels=date_labels, 

84 legend_labels=legend_labels, 

85 data=count_data, 

86 ), 

87 TimeSeriesPlotInfo( 

88 label="Error Count", 

89 x_title="Date", 

90 y_title="Error Count", 

91 x_labels=date_labels, 

92 legend_labels=legend_labels, 

93 data=error_data, 

94 ), 

95 ]