#!/usr/bin/env python
# -*- coding:utf-8 -*-
import json
from threading import Lock
import logging
import datetime
from os import path
import shutil
import time


# Scalar defaults applied by Handle.content_format, listed in the order the
# keys are inserted into a record that lacks them. 'kpi' is handled separately
# because its default must be a fresh dict for every record.
_SCALAR_DEFAULTS = (
    ('dev_ip', ''),
    ('expr', 1),
    ('dev_id', ''),
    ('port', ''),
    ('type', ''),
    ('schema', ''),
    ('login_name', ''),
    ('pwd', ''),
)


class Handle:
    """Queue result dicts and persist them as JSON rows.

    Producers call :meth:`add_task`; :meth:`listen` runs a loop that drains
    the queue into a temporary file (one JSON document per line) and, once
    ``max_length`` rows have been written, copies the temporary file into the
    official directory under a timestamped ``.tok`` name.
    """

    def __init__(self, max_length, temp_file, official_dir, interval=5 * 60):
        """
        :param max_length: Number of rows written to the temporary file before
            it is copied to an official file.
        :param temp_file: Temporary file path.
        :param official_dir: Official folder path.
        :param interval: Intended period (seconds) for moving temporary files
            to official files; stored but not used by any code in this class.
        """
        # Guards self.Q, which add_task and the listen loop both touch.
        self.lock = Lock()
        # Data results collected by multiple plug-ins are queued here before
        # being written to file.
        self.Q = []
        self.max_item_length = max_length
        self.temp_file = temp_file
        self.official_dir = official_dir
        self.interval = interval
        # Rows written to the temporary file since the last rotation.
        self.current_row = 0

    def add_task(self, content):
        """Normalise ``content`` and append it to the queue.

        Non-dict input is rejected with a warning and nothing is queued.

        :param content: dict describing one result; it is completed in place
            by :meth:`content_format`.
        """
        if not isinstance(content, dict):
            logging.warning('[HANDLE] add task content: %s but type is not dict', content)
            return
        # `with` guarantees the lock is released even if formatting raises.
        with self.lock:
            self.Q.append(self.content_format(content))

    @staticmethod
    def content_format(content):
        """Fill in every missing standard key of ``content`` with its default.

        The dict is modified in place and also returned. Keys that are already
        present keep their value.

        :param content: dict to complete.
        :return: the same dict object.
        """
        for key, default in _SCALAR_DEFAULTS:
            content.setdefault(key, default)
        # A new dict per call so records never share one mutable default.
        content.setdefault('kpi', {})
        return content

    def to_file(self, name):
        """Copy the temporary file to ``official_dir/name``.

        :param name: file name (not a path) of the official file to create.
        :return: ``None`` on success, otherwise a message describing the error.
        """
        file_name = path.join(self.official_dir, name)
        try:
            shutil.copyfile(self.temp_file, file_name)
        except OSError as e:
            return str(e) or repr(e)
        return None

    def listen(self):
        """Run forever, writing queued items to the temporary file.

        Each cycle sleeps via :meth:`sleep` and then processes whatever is in
        the queue. The loop only ends through an exception (for example
        ``KeyboardInterrupt``), in which case the temporary file is closed.
        """
        # Append first so rows left over in an existing temp file are kept.
        temp_file_mode = 'a+'
        temp_file_ptr = self.create_temp_file(temp_file_mode)
        try:
            while True:
                self.sleep()
                temp_file_ptr, temp_file_mode = self._write_pending(
                    temp_file_ptr, temp_file_mode)
        finally:
            if temp_file_ptr is not None and not temp_file_ptr.closed:
                temp_file_ptr.close()

    def _write_pending(self, temp_file_ptr, temp_file_mode):
        """Run one listen cycle: drain the queue into the temporary file.

        :param temp_file_ptr: current temp file object, or ``None``.
        :param temp_file_mode: mode to use if the temp file must be (re)opened.
        :return: ``(temp_file_ptr, temp_file_mode)`` to carry into the next
            cycle.
        """
        with self.lock:
            has_items = bool(self.Q)
        if not has_items:
            return temp_file_ptr, temp_file_mode

        # Make sure there is somewhere to write BEFORE taking items off the
        # queue, so nothing is lost while the file cannot be opened.
        if temp_file_ptr is None or temp_file_ptr.closed:
            temp_file_ptr = self.create_temp_file(temp_file_mode)
            if temp_file_ptr is None:
                return None, temp_file_mode
            # Any pending truncation ('w+') has now happened.
            temp_file_mode = 'a+'

        # Take ownership of the queued items and leave an empty queue behind;
        # otherwise the same items would be written again on every cycle.
        with self.lock:
            pending, self.Q = self.Q, []

        remaining = iter(pending)
        for item in remaining:
            item_value = ''
            try:
                item_value = json.dumps(item)
                # One JSON document per line so the file can be parsed back.
                temp_file_ptr.write(item_value + '\n')
                self.current_row += 1
            except Exception as e:
                # Keep the worker alive; the failing item is reported and dropped.
                logging.error("[HANDLE] write item(%s) error :%s", item_value, e)

            # If the currently recorded row count has reached the maximum,
            # the temporary file is copied to an official file.
            if self.current_row >= self.max_item_length:
                temp_file_ptr, temp_file_mode = self._rotate(
                    temp_file_ptr, temp_file_mode)
                if temp_file_ptr is None:
                    # Could not reopen the temp file: put the unwritten items
                    # back at the front of the queue for the next cycle.
                    with self.lock:
                        self.Q[:0] = list(remaining)
                    break
        return temp_file_ptr, temp_file_mode

    def _rotate(self, temp_file_ptr, temp_file_mode):
        """Copy the temp file to a new official file and start a fresh one.

        :return: ``(temp_file_ptr, temp_file_mode)``; unchanged when the copy
            failed, otherwise the reopened (truncated) temp file, or ``None``
            with mode ``'w+'`` when reopening failed.
        """
        now = int(datetime.datetime.now().timestamp() * 1000)
        file_name = str(now) + '.tok'
        try:
            # Push buffered rows to disk so the copy contains all of them.
            temp_file_ptr.flush()
            msg = self.to_file(file_name)
        except Exception as e:
            logging.error('[HANDLE] to official file(%s) error: %s', file_name, e)
            return temp_file_ptr, temp_file_mode
        if msg is not None:
            logging.error('[HANDLE] to official file(%s) error: %s', file_name, msg)
            return temp_file_ptr, temp_file_mode

        temp_file_ptr.close()
        self.current_row = 0
        # 'w+' truncates the rows that were just copied out.
        new_ptr = self.create_temp_file('w+')
        # If reopening failed the truncation is still outstanding, so the next
        # attempt must use 'w+' as well.
        return new_ptr, ('a+' if new_ptr is not None else 'w+')

    def create_temp_file(self, mode):
        """Open the temporary file.

        :param mode: mode string passed to ``open``.
        :return: the open file object, or ``None`` if opening failed (the
            error is logged).
        """
        try:
            # json.dumps emits ASCII by default; utf-8 is stated explicitly so
            # the file does not depend on the platform's default encoding.
            return open(self.temp_file, mode=mode, encoding='utf-8')
        except Exception as e:
            logging.error("[HANDLE] create temp file(%s) error: %s", self.temp_file, e)
            return None

    @staticmethod
    def sleep():
        """Pause between two listen cycles (one second)."""
        time.sleep(1)

    def ftp_upload(self):
        """Placeholder; not implemented."""
        pass