浏览代码

implement func called listen

tangs 6 年之前
父节点
当前提交
97ed9381bc
共有 1 个文件被更改,包括 77 次插入7 次删除
  1. 77 7
      upload.py

+ 77 - 7
upload.py

@@ -1,15 +1,31 @@
 #!/usr/bin/env python
 # -*- coding:utf-8 -*-
-
+import json
 from threading import Lock
 import logging
+import datetime
+from os import path
+import time
 
 
 class Handle:
-    def __init__(self):
+    def __init__(self, max_length, temp_file, official_dir, interval=5 * 60):
+        """
+        :param max_length: Every 1000 rows of data are deposited in an official file.
+        :param temp_file: Temporary file path.
+        :param official_dir: Official folder Path.
+        :param interval: Put temporary files into official files to upload at regular intervals.
+        """
         self.lock = Lock()
+
+        # Data results collected by multiple plug-ins need to be queued to write to files.
+        # The Q is the queue.
         self.Q = []
-        self.max_item_length = 1000
+        self.max_item_length = max_length
+        self.temp_file = temp_file
+        self.official_dir = official_dir
+        self.interval = interval
+        self.current_row = 0
 
     def add_task(self, content):
         if not isinstance(content, dict):
@@ -43,11 +59,65 @@ class Handle:
             content['kpi'] = {}
         return content
 
-    def to_file(self):
-        pass
+    def to_file(self, name):
+        # file_name = path.join(self.official_dir, name)
+        # f = open(file_name, encoding='utf-8')
+        #
+        # copy temp file to official file.
+        return 'error'
 
-    def to_temp_file(self):
-        pass
+    def listen(self):
+        temp_file_mode = 'a+'
+        temp_file_ptr = self.create_temp_file(temp_file_mode)
+        while True:
+            self.sleep()
+            self.lock.acquire()
+            q = self.Q
+            self.lock.release()
+            if len(q) < 1:
+                continue
+
+            # Open temp file ptr. It might get None again, so execute continue one more time.
+            if temp_file_ptr is None or temp_file_ptr.closed:
+                temp_file_ptr = self.create_temp_file(temp_file_mode)
+                continue
+
+            for item in q:
+                item_value = ''
+                try:
+                    item_value = json.dumps(item)
+                    temp_file_ptr.write(item_value)
+                    self.current_row += 1
+                except Exception as e:
+                    logging.error("[HANDLE] write item(%s) error :%s", item_value, e)
+
+            # If the currently recorded data line is greater than or equal to the largest data line,
+            # the temporary file is written to the official file.
+            if self.current_row >= self.max_item_length:
+                try:
+                    now = int(datetime.datetime.now().timestamp() * 1000)
+                    file_name = str(now) + '.tok'
+                    msg = self.to_file(file_name)
+                    if msg is not None:
+                        logging.error('[HANDLE] to official file(%s) error: %s', file_name, msg)
+                        continue
+
+                    temp_file_ptr.close()
+                    temp_file_mode = 'w+'
+                except Exception as e:
+                    print(e)
+
+    def create_temp_file(self, mode):
+        try:
+            f = open(self.temp_file, mode=mode)
+            return f
+        except Exception as e:
+            logging.error("[HANDLE] create temp file(%s) error: %s", self.temp_file, e)
+            return None
+
+    @staticmethod
+    def sleep():
+        time.sleep(1)
 
     def ftp_upload(self):
         pass