123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import datetime
- import threading
- import time
- import uuid
- import logging
- def generate_task_id():
- return uuid.uuid1().hex
- class Timing:
- def __init__(self, sleep_interval=1):
- self.task = {}
- self.lock = threading.Lock()
- self.sleep_interval = sleep_interval
- def add_task(self, name, interval, func, count, task_id=generate_task_id(), *args, **kwargs):
- """
- Add a timing task and schedule will execute it.
- :param task_id: Unique task id.
- :param name: Task name.
- :param interval: The interval between the next task execution.
- :param func: The function of timing task execution.
- :param count: The total number of times the task is executed. If it is 0, there is no limit.
- :param args:
- :param kwargs:
- :return:
- """
- if not isinstance(task_id, str):
- raise TypeError('task_id must be str')
- if not isinstance(interval, int):
- raise TypeError('interval must be int')
- if interval < 0:
- raise ValueError('interval must be bigger than 0')
- if not callable(func):
- raise TypeError('func must be func')
- if not isinstance(count, int):
- raise TypeError('count must be int')
- if count < 0:
- count = 0
- self.lock.acquire()
- self.task[task_id] = {'name': name, 'interval': interval, 'func': func, 'count': count, '__count': 0,
- 'args': args, 'kwargs': kwargs}
- self.lock.release()
- def delete_task(self, task_id):
- """
- Delete the task from schedule by task_id, if exist, return it.
- :param task_id: Unique task id.
- :return:
- """
- element = None
- self.lock.acquire()
- if self.task.__contains__(task_id):
- element = self.task.pop(task_id)
- self.lock.release()
- return element
- def set_interval(self, interval):
- self.sleep_interval = interval
- def sleep(self):
- time.sleep(self.sleep_interval)
- def run(self):
- while True:
- clear_keys = []
- self.lock.acquire()
- for task_id in self.task.keys():
- task_detail = self.task[task_id]
- try:
- now = int(datetime.datetime.now().timestamp() * 1000)
- interval = task_detail['interval']
- if interval - (now % interval) > 1:
- continue
- t = threading.Thread(target=task_detail['func'], name=task_detail['name'])
- t.start()
- task_detail['__count'] = task_detail['__count'] + 1
- if task_detail['__count'] >= task_detail['count']:
- clear_keys.append(task_id)
- except Exception as e:
- logging.error('[TIMING] timing meet error, id: %s, name: %s, error: %s',
- task_id, task_detail['name'], e)
- self.lock.release()
- for key in clear_keys:
- self.delete_task(key)
- self.sleep()
- defaultTiming = Timing()
- def add_task(name, interval, func, count, task_id=generate_task_id(), *args, **kwargs):
- defaultTiming.add_task(name, interval, func, count, task_id, *args, **kwargs)
- def delete_task(task_id):
- defaultTiming.delete_task(task_id)
- def set_interval(interval):
- defaultTiming.set_interval(interval)
- def run():
- defaultTiming.run()
|