upload_test.py 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. import unittest
  4. import upload
  5. import threading
  6. import time
  7. class UploadFileCheckTest(unittest.TestCase):
  8. def generate_content(self):
  9. return {
  10. 'dev_ip': 'local.pc',
  11. 'port': ':9011',
  12. 'type': 'camera',
  13. 'schema': 'onvif',
  14. 'kpi': {
  15. 'img_light': 100,
  16. 'img_cast': 100,
  17. 'img_rgcast': 100,
  18. 'img_lightcast': 100,
  19. 'img_clear': 100,
  20. 'img_ybcast': 100,
  21. }
  22. }
  23. def test_generate_file(self):
  24. print("-=-=-=-=")
  25. upload.defaultHandler = upload.Handle(1, 'temp.txt', './')
  26. t = threading.Thread(target=upload.listen)
  27. t.setDaemon(True)
  28. t.start()
  29. # add task
  30. upload.add_task(self.generate_content())
  31. time.sleep(4)
  32. upload.add_task(self.generate_content())
  33. upload.add_task(self.generate_content())
  34. upload.add_task(self.generate_content())
  35. upload.add_task(self.generate_content())
  36. upload.add_task(self.generate_content())
  37. upload.add_task(self.generate_content())
  38. time.sleep(4)
  39. class UploaderTest(unittest.TestCase):
  40. def test_upload(self):
  41. host = 'local.pc'
  42. port = 22
  43. username = 'tangs'
  44. password = 'local12456'
  45. # upload file
  46. uploader1 = upload.Uploader(host, port, username, password, 'test/scp_file_1.txt', 'test/scp_file_1.txt')
  47. e = uploader1.upload()
  48. self.assertIsNone(e)
  49. # upload file
  50. uploader2 = upload.Uploader(host, port, username, password, 'test/scp_file_2.txt', 'test/scp_file_2.txt')
  51. e = uploader2.upload()
  52. self.assertIsNone(e)
  53. # upload folder
  54. uploader3 = upload.Uploader(host, port, username, password, 'test/scp_folder', 'test/scp_folder')
  55. e = uploader3.upload()
  56. self.assertIsNone(e)
  57. if __name__ == '__main__':
  58. unittest.main()