package service
import (
"context"
"errors"
"fmt"
moniMdl "go-common/app/job/main/aegis/model/monitor"
accApi "go-common/app/service/main/account/api"
upApi "go-common/app/service/main/up/api/v1"
"go-common/library/ecode"
"go-common/library/log"
"go-common/library/xstr"
"math"
"reflect"
"strconv"
"strings"
"time"
)
// monitorArchive 稿件业务监控
// 注意:oa有可能是nil,使用前必须判断!!!
func (s *Service) monitorArchive(act string, oa, na *moniMdl.BinlogArchive) (errs []error) {
var (
c = context.TODO()
logs []string
err error
errs2 []error
nAddit *moniMdl.ArchiveAddit
)
defer func() {
logStr := strings.Join(logs, "\n")
if x := recover(); x != nil {
log.Error("s.monitorArchive() unknown panic(%v)", x)
} else if len(errs) > 0 {
log.Error("s.monitorArchive(\n act: %s \n oa: %+v \n na: %+v) \n logStr:\n %v \n error:%v", act, oa, na, logStr, errs)
} else {
log.Info("s.monitorArchive(\n act: %s \n oa: %+v \n na: %+v) \n logStr:\n %v", act, oa, na, logStr)
}
}()
if (na.Attr>>moniMdl.ArchiveBitPGC)&int64(1) == 1 {
logs = append(logs, "忽略PGC稿件")
return
}
if na == nil {
err = errors.New("new msg nil")
errs = append(errs, err)
logs = append(logs, "databus数据异常,new msg nil")
return
}
na.IsSpecTID = moniMdl.SpecialTypeIDs[na.TypeID]
if nAddit, err = s.moniDao.ArchiveAttr(c, na.ID); err != nil {
logs = append(logs, fmt.Sprintf("warn:稿件Addit获取失败!aid:%d error:%v", na.ID, err))
if err != ecode.NothingFound {
errs = append(errs, err)
return
}
err = nil
} else {
na.Addit = nAddit
}
errs2 = s.monitorHandle(moniMdl.BusArc, *na, na.ID)
errs = append(errs, errs2...)
return
}
// monitorUpDelArc 监控UP主删除稿件。监控指定的UP主。1、在高能联盟的up主特殊用户组;2、粉丝数超过50w
func (s *Service) monitorUpDelArc(id int64, obj interface{}) (satisfy bool, logs []string, err error) {
var (
c = context.TODO()
pReply *accApi.ProfileStatReply
upReply *upApi.HighAllyUpsReply
a *moniMdl.BinlogArchive
)
logs = append(logs, "s.monitorUpDelArc() begin")
if obj == nil {
logs = append(logs, "\t obj是nil")
err = errors.New("obj is nil")
return
}
switch obj.(type) {
case *moniMdl.BinlogArchive:
a = obj.(*moniMdl.BinlogArchive)
case moniMdl.BinlogArchive:
ac := obj.(moniMdl.BinlogArchive)
a = &ac
default:
logs = append(logs, fmt.Sprintf("\t 未知类型:%+v", obj))
err = errors.New("unknown interface type")
return
}
logs = append(logs, fmt.Sprintf("\t archive:%+v", a))
if a == nil {
err = errors.New("archive is nil")
return
}
if a.State != moniMdl.ArchiveStateDel {
logs = append(logs, "\t 非删除,忽略")
return
}
if a.Copyright != moniMdl.ArchiveOriginal {
logs = append(logs, "\t 非自制,忽略")
return
}
if err = s.moniDao.AddToDelArc(c, a); err != nil {
logs = append(logs, fmt.Sprintf("\t 添加删除信息到redis失败。error:%v", err))
err = nil
}
if id == moniMdl.RuleHighUpDelArc {
if upReply, err = s.up.GetHighAllyUps(c, &upApi.HighAllyUpsReq{Mids: []int64{a.MID}}); err != nil {
logs = append(logs, fmt.Sprintf("\t 获取UP主高能信息失败。error:%v", err))
log.Error("\t s.monitorUpDelArc() s.up.GetHighAllyUps() error:%v", err)
}
logs = append(logs, fmt.Sprintf("\t 用户信息:%+v", upReply))
if upReply != nil {
if _, ok := upReply.Lists[a.MID]; ok {
logs = append(logs, "\t UP主属于高能联盟")
satisfy = true
return
}
}
} else if id == moniMdl.RuleFamUpDelArc {
if pReply, err = s.acc.ProfileWithStat3(c, &accApi.MidReq{Mid: a.MID}); err != nil {
logs = append(logs, fmt.Sprintf("\t 获取UP主信息失败。error:%v", err))
log.Error("\t s.monitorUpDelArc() s.acc.ProfileWithStat3() error:%v", err)
}
logs = append(logs, fmt.Sprintf("\t 用户信息:%+v", pReply))
if pReply != nil && pReply.Follower >= 500000 {
logs = append(logs, "\t UP主属于大UP主")
satisfy = true
return
}
}
return
}
// monitorVideo 视频监控
func (s *Service) monitorVideo(act string, ov, nv *moniMdl.BinlogVideo) (errs []error) {
errs = s.monitorHandle(moniMdl.BusVideo, nv, nv.ID)
return
}
// monitorHandle 处理监控数据
func (s *Service) monitorHandle(bid int64, nObj interface{}, oid int64) (errs []error) {
var (
c = context.TODO()
rules []*moniMdl.Rule
logs, logs2 []string
err error
errs2 []error
oKeys, nKeys []string
)
defer func() {
logStr := strings.Join(logs, "\n")
if x := recover(); x != nil {
log.Error("s.monitorHandle() unknown panic(%v)", x)
} else if len(errs) > 0 {
log.Error("s.monitorHandle(\n na: %+v) \n logStr:\n %v \n error:%v", nObj, logStr, errs)
} else {
log.Info("s.monitorHandle(\n na: %+v) \n logStr:\n %v", nObj, logStr)
}
}()
if nObj == nil {
err = errors.New("new msg nil")
errs = append(errs, err)
logs = append(logs, "databus数据异常,new msg nil")
return
}
if rules, err = s.moniDao.RulesByBid(c, bid); err != nil {
logs = append(logs, "获取监控配置失败")
return
}
if len(rules) == 0 {
logs = append(logs, "监控配置不存在")
return
}
for _, rule := range rules {
var allSatisfy = true
//如果是监控UP主大量删稿,则特殊处理
if rule.ID == moniMdl.RuleHighUpDelArc || rule.ID == moniMdl.RuleFamUpDelArc {
if allSatisfy, logs2, err = s.monitorUpDelArc(rule.ID, nObj); err != nil {
errs = append(errs, err)
}
logs = append(logs, logs2...)
} else {
for field, cdt := range rule.RuleConf.MoniCdt {
var (
val int64
satisfy bool
)
if val, err = s.reflectIntVal(nObj, field, 0); err != nil {
errs = append(errs, err)
logs = append(logs, fmt.Sprintf("没有找到字段%s", field))
}
if satisfy, err = s.monitorCompSatisfy(cdt.Comp, val); err != nil {
allSatisfy = false
break
}
if !satisfy {
allSatisfy = false
break
}
}
}
if allSatisfy { //如果满足所有条件,则移入监控
nKeys = append(nKeys, fmt.Sprintf(moniMdl.RedisPrefix, rule.ID))
} else { //如果有条件不满足,则移出监控
oKeys = append(oKeys, fmt.Sprintf(moniMdl.RedisPrefix, rule.ID))
}
}
logs = append(logs, fmt.Sprintf("%d移出keys:%v", oid, oKeys))
logs = append(logs, fmt.Sprintf("%d移入keys:%v", oid, nKeys))
logs2, errs2 = s.monitorSave(oKeys, nKeys, oid)
logs = append(logs, logs2...)
if len(errs2) != 0 {
errs = append(errs, errs2...)
}
return
}
// monitorSave 保存结果
func (s *Service) monitorSave(oKeys, nKeys []string, oid int64) (logs []string, errs []error) {
var (
c = context.TODO()
logs2 []string
err error
)
defer func() {
logStr := strings.Join(logs, "\n")
if x := recover(); x != nil {
log.Error("s.monitorSave() unknown panic(%v)", x)
} else if len(errs) != 0 {
log.Error("s.monitorSave(\n oKeys: %v \n nKeys: %v \n oid: %d) \n logStr:\n %v \n error:%v", oKeys, nKeys, oid, logStr, errs)
} else {
log.Info("s.monitorSave(\n oKeys: %v \n nKeys: %v \n oid: %d) \n logStr:\n %v", oKeys, nKeys, oid, logStr)
}
}()
//从旧key中移出
logs2, err = s.moniDao.RemFromSet(c, oKeys, oid)
logs = append(logs, logs2...)
if err != nil {
errs = append(errs, err)
}
//清除过期的旧key
logs2, err = s.moniDao.ClearExpireSet(c, oKeys)
logs = append(logs, logs2...)
if err != nil {
errs = append(errs, err)
}
//移入到新key
logs2, err = s.moniDao.AddToSet(c, nKeys, oid)
logs = append(logs, logs2...)
if err != nil {
errs = append(errs, err)
}
//清除过期的移入key
logs2, err = s.moniDao.ClearExpireSet(c, nKeys)
logs = append(logs, logs2...)
if err != nil {
errs = append(errs, err)
}
return
}
// monitorNotify 监控报警
func (s *Service) monitorNotify() {
defer func() {
log.Warn("monitorNotify exited.")
}()
var (
c = context.TODO()
err error
rules []*moniMdl.Rule
stats *moniMdl.Stats
min, max int64
)
for {
log.Info("s.monitorNotify() begin")
if rules, err = s.moniDao.ValidRules(c); err != nil {
log.Error("s.monitorNotify() rules:%+v error:%v", rules, err)
time.Sleep(1 * time.Minute)
continue
}
for _, rule := range rules {
if rule.ID == moniMdl.RuleHighUpDelArc || rule.ID == moniMdl.RuleFamUpDelArc {
// 删稿监控特殊处理
s.wg.Add(1)
go s.moniUpDelArcNotify(rule)
continue
}
if min, max, err = s.monitorNotifyTime(rule.RuleConf); err != nil {
log.Error("s.monitorNotify() s.monitorNotifyTime() rule:%+v error:%v", rule, err)
continue
}
if stats, err = s.moniDao.MoniRuleStats(c, rule.ID, min, max); err != nil {
log.Error("s.monitorNotify() s.moniDao.MoniRuleStats(%d,%d,%d) error:%v", rule.ID, min, max, err)
continue
}
notify := s.moniSatisfyNotify(rule.RuleConf, stats)
if notify {
title := fmt.Sprintf("%s监控(aegis)", rule.RuleConf.Name)
body := fmt.Sprintf("当前滞留时间为%s超过阀值,滞留量为%d,整体量为%d \n报警时间:%s", secondsFormat(stats.MaxTime), stats.MoniCount, stats.TotalCount, time.Now().Format("2006-01-02 15:04:05"))
url := ""
switch rule.BID {
case moniMdl.BusVideo:
url = fmt.Sprintf("http://manager.bilibili.co/#!/video/list?monitor_list=%d_%d_%d", rule.Type, rule.BID, rule.ID)
case moniMdl.BusArc:
url = fmt.Sprintf("http://manager.bilibili.co/#!/archive_utils/all?monitor_list=%d_%d_%d", rule.Type, rule.BID, rule.ID)
}
body += fmt.Sprintf("\n跳转链接:点击跳转 %s", url, url)
if err = s.monitorSendNotify(c, rule.RuleConf.Notify.Way, rule.RuleConf.Notify.Member, title, body); err != nil {
log.Error("s.monitorNotify() s.monitorSendNotify(%d,%v,%s,%s) error:%v", rule.RuleConf.Notify.Way, rule.RuleConf.Notify.Member, title, body, err)
}
}
}
time.Sleep(30 * time.Minute)
}
}
// moniSatisfyNotify 检查监控是否满足报警,目前只有数量+时长的条件
func (s *Service) moniSatisfyNotify(conf *moniMdl.RuleConf, stats *moniMdl.Stats) (notify bool) {
notify = true
if _, ok := conf.MoniCdt["time"]; ok {
threshold := conf.NotifyCdt["time"].Value
comp := conf.NotifyCdt["time"].Comp
switch comp {
case moniMdl.CompGT:
if int64(stats.MaxTime) < threshold {
notify = false
}
case moniMdl.CompLT:
if int64(stats.MaxTime) > threshold {
notify = false
}
}
}
if _, ok := conf.NotifyCdt["count"]; ok {
threshold := conf.NotifyCdt["count"].Value
comp := conf.NotifyCdt["count"].Comp
switch comp {
case moniMdl.CompGT:
if int64(stats.MoniCount) < threshold {
notify = false
}
case moniMdl.CompLT:
if int64(stats.MoniCount) > threshold {
notify = false
}
}
}
return
}
// moniUpDelArcNotify 特殊处理UP主删稿的逻辑
func (s *Service) moniUpDelArcNotify(rule *moniMdl.Rule) (err error) {
var (
c = context.TODO()
min, max int64
oidMap map[int64]int
oids, mids []int64
infos map[int64]*moniMdl.DelArcInfo
delMap map[int64][]*moniMdl.DelArcInfo
accStats map[int64]*accApi.ProfileStatReply
threshold int
)
defer func() {
s.wg.Done()
}()
if _, ok := rule.RuleConf.NotifyCdt["count"]; !ok {
err = errors.New("notify count config error")
log.Error("s.moniUpDelArcNotify(%+v) 没有count监控配置", rule)
return
}
threshold = int(rule.RuleConf.NotifyCdt["count"].Value)
delMap = make(map[int64][]*moniMdl.DelArcInfo)
min, max, err = s.monitorNotifyTime(rule.RuleConf)
if err != nil {
log.Error("s.monitorNotifyTime() rule:%+v error:%v", rule, err)
return
}
if oidMap, err = s.moniDao.MoniRuleOids(c, rule.ID, min, max); err != nil {
log.Error("s.moniDao.MoniRuleOids() rule:%+v error:%v", rule, err)
return
}
for oid := range oidMap {
oids = append(oids, oid)
}
if infos, err = s.moniDao.ArcDelInfos(c, oids); err != nil {
log.Error("s.moniUpDelArcNotify() s.moniDao.ArcDelInfos(%v) error(%v)", oids, err)
return
}
for _, info := range infos {
delMap[info.MID] = append(delMap[info.MID], info)
mids = append(mids, info.MID)
}
if accStats, err = s.multiAccounts(c, mids); err != nil {
log.Error("s.moniUpDelArcNotify() s.multiAccounts(%v) error(%v)", mids, err)
accStats = make(map[int64]*accApi.ProfileStatReply)
}
for mid, ins := range delMap {
if _, ok := accStats[mid]; !ok {
log.Error("s.monitorNotify() account ")
accStats[mid] = &accApi.ProfileStatReply{
Profile: &accApi.Profile{
Name: "nil",
Mid: mid,
},
}
}
if len(ins) >= threshold {
var (
title, content string
)
for _, v := range ins {
if title == "" {
title = fmt.Sprintf("【异常删稿报警】“%s” 24内已删除%d个自制稿件 ", accStats[mid].Profile.Name, len(ins))
}
if content == "" {
content = fmt.Sprintf("监控规则:%d——%s——%s
报警时间:%s
", rule.ID, rule.Name, rule.RuleConf.Name, time.Now().Format("2006-01-02 15:04:05"))
content += fmt.Sprintf("UP主昵称:%s;mid: %d;当前粉丝数:%d; 24内已删除:%d;
", accStats[mid].Profile.Name, accStats[mid].Profile.Mid, accStats[mid].Follower, len(ins))
content += "
标题 | av号 | 删除时间 |
---|---|---|
%s | %d | %s |