#!/usr/bin/env python # -*- coding:utf-8 -*- import datetime import threading import time import uuid import logging def generate_task_id(): return uuid.uuid1().hex class Timing: def __init__(self, sleep_interval=1): self.task = {} self.lock = threading.Lock() self.sleep_interval = sleep_interval def add_task(self, name, interval, func, count, task_id=generate_task_id(), *args, **kwargs): """ Add a timing task and schedule will execute it. :param task_id: Unique task id. :param name: Task name. :param interval: The interval between the next task execution. :param func: The function of timing task execution. :param count: The total number of times the task is executed. If it is 0, there is no limit. :param args: :param kwargs: :return: """ if not isinstance(task_id, str): raise TypeError('task_id must be str') if not isinstance(interval, int): raise TypeError('interval must be int') if interval < 0: raise ValueError('interval must be bigger than 0') if not callable(func): raise TypeError('func must be func') if not isinstance(count, int): raise TypeError('count must be int') if count < 0: count = 0 self.lock.acquire() self.task[task_id] = {'name': name, 'interval': interval, 'func': func, 'count': count, '__count': 0, 'args': args, 'kwargs': kwargs} self.lock.release() def delete_task(self, task_id): """ Delete the task from schedule by task_id, if exist, return it. :param task_id: Unique task id. :return: """ element = None self.lock.acquire() if self.task.__contains__(task_id): element = self.task.pop(task_id) self.lock.release() return element def set_interval(self, interval): self.sleep_interval = interval def sleep(self): time.sleep(self.sleep_interval) def run(self): while True: clear_keys = [] self.lock.acquire() for task_id in self.task.keys(): task_detail = self.task[task_id] try: now = int(datetime.datetime.now().timestamp() * 1000) interval = task_detail['interval'] if interval - (now % interval) > 1: continue t = threading.Thread(target=task_detail['func'], name=task_detail['name']) t.start() task_detail['__count'] = task_detail['__count'] + 1 if task_detail['__count'] >= task_detail['count']: clear_keys.append(task_id) except Exception as e: logging.error('[TIMING] timing meet error, id: %s, name: %s, error: %s', task_id, task_detail['name'], e) self.lock.release() for key in clear_keys: self.delete_task(key) self.sleep() defaultTiming = Timing() def add_task(name, interval, func, count, task_id=generate_task_id(), *args, **kwargs): defaultTiming.add_task(name, interval, func, count, task_id, *args, **kwargs) def delete_task(task_id): defaultTiming.delete_task(task_id) def set_interval(interval): defaultTiming.set_interval(interval) def run(): defaultTiming.run()