Browse Source

rewrite plugin ipc

tangs 6 years ago
parent
commit
99e413b4e3
5 changed files with 173 additions and 24 deletions
  1. 20 0
      plugins/README.md
  2. 3 0
      plugins/base.py
  3. 128 23
      plugins/ipc.py
  4. 22 1
      plugins/ipc_test.py
  5. BIN
      test/image/ipc_analyze_img.jpg

+ 20 - 0
plugins/README.md

@@ -0,0 +1,20 @@
+## Parameter description of each plugin
+
+### IPC
+host: device ip address
+
+port: device port
+
+dev_id: unique device id
+
+type: device type, must be "IPC"
+
+schema: communication protocol, must be "onvif"
+
+username: login name
+
+pwd: login password
+
+expr: ---
+
+kpi: need to analyze the uploaded kpi

+ 3 - 0
plugins/base.py

@@ -7,3 +7,6 @@ class Base:
 
     def exec(self, *args, **kwargs):
         pass
+
+    def config_check(self, *args, **kwargs):
+        pass

+ 128 - 23
plugins/ipc.py

@@ -1,49 +1,154 @@
 #!/usr/bin/env python
 # -*- coding:utf-8 -*-
+import logging
+import math
+
+import cv2
+from PIL import Image, ImageStat
 
 from plugins import base
 from onvif import ONVIFCamera
-from urllib.request import urlretrieve
-from os import path
 import datetime
+import zeep
+import uuid
+import upload
+
+
+def zeep_pythonvalue(self, xmlvalue):
+    return xmlvalue
+
+
+zeep.xsd.simple.AnySimpleType.pythonvalue = zeep_pythonvalue
 
 
 class IPC(base.Base):
     def __init__(self, *args, **kwargs):
-        base.Base.__init__(*args, **kwargs)
+        super(IPC, self).__init__(*args, **kwargs)
+
+        if kwargs.__contains__('name'):
+            self.name = kwargs['name']
+        else:
+            self.name = 'ipc' + uuid.uuid1().hex[0:10]
+
         self.host = kwargs['host']
         self.port = kwargs['port']
         self.user = kwargs['user']
         self.password = kwargs['password']
         self.image_dir = kwargs['image_dir']
-        self.wsdl_dir = '../resource/wsdl'
+        self.wsdl_dir = 'resource/wsdl'
         if kwargs.__contains__('wsdl_dir') and kwargs['wsdl_dir'] != "":
             self.wsdl_dir = kwargs['wsdl_dir']
+        self.kpi = kwargs['kpi']
+
+    def name(self):
+        return self.name
+
+    def config_check(self, *args, **kwargs):
+        pass
 
     def exec(self):
         image_name = self.snapshot()
-
         analyze_result = self.analyze()
-        return {'data': analyze_result, 'image': image_name}
+        data = {'data': analyze_result, 'image': image_name}
+        # upload.add_task(data)
+        return
 
     def snapshot(self):
