upload.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. import json
  4. from threading import Lock
  5. import logging
  6. import datetime
  7. from os import path
  8. import time
  9. class Handle:
  10. def __init__(self, max_length, temp_file, official_dir, interval=5 * 60):
  11. """
  12. :param max_length: Every 1000 rows of data are deposited in an official file.
  13. :param temp_file: Temporary file path.
  14. :param official_dir: Official folder Path.
  15. :param interval: Put temporary files into official files to upload at regular intervals.
  16. """
  17. self.lock = Lock()
  18. # Data results collected by multiple plug-ins need to be queued to write to files.
  19. # The Q is the queue.
  20. self.Q = []
  21. self.max_item_length = max_length
  22. self.temp_file = temp_file
  23. self.official_dir = official_dir
  24. self.interval = interval
  25. self.current_row = 0
  26. def add_task(self, content):
  27. if not isinstance(content, dict):
  28. logging.warning('[HANDLE] add task content: %s but type is not dict', content)
  29. return
  30. self.lock.acquire()
  31. content = self.content_format(content)
  32. self.Q.append(content)
  33. self.lock.release()
  34. @staticmethod
  35. def content_format(content):
  36. if not content.__contains__('dev_ip'):
  37. content['dev_ip'] = ''
  38. if not content.__contains__('expr'):
  39. content['expr'] = 1
  40. if not content.__contains__('dev_id'):
  41. content['dev_id'] = ''
  42. if not content.__contains__('port'):
  43. content['port'] = ''
  44. if not content.__contains__('type'):
  45. content['type'] = ''
  46. if not content.__contains__('schema'):
  47. content['schema'] = ''
  48. if not content.__contains__('login_name'):
  49. content['login_name'] = ''
  50. if not content.__contains__('pwd'):
  51. content['pwd'] = ''
  52. if not content.__contains__('kpi'):
  53. content['kpi'] = {}
  54. return content
  55. def to_file(self, name):
  56. # file_name = path.join(self.official_dir, name)
  57. # f = open(file_name, encoding='utf-8')
  58. #
  59. # copy temp file to official file.
  60. return 'error'
  61. def listen(self):
  62. temp_file_mode = 'a+'
  63. temp_file_ptr = self.create_temp_file(temp_file_mode)
  64. while True:
  65. self.sleep()
  66. self.lock.acquire()
  67. q = self.Q
  68. self.lock.release()
  69. if len(q) < 1:
  70. continue
  71. # Open temp file ptr. It might get None again, so execute continue one more time.
  72. if temp_file_ptr is None or temp_file_ptr.closed:
  73. temp_file_ptr = self.create_temp_file(temp_file_mode)
  74. continue
  75. for item in q:
  76. item_value = ''
  77. try:
  78. item_value = json.dumps(item)
  79. temp_file_ptr.write(item_value)
  80. self.current_row += 1
  81. except Exception as e:
  82. logging.error("[HANDLE] write item(%s) error :%s", item_value, e)
  83. # If the currently recorded data line is greater than or equal to the largest data line,
  84. # the temporary file is written to the official file.
  85. if self.current_row >= self.max_item_length:
  86. try:
  87. now = int(datetime.datetime.now().timestamp() * 1000)
  88. file_name = str(now) + '.tok'
  89. msg = self.to_file(file_name)
  90. if msg is not None:
  91. logging.error('[HANDLE] to official file(%s) error: %s', file_name, msg)
  92. continue
  93. temp_file_ptr.close()
  94. temp_file_mode = 'w+'
  95. except Exception as e:
  96. print(e)
  97. def create_temp_file(self, mode):
  98. try:
  99. f = open(self.temp_file, mode=mode)
  100. return f
  101. except Exception as e:
  102. logging.error("[HANDLE] create temp file(%s) error: %s", self.temp_file, e)
  103. return None
  104. @staticmethod
  105. def sleep():
  106. time.sleep(1)
  107. def ftp_upload(self):
  108. pass