Coverage for functions \ flipdare \ task \ command_task_handler.py: 44%

32 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14from flipdare.service._service_provider import ServiceProvider 

15from flipdare.task.command.default_command import DefaultCommand 

16from flipdare.result.output_result import OutputResult 

17from flipdare.generated.shared.backend.app_job_type import AppJobType 

18from flipdare.job_types import CommandJobType 

19 

20__all__ = ["CommandTaskHandler"] 

21 

22 

23class CommandTaskHandler(ServiceProvider): 

24 # NOTE: we need access to everything, so we defy the standard 

25 # NOTE: for a core admin class. 

26 def __init__(self) -> None: 

27 super().__init__() 

28 

29 def run_command(self, job_type: CommandJobType) -> OutputResult: 

30 match job_type: 

31 case AppJobType.COMMAND_TYPESENSE_REINDEX: 

32 return self._typesense_reindex() 

33 case AppJobType.COMMAND_TYPESENSE_COMPACT: 

34 return self._typesense_compact() 

35 case AppJobType.COMMAND_FIREBASE_CLEANUP: 

36 return self._firebase_cleanup() 

37 case AppJobType.COMMAND_UPDATE_EXCHANGE_RATE: 

38 return self._exchange_rate_update() 

39 

40 def _typesense_reindex(self) -> OutputResult: 

41 command = DefaultCommand( 

42 job_type=AppJobType.COMMAND_TYPESENSE_REINDEX, 

43 command_callback=self.search_manager.reindex, 

44 app_logger=self.app_logger, 

45 mailer=self.admin_mailer, 

46 ) 

47 return command.run_command() 

48 

49 def _typesense_compact(self) -> OutputResult: 

50 command = DefaultCommand( 

51 job_type=AppJobType.COMMAND_TYPESENSE_COMPACT, 

52 command_callback=self.search_manager.compact, 

53 app_logger=self.app_logger, 

54 mailer=self.admin_mailer, 

55 ) 

56 return command.run_command() 

57 

58 def _exchange_rate_update(self) -> OutputResult: 

59 command = DefaultCommand( 

60 job_type=AppJobType.COMMAND_UPDATE_EXCHANGE_RATE, 

61 command_callback=self.exchange_rate_monitor.update_exchange_rate, 

62 app_logger=self.app_logger, 

63 mailer=self.admin_mailer, 

64 ) 

65 return command.run_command() 

66 

67 def _firebase_cleanup(self) -> OutputResult: 

68 command = DefaultCommand( 

69 job_type=AppJobType.COMMAND_FIREBASE_CLEANUP, 

70 command_callback=self.storage_client.cleanup, 

71 app_logger=self.app_logger, 

72 mailer=self.admin_mailer, 

73 ) 

74 return command.run_command()