+        camera = ONVIFCamera(self.host, self.port, self.user, self.password, self.wsdl_dir)
+        media = camera.create_media_service()
+        media_profile = media.GetProfiles()[0]
+        profiles = media.GetProfiles()
+        if len(profiles) < 1:
+            return
+
+        token = profiles[0]['token']
+        snapshot_detail = media.GetSnapshotUri(token)
+        snapshot_url = snapshot_detail['url']
+        image_name = str(int(datetime.datetime.now().timestamp() * 1000)) + ".png"
+
+        # download image via image url
+        # urlretrieve(snapshot_url, path.join(self.image_dir, image_name))
+        return image_name
+
+    def analyze(self, path):
+        if len(self.kpi) < 1:
+            return
+        kpi_result = {}
+        if self.kpi.__contains__('img_cast'):
+            kpi_result['img_cast'] = self.get_img_cast(path)
+        if self.kpi.__contains__('img_ybcast'):
+            kpi_result['img_ybcast'] = self.get_img_ybcast(path)
+        if self.kpi.__contains__('img_light'):
+            kpi_result['img_light'] = self.get_img_light(path)
+        if self.kpi.__contains__('img_rgcast'):
+            kpi_result['img_rgcast'] = self.get_img_rgcast(path)
+        if self.kpi.__contains__('img_lightcast'):
+            kpi_result['img_lightcast'] = self.get_img_lightcast(path)
+        if self.kpi.__contains__('img_clear'):
+            kpi_result['img_clear'] = self.get_img_clear(path)
+        if self.kpi.__contains__('img_quality'):
+            kpi_result['img_quality'] = self.get_img_quality(path)
+        return kpi_result
+
+    # https://blog.csdn.net/sparrowwf/article/details/86595162
+    @staticmethod
+    def get_img_cast(path):
+        image = cv2.imread(path)
+        img = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
+        l_channel, a_channel, b_channel = cv2.split(img)
+        h, w, _ = img.shape
+        da = a_channel.sum() / (h * w) - 128
+        db = b_channel.sum() / (h * w) - 128
+        histA = [0] * 256
+        histB = [0] * 256
+        for i in range(h):
+            for j in range(w):
+                ta = a_channel[i][j]
+                tb = b_channel[i][j]
+                histA[ta] += 1
+                histB[tb] += 1
+        msqA = 0
+        msqB = 0
+        for y in range(256):
+            msqA += float(abs(y - 128 - da)) * histA[y] / (w * h)
+            msqB += float(abs(y - 128 - db)) * histB[y] / (w * h)
+        import math
+
+        result = math.sqrt(da * da + db * db) / math.sqrt(msqA * msqA + msqB * msqB)
+        return result
+
+    @staticmethod
+    def get_img_ybcast(path):
+        return 1
+
+    # https://stackoverflow.com/questions/3490727/what-are-some-methods-to-analyze-image-brightness-using-python
+    @staticmethod
+    def get_img_light(path):
         try:
-            mycam = ONVIFCamera(self.host, self.port, self.user, self.password, self.wsdl_dir)
-            media = mycam.create_media_service()
-            profiles = media.GetProfiles()
-            if len(profiles) < 1:
-                return
-
-            token = profiles[0]['token']
-            snapshot_detail = media.GetSnapshotUri(token)
-            snapshot_url = snapshot_detail['url']
-            image_name = str(int(datetime.datetime.now().timestamp() * 1000)) + ".png"
-
-            # download image via image url
-            urlretrieve(snapshot_url, path.join(self.image_dir, image_name))
+            im = Image.open(path)
+            stat = ImageStat.Stat(im)
+            r, g, b = stat.mean
+            return math.sqrt(0.241 * (r ** 2) + 0.691 * (g ** 2) + 0.068 * (b ** 2))
         except Exception as e:
-            return e
-        return image_name
+            logging.error('[IPC] calculate image light with file: %s error: %s', path, e)
+        return 0
+
+    @staticmethod
+    def get_img_rgcast(path):
+        return 1
+
+    @staticmethod
+    def get_img_lightcast(path):
+        return 1
+
+    # https://blog.csdn.net/luolinll1212/article/details/84107066
+    @staticmethod
+    def get_img_clear(path):
+        image = cv2.imread(path)
+        img2gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
+        result = cv2.Laplacian(img2gray, cv2.CV_64F).var()
+        return result
 
-    def analyze(self):
-        return {}
+    @staticmethod
+    def get_img_quality(path):
+        return 1

+ 22 - 1
plugins/ipc_test.py

@@ -1,2 +1,23 @@
 #!/usr/bin/env python
-# -*- coding:utf-8 -*-
+# -*- coding:utf-8 -*-
+
+import unittest
+from plugins import ipc
+
+
+class IPCTest(unittest.TestCase):
+    def test_snapshot(self):
+        host = 'ipc.pc'
+        port = 2000
+        user = 'admin'
+        password = 'admin'
+        image_dir = '../test/image'
+        wsdl_dir = '../resource/wsdl'
+        camera = ipc.IPC(host=host, port=port, user=user, password=password,
+                         image_dir=image_dir, wsdl_dir=wsdl_dir)
+        snapshot = camera.snapshot()
+        print(snapshot)
+
+
+if __name__ == '__main__':
+    unittest.main()

BIN
test/image/ipc_analyze_img.jpg