Browse Source

add upload test and fix bug

tangs 6 years ago
parent
commit
ba75c73331
2 changed files with 67 additions and 3 deletions
  1. 25 3
      upload.py
  2. 42 0
      upload_test.py

+ 25 - 3
upload.py

@@ -8,6 +8,8 @@ from os import path
 import time
 import shutil
 
+default_max_length = 1000
+
 
 class Handle:
     def __init__(self, max_length, temp_file, official_dir, interval=5 * 60):
@@ -72,11 +74,13 @@ class Handle:
         temp_file_ptr = self.create_temp_file(temp_file_mode)
         beg = int(datetime.datetime.now().timestamp())
         end = beg
+        q = []
         while True:
             self.sleep()
             end = int(datetime.datetime.now().timestamp())
             self.lock.acquire()
             q = self.Q
+            self.Q = []
             self.lock.release()
             if len(q) < 1:
                 continue
@@ -84,13 +88,13 @@ class Handle:
             # Open temp file ptr. It might get None again, so execute continue one more time.
             if temp_file_ptr is None or temp_file_ptr.closed:
                 temp_file_ptr = self.create_temp_file(temp_file_mode)
-                continue
+                # continue
 
             for item in q:
                 item_value = ''
                 try:
                     item_value = json.dumps(item)
-                    temp_file_ptr.write(item_value)
+                    temp_file_ptr.write(item_value + '\n')
                     self.current_row += 1
                 except Exception as e:
                     logging.error("[HANDLE] write item(%s) error :%s", item_value, e)
@@ -103,16 +107,17 @@ class Handle:
             # not empty, an official file is also saved.
             official_file = ''
             if self.current_row >= self.max_item_length or (end - beg >= 3 * 60 and self.current_row > 0):
+                # logging.error("row: %d, max length: %d, end - beg: %d", self.current_row, self.max_item_length,end - beg)
                 try:
                     now = int(datetime.datetime.now().timestamp() * 1000)
                     official_file = path.join(self.official_dir, str(now) + '.tok')
+                    temp_file_ptr.close()
                     msg = self.to_file(official_file)
                     if msg is not None:
                         logging.error('[HANDLE] to official file(%s) error: %s', official_file, msg)
                         temp_file_mode = 'a+'
                         continue
 
-                    temp_file_ptr.close()
                     temp_file_mode = 'w+'
                     self.current_row = 0
 
@@ -121,9 +126,15 @@ class Handle:
                     logging.error('[HANDLE] copy temp file(%s) to official file(%s) error: %s',
                                   self.temp_file, official_file, e)
 
+    def stop(self):
+        pass
+
     def create_temp_file(self, mode):
         try:
             f = open(self.temp_file, mode=mode)
+            if mode == 'w+':
+                f.seek(0)
+                f.truncate()
             return f
         except Exception as e:
             logging.error('[HANDLE] create temp file(%s) error: %s', self.temp_file, e)
@@ -134,6 +145,17 @@ class Handle:
         time.sleep(1)
 
 
+defaultHandler = Handle(default_max_length, '/tmp/temp.txt', '/data/elec-monitor')
+
+
+def add_task(content):
+    defaultHandler.add_task(content)
+
+
+def listen():
+    defaultHandler.listen()
+
+
 class Uploader:
     def __init__(self):
         pass

+ 42 - 0
upload_test.py

@@ -0,0 +1,42 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+
+
+import unittest
+import upload
+import threading
+import time
+
+
+class UploadTest(unittest.TestCase):
+    def generate_content(self):
+        return {
+            'dev_ip': 'local.pc',
+            'port': ':9011',
+            'type': 'camera',
+            'pwd': time.time()
+        }
+
+    def test_generate_file(self):
+        print("-=-=-=-=")
+        upload.defaultHandler = upload.Handle(1, 'temp.txt', './')
+
+        t = threading.Thread(target=upload.listen)
+        t.setDaemon(True)
+        t.start()
+
+        # add task
+        upload.add_task(self.generate_content())
+        time.sleep(4)
+
+        upload.add_task(self.generate_content())
+        upload.add_task(self.generate_content())
+        upload.add_task(self.generate_content())
+        upload.add_task(self.generate_content())
+        upload.add_task(self.generate_content())
+        upload.add_task(self.generate_content())
+        time.sleep(4)
+
+
+if __name__ == '__main__':
+    unittest.main()