upload.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. import json
  4. from threading import Lock
  5. import logging
  6. import datetime
  7. from os import path
  8. import time
  9. import shutil
  10. default_max_length = 1000
  11. class Handle:
  12. def __init__(self, max_length, temp_file, official_dir, interval=5 * 60):
  13. """
  14. :param max_length: Every 1000 rows of data are deposited in an official file.
  15. :param temp_file: Temporary file path.
  16. :param official_dir: Official folder Path.
  17. :param interval: Put temporary files into official files to upload at regular intervals.
  18. """
  19. self.lock = Lock()
  20. # Data results collected by multiple plug-ins need to be queued to write to files.
  21. # The Q is the queue.
  22. self.Q = []
  23. self.max_item_length = max_length
  24. self.temp_file = temp_file
  25. self.official_dir = official_dir
  26. self.interval = interval
  27. self.current_row = 0
  28. def add_task(self, content):
  29. if not isinstance(content, dict):
  30. logging.warning('[HANDLE] add task content: %s but type is not dict', content)
  31. return
  32. self.lock.acquire()
  33. content = self.content_format(content)
  34. self.Q.append(content)
  35. self.lock.release()
  36. @staticmethod
  37. def content_format(content):
  38. if not content.__contains__('dev_ip'):
  39. content['dev_ip'] = ''
  40. if not content.__contains__('expr'):
  41. content['expr'] = 1
  42. if not content.__contains__('dev_id'):
  43. content['dev_id'] = ''
  44. if not content.__contains__('port'):
  45. content['port'] = ''
  46. if not content.__contains__('type'):
  47. content['type'] = ''
  48. if not content.__contains__('schema'):
  49. content['schema'] = ''
  50. if not content.__contains__('timestamp'):
  51. content['timestamp'] = int(datetime.datetime.now().timestamp() * 1000)
  52. if not content.__contains__('login_name'):
  53. content['login_name'] = ''
  54. # if not content.__contains__('pwd'):
  55. # content['pwd'] = ''
  56. if not content.__contains__('kpi'):
  57. content['kpi'] = {}
  58. return content
  59. def to_file(self, name):
  60. try:
  61. shutil.copy(self.temp_file, name)
  62. except Exception as e:
  63. return e
  64. return None
  65. def listen(self):
  66. temp_file_mode = 'a+'
  67. temp_file_ptr = self.create_temp_file(temp_file_mode)
  68. beg = int(datetime.datetime.now().timestamp())
  69. end = beg
  70. q = []
  71. while True:
  72. self.sleep()
  73. end = int(datetime.datetime.now().timestamp())
  74. self.lock.acquire()
  75. q = self.Q
  76. self.Q = []
  77. self.lock.release()
  78. if len(q) < 1:
  79. continue
  80. # Open temp file ptr. It might get None again, so execute continue one more time.
  81. if temp_file_ptr is None or temp_file_ptr.closed:
  82. temp_file_ptr = self.create_temp_file(temp_file_mode)
  83. # continue
  84. for item in q:
  85. item_value = ''
  86. try:
  87. item_value = json.dumps(item)
  88. temp_file_ptr.write(item_value + '\n')
  89. self.current_row += 1
  90. except Exception as e:
  91. logging.error("[HANDLE] write item(%s) error :%s", item_value, e)
  92. logging.debug('[HANDLE] listen and ')
  93. # If the currently recorded data line is greater than or equal to the largest data line,
  94. # the temporary file is written to the official file.
  95. # If the file has not been saved for more than three minutes, and the temporary file is
  96. # not empty, an official file is also saved.
  97. official_file = ''
  98. if self.current_row >= self.max_item_length or (end - beg >= 3 * 60 and self.current_row > 0):
  99. # logging.error("row: %d, max length: %d, end - beg: %d", self.current_row, self.max_item_length,end - beg)
  100. try:
  101. now = int(datetime.datetime.now().timestamp() * 1000)
  102. official_file = path.join(self.official_dir, str(now) + '.tok')
  103. temp_file_ptr.close()
  104. msg = self.to_file(official_file)
  105. if msg is not None:
  106. logging.error('[HANDLE] to official file(%s) error: %s', official_file, msg)
  107. temp_file_mode = 'a+'
  108. continue
  109. temp_file_mode = 'w+'
  110. self.current_row = 0
  111. beg = int(datetime.datetime.now().timestamp())
  112. except Exception as e:
  113. logging.error('[HANDLE] copy temp file(%s) to official file(%s) error: %s',
  114. self.temp_file, official_file, e)
  115. def stop(self):
  116. pass
  117. def create_temp_file(self, mode):
  118. try:
  119. f = open(self.temp_file, mode=mode)
  120. if mode == 'w+':
  121. f.seek(0)
  122. f.truncate()
  123. return f
  124. except Exception as e:
  125. logging.error('[HANDLE] create temp file(%s) error: %s', self.temp_file, e)
  126. return None
  127. @staticmethod
  128. def sleep():
  129. time.sleep(1)
  130. defaultHandler = Handle(default_max_length, '/tmp/temp.txt', '/data/elec-monitor')
  131. def add_task(content):
  132. defaultHandler.add_task(content)
  133. def listen():
  134. defaultHandler.listen()
  135. class Uploader:
  136. def __init__(self):
  137. pass