|
@@ -14,6 +14,7 @@ def generate_task_id():
|
|
|
class Timing:
|
|
|
def __init__(self, sleep_interval=1):
|
|
|
self.task = {}
|
|
|
+ self.is_running = False
|
|
|
self.lock = threading.Lock()
|
|
|
self.sleep_interval = sleep_interval
|
|
|
|
|
@@ -43,22 +44,27 @@ class Timing:
|
|
|
count = 0
|
|
|
|
|
|
self.lock.acquire()
|
|
|
- self.task[task_id] = {'name': name, 'interval': interval, 'func': func, 'count': count, '__count': 0,
|
|
|
- 'args': args, 'kwargs': kwargs}
|
|
|
+ # exec_num: It is times which task has executed.
|
|
|
+ task = {'name': name, 'interval': interval, 'func': func, 'exec_num': 0,
|
|
|
+ 'args': args, 'kwargs': kwargs}
|
|
|
+ if count > 0:
|
|
|
+ task['count'] = count
|
|
|
+
|
|
|
+ self.task[task_id] = task
|
|
|
self.lock.release()
|
|
|
|
|
|
- def delete_task(self, task_id):
|
|
|
+ def delete_tasks(self, task_ids):
|
|
|
"""
|
|
|
- Delete the task from schedule by task_id, if exist, return it.
|
|
|
- :param task_id: Unique task id.
|
|
|
+ Delete the task from schedule by multi task_ids, if exist, return these.
|
|
|
+ :param task_ids: multi task id.
|
|
|
:return:
|
|
|
"""
|
|
|
- element = None
|
|
|
- self.lock.acquire()
|
|
|
- if self.task.__contains__(task_id):
|
|
|
- element = self.task.pop(task_id)
|
|
|
- self.lock.release()
|
|
|
- return element
|
|
|
+ elements = []
|
|
|
+ for key in task_ids:
|
|
|
+ if self.task.__contains__(key):
|
|
|
+ element = self.task.pop(key)
|
|
|
+ elements.append(element)
|
|
|
+ return elements
|
|
|
|
|
|
def set_interval(self, interval):
|
|
|
self.sleep_interval = interval
|
|
@@ -66,30 +72,44 @@ class Timing:
|
|
|
def sleep(self):
|
|
|
time.sleep(self.sleep_interval)
|
|
|
|
|
|
+ def stop(self):
|
|
|
+ self.is_running = False
|
|
|
+
|
|
|
def run(self):
|
|
|
+ self.is_running = True
|
|
|
while True:
|
|
|
+ if not self.is_running:
|
|
|
+ logging.debug('[TIMING] timing server will end.')
|
|
|
+ break
|
|
|
+
|
|
|
+ logging.debug('[TIMING] run with tasks length: %s', len(self.task))
|
|
|
clear_keys = []
|
|
|
self.lock.acquire()
|
|
|
- for task_id in self.task.keys():
|
|
|
- task_detail = self.task[task_id]
|
|
|
+ tasks = self.task
|
|
|
+ self.lock.release()
|
|
|
+
|
|
|
+ for task_id in tasks.keys():
|
|
|
+ task_detail = tasks[task_id]
|
|
|
try:
|
|
|
now = int(datetime.datetime.now().timestamp() * 1000)
|
|
|
interval = task_detail['interval']
|
|
|
if interval - (now % interval) > 1:
|
|
|
continue
|
|
|
t = threading.Thread(target=task_detail['func'], name=task_detail['name'])
|
|
|
+ t.setDaemon(True)
|
|
|
t.start()
|
|
|
|
|
|
- task_detail['__count'] = task_detail['__count'] + 1
|
|
|
- if task_detail['__count'] >= task_detail['count']:
|
|
|
+ task_detail['exec_num'] = task_detail['exec_num'] + 1
|
|
|
+ if task_detail.__contains__('count') and task_detail['exec_num'] >= task_detail['count']:
|
|
|
clear_keys.append(task_id)
|
|
|
except Exception as e:
|
|
|
logging.error('[TIMING] timing meet error, id: %s, name: %s, error: %s',
|
|
|
task_id, task_detail['name'], e)
|
|
|
- self.lock.release()
|
|
|
|
|
|
- for key in clear_keys:
|
|
|
- self.delete_task(key)
|
|
|
+ # clear past due keys
|
|
|
+ self.lock.acquire()
|
|
|
+ self.delete_tasks(clear_keys)
|
|
|
+ self.lock.release()
|
|
|
self.sleep()
|
|
|
|
|
|
|
|
@@ -100,8 +120,8 @@ def add_task(name, interval, func, count, task_id=generate_task_id(), *args, **k
|
|
|
defaultTiming.add_task(name, interval, func, count, task_id, *args, **kwargs)
|
|
|
|
|
|
|
|
|
-def delete_task(task_id):
|
|
|
- defaultTiming.delete_task(task_id)
|
|
|
+def delete_tasks(task_ids):
|
|
|
+ defaultTiming.delete_tasks(task_ids)
|
|
|
|
|
|
|
|
|
def set_interval(interval):
|
|
@@ -110,3 +130,7 @@ def set_interval(interval):
|
|
|
|
|
|
def run():
|
|
|
defaultTiming.run()
|
|
|
+
|
|
|
+
|
|
|
+def stop():
|
|
|
+ defaultTiming.stop()
|