|
@@ -4,7 +4,7 @@ import json
|
|
|
from threading import Lock
|
|
|
import logging
|
|
|
import datetime
|
|
|
-from os import path
|
|
|
+import os
|
|
|
import time
|
|
|
import shutil
|
|
|
import paramiko
|
|
@@ -14,11 +14,18 @@ default_max_length = 1000
|
|
|
|
|
|
|
|
|
class Handle:
|
|
|
- def __init__(self, max_length, temp_file, official_dir, interval=5 * 60):
|
|
|
+ def __init__(self, max_length, temp_file, official_dir, rubbish,
|
|
|
+ host, port, user, password, remote_dir, interval=5 * 60):
|
|
|
"""
|
|
|
:param max_length: Every 1000 rows of data are deposited in an official file.
|
|
|
:param temp_file: Temporary file path.
|
|
|
:param official_dir: Official folder Path.
|
|
|
+ :param rubbish: A temporary file recycle bin that periodically empties the folder.
|
|
|
+ :param host: Remote server address for uploading files.
|
|
|
+ :param port: Remote server port for uploading files.
|
|
|
+ :param user: Remote server username for uploading files.
|
|
|
+ :param password: Remote server password for uploading files.
|
|
|
+ :param remote_dir: Remote server folder for uploading files.
|
|
|
:param interval: Put temporary files into official files to upload at regular intervals.
|
|
|
"""
|
|
|
self.lock = Lock()
|
|
@@ -31,6 +38,13 @@ class Handle:
|
|
|
self.official_dir = official_dir
|
|
|
self.interval = interval
|
|
|
self.current_row = 0
|
|
|
+ self.rubbish = rubbish
|
|
|
+
|
|
|
+ self.host = host
|
|
|
+ self.port = port
|
|
|
+ self.user = user
|
|
|
+ self.password = password
|
|
|
+ self.remote_dir = remote_dir
|
|
|
|
|
|
def add_task(self, content):
|
|
|
if not isinstance(content, dict):
|
|
@@ -69,6 +83,18 @@ class Handle:
|
|
|
def to_file(self, name):
|
|
|
try:
|
|
|
shutil.copy(self.temp_file, name)
|
|
|
+ uploader = Uploader(self.host, self.port, self.user, self.password, self.official_dir, self.remote_dir)
|
|
|
+ err = uploader.upload()
|
|
|
+ if err is not None:
|
|
|
+ logging.error('[HANDLE] upload file with '
|
|
|
+ 'host: %s, port: %d, user: %s, password: %s, local: %s, error: %s',
|
|
|
+ self.host, self.port, self.user, self.password, name, err)
|
|
|
+ return err
|
|
|
+
|
|
|
+ for f in os.listdir(self.official_dir):
|
|
|
+ if os.path.isdir(f):
|
|
|
+ continue
|
|
|
+ shutil.move(name, self.rubbish)
|
|
|
except Exception as e:
|
|
|
return e
|
|
|
return None
|
|
@@ -114,7 +140,7 @@ class Handle:
|
|
|
# logging.error("row: %d, max length: %d, end - beg: %d", self.current_row, self.max_item_length,end - beg)
|
|
|
try:
|
|
|
now = int(datetime.datetime.now().timestamp() * 1000)
|
|
|
- official_file = path.join(self.official_dir, str(now) + '.tok')
|
|
|
+ official_file = os.path.join(self.official_dir, str(now) + '.tok')
|
|
|
temp_file_ptr.close()
|
|
|
msg = self.to_file(official_file)
|
|
|
if msg is not None:
|
|
@@ -149,7 +175,7 @@ class Handle:
|
|
|
time.sleep(1)
|
|
|
|
|
|
|
|
|
-defaultHandler = Handle(default_max_length, '/tmp/temp.txt', '/data/elec-monitor')
|
|
|
+defaultHandler = Handle(default_max_length, '/tmp/temp.txt', '/data/elec-monitor', 'rubbish', '', '', '', '', '', )
|
|
|
|
|
|
|
|
|
def add_task(content):
|
|
@@ -170,6 +196,10 @@ class Uploader:
|
|
|
self.remote = remote
|
|
|
|
|
|
def upload(self):
|
|
|
+ """
|
|
|
+ Upload files or folders to remote servers via SCP
|
|
|
+ :return:
|
|
|
+ """
|
|
|
try:
|
|
|
ssh_client = paramiko.SSHClient()
|
|
|
ssh_client.load_system_host_keys()
|
|
@@ -178,6 +208,7 @@ class Uploader:
|
|
|
|
|
|
scp_client = scp.SCPClient(ssh_client.get_transport())
|
|
|
scp_client.put(self.local, self.remote, recursive=True)
|
|
|
+ ssh_client.close()
|
|
|
return None
|
|
|
except Exception as e:
|
|
|
logging.error('[UPLOAD] upload files with '
|