timing.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. import datetime
  4. import threading
  5. import time
  6. import uuid
  7. import logging
  8. def generate_task_id():
  9. return uuid.uuid1().hex
  10. class Timing:
  11. def __init__(self, sleep_interval=1):
  12. self.task = {}
  13. self.lock = threading.Lock()
  14. self.sleep_interval = sleep_interval
  15. def add_task(self, name, interval, func, count, task_id=generate_task_id(), *args, **kwargs):
  16. """
  17. Add a timing task and schedule will execute it.
  18. :param task_id: Unique task id.
  19. :param name: Task name.
  20. :param interval: The interval between the next task execution.
  21. :param func: The function of timing task execution.
  22. :param count: The total number of times the task is executed. If it is 0, there is no limit.
  23. :param args:
  24. :param kwargs:
  25. :return:
  26. """
  27. if not isinstance(task_id, str):
  28. raise TypeError('task_id must be str')
  29. if not isinstance(interval, int):
  30. raise TypeError('interval must be int')
  31. if interval < 0:
  32. raise ValueError('interval must be bigger than 0')
  33. if not callable(func):
  34. raise TypeError('func must be func')
  35. if not isinstance(count, int):
  36. raise TypeError('count must be int')
  37. if count < 0:
  38. count = 0
  39. self.lock.acquire()
  40. self.task[task_id] = {'name': name, 'interval': interval, 'func': func, 'count': count, '__count': 0,
  41. 'args': args, 'kwargs': kwargs}
  42. self.lock.release()
  43. def delete_task(self, task_id):
  44. """
  45. Delete the task from schedule by task_id, if exist, return it.
  46. :param task_id: Unique task id.
  47. :return:
  48. """
  49. element = None
  50. self.lock.acquire()
  51. if self.task.__contains__(task_id):
  52. element = self.task.pop(task_id)
  53. self.lock.release()
  54. return element
  55. def set_interval(self, interval):
  56. self.sleep_interval = interval
  57. def sleep(self):
  58. time.sleep(self.sleep_interval)
  59. def run(self):
  60. while True:
  61. clear_keys = []
  62. self.lock.acquire()
  63. for task_id in self.task.keys():
  64. task_detail = self.task[task_id]
  65. try:
  66. now = int(datetime.datetime.now().timestamp() * 1000)
  67. interval = task_detail['interval']
  68. if interval - (now % interval) > 1:
  69. continue
  70. t = threading.Thread(target=task_detail['func'], name=task_detail['name'])
  71. t.start()
  72. task_detail['__count'] = task_detail['__count'] + 1
  73. if task_detail['__count'] >= task_detail['count']:
  74. clear_keys.append(task_id)
  75. except Exception as e:
  76. logging.error('[TIMING] timing meet error, id: %s, name: %s, error: %s',
  77. task_id, task_detail['name'], e)
  78. self.lock.release()
  79. for key in clear_keys:
  80. self.delete_task(key)
  81. self.sleep()
# Module-level scheduler instance that the module-level helper functions delegate to.
defaultTiming = Timing()
  83. def add_task(name, interval, func, count, task_id=generate_task_id(), *args, **kwargs):
  84. defaultTiming.add_task(name, interval, func, count, task_id, *args, **kwargs)
  85. def delete_task(task_id):
  86. defaultTiming.delete_task(task_id)
  87. def set_interval(interval):
  88. defaultTiming.set_interval(interval)
  89. def run():
  90. defaultTiming.run()