#!/usr/bin/env python # -*- coding:utf-8 -*- import logging import math import cv2 from PIL import Image, ImageStat from plugins import base from onvif import ONVIFCamera import datetime import zeep import uuid import upload def zeep_pythonvalue(self, xmlvalue): return xmlvalue zeep.xsd.simple.AnySimpleType.pythonvalue = zeep_pythonvalue class IPC(base.Base): def __init__(self, *args, **kwargs): super(IPC, self).__init__(*args, **kwargs) if kwargs.__contains__('name'): self.name = kwargs['name'] else: self.name = 'ipc' + uuid.uuid1().hex[0:10] self.host = kwargs['host'] self.port = kwargs['port'] self.user = kwargs['user'] self.password = kwargs['password'] self.image_dir = kwargs['image_dir'] self.wsdl_dir = 'resource/wsdl' if kwargs.__contains__('wsdl_dir') and kwargs['wsdl_dir'] != "": self.wsdl_dir = kwargs['wsdl_dir'] self.kpi = kwargs['kpi'] def name(self): return self.name def config_check(self, *args, **kwargs): pass def exec(self): image_name = self.snapshot() analyze_result = self.analyze() data = {'data': analyze_result, 'image': image_name} # upload.add_task(data) return def snapshot(self): camera = ONVIFCamera(self.host, self.port, self.user, self.password, self.wsdl_dir) media = camera.create_media_service() media_profile = media.GetProfiles()[0] profiles = media.GetProfiles() if len(profiles) < 1: return token = profiles[0]['token'] snapshot_detail = media.GetSnapshotUri(token) snapshot_url = snapshot_detail['url'] image_name = str(int(datetime.datetime.now().timestamp() * 1000)) + ".png" # download image via image url # urlretrieve(snapshot_url, path.join(self.image_dir, image_name)) return image_name def analyze(self, path): if len(self.kpi) < 1: return kpi_result = {} if self.kpi.__contains__('img_cast'): kpi_result['img_cast'] = self.get_img_cast(path) if self.kpi.__contains__('img_ybcast'): kpi_result['img_ybcast'] = self.get_img_ybcast(path) if self.kpi.__contains__('img_light'): kpi_result['img_light'] = self.get_img_light(path) if self.kpi.__contains__('img_rgcast'): kpi_result['img_rgcast'] = self.get_img_rgcast(path) if self.kpi.__contains__('img_lightcast'): kpi_result['img_lightcast'] = self.get_img_lightcast(path) if self.kpi.__contains__('img_clear'): kpi_result['img_clear'] = self.get_img_clear(path) if self.kpi.__contains__('img_quality'): kpi_result['img_quality'] = self.get_img_quality(path) return kpi_result # https://blog.csdn.net/sparrowwf/article/details/86595162 @staticmethod def get_img_cast(path): image = cv2.imread(path) img = cv2.cvtColor(image, cv2.COLOR_BGR2LAB) l_channel, a_channel, b_channel = cv2.split(img) h, w, _ = img.shape da = a_channel.sum() / (h * w) - 128 db = b_channel.sum() / (h * w) - 128 histA = [0] * 256 histB = [0] * 256 for i in range(h): for j in range(w): ta = a_channel[i][j] tb = b_channel[i][j] histA[ta] += 1 histB[tb] += 1 msqA = 0 msqB = 0 for y in range(256): msqA += float(abs(y - 128 - da)) * histA[y] / (w * h) msqB += float(abs(y - 128 - db)) * histB[y] / (w * h) import math result = math.sqrt(da * da + db * db) / math.sqrt(msqA * msqA + msqB * msqB) return result @staticmethod def get_img_ybcast(path): return 1 # https://stackoverflow.com/questions/3490727/what-are-some-methods-to-analyze-image-brightness-using-python @staticmethod def get_img_light(path): try: im = Image.open(path) stat = ImageStat.Stat(im) r, g, b = stat.mean return math.sqrt(0.241 * (r ** 2) + 0.691 * (g ** 2) + 0.068 * (b ** 2)) except Exception as e: logging.error('[IPC] calculate image light with file: %s error: %s', path, e) return 0 @staticmethod def get_img_rgcast(path): return 1 @staticmethod def get_img_lightcast(path): return 1 # https://blog.csdn.net/luolinll1212/article/details/84107066 @staticmethod def get_img_clear(path): image = cv2.imread(path) img2gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) result = cv2.Laplacian(img2gray, cv2.CV_64F).var() return result @staticmethod def get_img_quality(path): return 1