package service import ( "context" "errors" "fmt" "github.com/aliyun/alibaba-cloud-sdk-go/services/dysmsapi" json "github.com/json-iterator/go" "github.com/olivere/elastic/v7" "github.com/opensearch-project/opensearch-go/opensearchapi" "github.com/spf13/cast" "github.com/thoas/go-funk" "github.com/wanghuiyt/ding" "gitlab.wodcloud.com/smart-operation/so-operation-api/src/bean/entity" "gitlab.wodcloud.com/smart-operation/so-operation-api/src/bean/vo/request" "gitlab.wodcloud.com/smart-operation/so-operation-api/src/bean/vo/response" "gitlab.wodcloud.com/smart-operation/so-operation-api/src/common/client" "gitlab.wodcloud.com/smart-operation/so-operation-api/src/common/conf" "gitlab.wodcloud.com/smart-operation/so-operation-api/src/pkg/beagle/constant" "gitlab.wodcloud.com/smart-operation/so-operation-api/src/pkg/beagle/jsontime" "gitlab.wodcloud.com/smart-operation/so-operation-api/src/pkg/beagle/resp" "go.uber.org/zap" "io" "log" "net/http" "strings" "time" ) type AlertSvc struct { User entity.SystemUserInfo } var ( OpenSearchIndex = "so_alert" Mapping = strings.NewReader(`{ "settings": { "number_of_shards": 1, "number_of_replicas": 0, "index.max_result_window": "1000000" }, "mappings": { "properties": { "id": { "type": "integer" }, "alert_point": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "alert_rules_id": { "type": "keyword" }, "alert_rules_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "risk_level": { "type": "integer" }, "alert_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis", "ignore_malformed": true }, "class_id": { "type": "integer" }, "class_parent_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "class_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "metric_config_id": { "type": "keyword" }, "metric_config_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "alert_rule_type": { "type": "keyword" }, "alert_rule_type_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "current_value": { "type": "integer" }, "notification_count": { "type": "integer" }, "push_count": { "type": "integer" }, "last_push_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis", "ignore_malformed": true }, "status": { "type": "integer" }, "push_records": { "type": "nested", "properties": { "id": { "type": "integer" }, "alert_id": { "type": "integer" }, "alert_rules_id": { "type": "keyword" }, "risk_level": { "type": "integer" }, "notify_method": { "type": "keyword" }, "system_account": { "type": "keyword" }, "push_type": { "type": "integer" }, "status": { "type": "integer" }, "user_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "phone": { "type": "keyword" }, "created_by": { "type": "keyword" }, "updated_by": { "type": "keyword" } } }, "disposed_list": { "type": "nested", "properties": { "disposal_content": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "is_disposed": { "type": "integer" }, "disposal_user": { "type": "keyword" }, "disposal_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis", "ignore_malformed": true } } }, "is_disposed": { "type": "integer" }, "close_remark": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "close_user": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "close_time": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "defer_push": { "type": "integer" }, "alert_condition": { "properties": { "thresholds_min": { "type": "integer" }, "thresholds_max": { "type": "integer" }, "risk_level": { "type": "integer" } } }, "disposal_content": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "created_by": { "type": "keyword" }, "created_at": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis", "ignore_malformed": true }, "updated_by": { "type": "keyword" }, "updated_at": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis", "ignore_malformed": true } } } }`) ) func (a *AlertSvc) CreateIndex() error { cli, err := client.GetOpenSearch() if err != nil { return err } res := opensearchapi.IndicesCreateRequest{ Index: OpenSearchIndex, Body: Mapping, } do, err := res.Do(context.Background(), cli) if err != nil { return err } defer do.Body.Close() if do.StatusCode != http.StatusOK { return errors.New(do.String()) } return nil } func (a *AlertSvc) DeleteIndex() error { cli, err := client.GetOpenSearch() if err != nil { return err } res := opensearchapi.IndicesDeleteRequest{ Index: []string{OpenSearchIndex}, } do, err := res.Do(context.Background(), cli) if err != nil { return err } defer do.Body.Close() if do.StatusCode != http.StatusOK { return errors.New(do.String()) } return nil } func (a *AlertSvc) Indices() (indices []string, err error) { var ( catIndicesList []response.CatIndices ) cli, err := client.GetOpenSearch() if err != nil { return } res := opensearchapi.CatIndicesRequest{Format: "json"} do, err := res.Do(context.Background(), cli) if err != nil { return } defer do.Body.Close() if do.StatusCode < http.StatusOK && do.StatusCode > http.StatusIMUsed { err = errors.New(do.String()) return } body, err := io.ReadAll(do.Body) if err != nil { return } err = json.Unmarshal(body, &catIndicesList) if err != nil { return } funkGet := funk.Get(catIndicesList, "Index") //取一层 indices, ok := funkGet.([]string) if !ok { err = errors.New("funk.Get failed") return } return } func (a *AlertSvc) DocSearch(req request.ListAlert) (resp response.AlertList, err error) { var ( sources response.OpenSearchSource ) cli, err := client.GetOpenSearch() if err != nil { return } boolQuery := elastic.NewBoolQuery() if req.Id != 0 { boolQuery.Must(elastic.NewMatchQuery("id", req.Id)) } if req.AlertRulesId != "" { boolQuery.Must(elastic.NewMatchQuery("alert_rules_id", req.AlertRulesId)) } if req.RiskLevel != 0 { boolQuery.Must(elastic.NewMatchQuery("risk_level", req.RiskLevel)) } if req.Status != 0 { boolQuery.Must(elastic.NewMatchQuery("status", req.Status)) } // 请输入预警点/分类/指标 if req.Keyword != "" { subBoolQuery := elastic.NewBoolQuery() subBoolQuery.Should(elastic.NewMultiMatchQuery(req.Keyword, "alert_point", "class_parent_name", "class_name", "metric_config_name")) //subBoolQuery.Should(elastic.NewMatchQuery("class_name", req.Keyword)) boolQuery.Must(subBoolQuery) } if req.StartTime != "" { boolQuery.Filter(elastic.NewRangeQuery("created_at").Gte(req.StartTime).Lte(req.EndTime)) } if req.SystemAccount != "" { // 匹配我的预警工单 boolQuery.Must(elastic.NewNestedQuery("push_records", elastic.NewTermQuery("push_records.system_account", req.SystemAccount))) } querySource, _ := boolQuery.Source() b, _ := json.Marshal(querySource) log.Printf("es statements: %s\n", string(b)) if len(req.Ids) > 0 { var idsStr []string for _, id := range req.Ids { idsStr = append(idsStr, cast.ToString(id)) } ids := entity.OpenSearchIds{ Ids: struct { Values []string `json:"values"` }{idsStr}, } b, err = json.Marshal(&ids) if err != nil { return } } content := strings.NewReader(fmt.Sprintf(`{ "query": %s, "from": %d, "size": %d}`, string(b), req.GetPageSize()*(req.GetPage()-1), req.GetPageSize())) res := opensearchapi.SearchRequest{ Index: []string{OpenSearchIndex}, Body: content, Sort: []string{"id"}, } do, err := res.Do(context.Background(), cli) if err != nil { return } defer do.Body.Close() if do.StatusCode < http.StatusOK && do.StatusCode > http.StatusIMUsed { err = errors.New(do.String()) return } body, err := io.ReadAll(do.Body) if err != nil { return } err = json.Unmarshal(body, &sources) if err != nil { return } for _, hit := range sources.Hits.Hits { resp.List = append(resp.List, hit.Source) } resp.TotalCount = int64(len(resp.List)) return } func (a *AlertSvc) IndexDocExist(req request.ExistAlert) (exist bool, err error) { var ( sources response.OpenSearchSource ) cli, err := client.GetOpenSearch() if err != nil { return } boolQuery := elastic.NewBoolQuery() if req.Id != 0 { boolQuery.Must(elastic.NewTermQuery("id", req.Id)) } if req.AlertRulesId != "" { boolQuery.Must(elastic.NewTermQuery("alert_rules_id", req.AlertRulesId)) } querySource, _ := boolQuery.Source() b, _ := json.Marshal(querySource) content := strings.NewReader(fmt.Sprintf(`{ "query": %s, "from": %d, "size": %d}`, string(b), 0, 1)) res := opensearchapi.SearchRequest{ Index: []string{OpenSearchIndex}, Body: content, Sort: []string{"id"}, } do, err := res.Do(context.Background(), cli) if err != nil { return } defer do.Body.Close() if do.StatusCode < http.StatusOK && do.StatusCode > http.StatusIMUsed { err = errors.New(do.String()) return } body, err := io.ReadAll(do.Body) if err != nil { return } err = json.Unmarshal(body, &sources) if err != nil { return } if sources.Hits.Total.Value == 1 { exist = true } return } func (a *AlertSvc) CatCount(indexName ...string) (count int) { var ( list []response.CatIndexCount index string ) cli, err := client.GetOpenSearch() if err != nil { return } if len(indexName) > 0 && indexName[0] != "" { index = indexName[0] + "*" } else { index = OpenSearchIndex + "*" } res := opensearchapi.CatCountRequest{ Index: []string{index}, Format: "json", } do, err := res.Do(context.Background(), cli) if err != nil { return } defer do.Body.Close() if do.StatusCode < http.StatusOK && do.StatusCode > http.StatusIMUsed { err = errors.New(do.String()) return } body, err := io.ReadAll(do.Body) if err != nil { return } err = json.Unmarshal(body, &list) if err != nil { return } if len(list) > 0 { count = cast.ToInt(list[0].Count) } return } func (a *AlertSvc) DocCreate(req request.CreateAlert) (err error) { var ( sources response.OpenSearchSource ) cli, err := client.GetOpenSearch() if err != nil { return } b, _ := json.Marshal(&req.Alert) docStr := string(b) if docStr == "" { return errors.New("doc marshal failed") } content := strings.NewReader(fmt.Sprintf(`%s`, docStr)) res := opensearchapi.CreateRequest{ Index: OpenSearchIndex, DocumentID: cast.ToString(req.Id), Body: content, } do, err := res.Do(context.Background(), cli) if err != nil { return } defer do.Body.Close() if do.StatusCode < http.StatusOK && do.StatusCode > http.StatusIMUsed { err = errors.New(do.String()) return } body, err := io.ReadAll(do.Body) if err != nil { return } err = json.Unmarshal(body, &sources) if err != nil { return } conf.Logger.Info("--->alert create--->", zap.Any("DocCreate", sources)) return } func (a *AlertSvc) DocUpdate(req request.UpdateAlert) (err error) { var ( sources response.OpenSearchSource now = jsontime.Now() ) cli, err := client.GetOpenSearch() if err != nil { return } doc := make(map[string]interface{}, 0) if req.CloseRemark != "" { doc["close_remark"] = req.CloseRemark } if req.DeferPush != 0 { doc["defer_push"] = req.DeferPush } if req.RiskLevel != 0 { doc["risk_level"] = req.RiskLevel } if req.Status != 0 { doc["status"] = req.Status } if req.CurrentValue != 0 { doc["current_value"] = req.CurrentValue } /*if req.CurrentValue != 0 { doc["current_value"] = req.CurrentValue }*/ doc["updated_by"] = a.User.SystemAccount doc["updated_at"] = now if len(req.PushRecords) > 0 { doc["push_records"] = req.PushRecords } if len(doc) == 0 { return errors.New("no updated fields") } b, _ := json.Marshal(doc) docStr := string(b) if docStr == "" { return errors.New("doc marshal failed") } content := strings.NewReader(fmt.Sprintf(`{ "doc": %s }`, docStr)) res := opensearchapi.UpdateRequest{ Index: OpenSearchIndex, DocumentID: cast.ToString(req.Id), Body: content, Source: []string{"true"}, } do, err := res.Do(context.Background(), cli) if err != nil { return } defer do.Body.Close() if do.StatusCode < http.StatusOK && do.StatusCode > http.StatusIMUsed { err = errors.New(do.String()) return } body, err := io.ReadAll(do.Body) if err != nil { return } err = json.Unmarshal(body, &sources) if err != nil { return } conf.Logger.Info("--->alert update--->", zap.Any("DocUpdate", sources)) return } func (a *AlertSvc) Create() error { cli, err := client.GetOpenSearch() if err != nil { return err } res := opensearchapi.IndicesCreateRequest{ Index: OpenSearchIndex, Body: Mapping, } do, err := res.Do(context.Background(), cli) if err != nil { return err } defer do.Body.Close() if do.StatusCode != http.StatusOK { return errors.New(do.String()) } return nil } func (a *AlertSvc) Update(req request.UpdateAlert) error { err := a.DocUpdate(req) return err } func (a *AlertSvc) BatchPushAlert(req request.BatchPushAlert) (err error) { now := jsontime.Now() alertList, err := a.DocSearch(request.ListAlert{Ids: req.Ids}) if err != nil { return } for _, alert := range alertList.List { pushRecords := alert.PushRecords // 原始推送记录 var phones []string for _, v := range req.NotifyRecipients { // +临时推送记录 phones = append(phones, v.Phone) pushRecords = append(pushRecords, entity.PushRecord{ AlertId: alert.Id, AlertRulesId: alert.AlertRulesId, RiskLevel: alert.RiskLevel, NotifyMethod: req.NotifyMethod, SystemAccount: v.SystemAccount, PushTime: now, PushType: 2, Status: 1, UserName: v.UserName, Phone: v.Phone, CreatedBy: a.User.SystemAccount, CreatedAt: now, UpdatedBy: a.User.SystemAccount, UpdatedAt: now, }) } for _, method := range req.NotifyMethod { // 按照通知方式循环推送 switch method { // dingtalk sms case "sms": // 发送短信 smsErr := a.Sms( phones, alert.ClassParentName, alert.ClassName, alert.AlertPoint, alert.MetricConfigName, cast.ToString(alert.CurrentValue), constant.RiskLeveText(alert.RiskLevel)) if smsErr != nil { // 短信推送失败 for i := 0; i < len(pushRecords); i++ { pushRecords[i].Status = 2 } } case "dingtalk": smsErr := a.DingTalkMsg( phones, alert.ClassParentName, alert.ClassName, alert.AlertPoint, alert.MetricConfigName, cast.ToString(alert.CurrentValue), constant.RiskLeveText(alert.RiskLevel)) if smsErr != nil { // 短信推送失败 for i := 0; i < len(pushRecords); i++ { pushRecords[i].Status = 2 } } } } for i := 0; i < len(pushRecords); i++ { // id重新序列化 pushRecords[i].Id = i + 1 } err = a.DocUpdate(request.UpdateAlert{ Id: alert.Id, PushCount: len(pushRecords), PushRecords: pushRecords, }) if err != nil { return } } time.Sleep(time.Second) return nil } func (a *AlertSvc) BatchCloseAlert(req request.BatchCloseAlert) (err error) { var ids []int if len(req.Ids) > 0 { ids = req.Ids } else { ids = append(ids, req.Id) } for _, id := range ids { err = a.DocUpdate(request.UpdateAlert{ Id: id, CloseRemark: req.CloseRemark, DeferPush: req.DeferPush, Status: 3, }) if err != nil { return } } conf.Logger.Info("batch close", zap.Any("payload", req)) time.Sleep(time.Second) return } func (a *AlertSvc) GetDataById(req request.DetailAlert) (resp response.AlertItem, err error) { list, err := a.DocSearch(request.ListAlert{Id: req.Id}) if len(list.List) > 0 { resp = list.List[0] } return } func (a *AlertSvc) GetDataByAlertRulesIdAndRiskLevel(alertRulesId string, riskLevel int, status int) (resp response.AlertItem, err error) { list, err := a.DocSearch(request.ListAlert{AlertRulesId: alertRulesId, RiskLevel: riskLevel, Status: status}) if len(list.List) > 0 { resp = list.List[0] } /*if resp.Id == 0 { err = errors.New("no data found") }*/ return } func (a *AlertSvc) List(req request.ListAlert) (resp response.AlertList, err error) { resp, err = a.DocSearch(req) resp.TotalCount = int64(len(resp.List)) return } func (a *AlertSvc) DisposeAlert(req request.DisposeAlert) (err error) { // TODO 我的预警工单处置 var ( sources response.OpenSearchSource now = jsontime.Now() detailAlert response.AlertItem ) detailAlert, err = a.GetDataById(request.DetailAlert{Id: req.Id}) if err != nil { return err } if detailAlert.Status == constant.AlertRecovered { return errors.New("alert has been restored") } if detailAlert.Status == constant.AlertClosed { return errors.New("alert has been closed") } cli, err := client.GetOpenSearch() if err != nil { return } doc := make(map[string]interface{}, 0) doc["status"] = req.Status if req.DisposalContent != "" { disposedList := detailAlert.DisposedList disposedList = append(disposedList, entity.Disposed{ IsDisposed: req.Status, DisposalContent: req.DisposalContent, DisposalUser: a.User.SystemAccount, DisposalTime: now, }) doc["disposed_list"] = disposedList } b, _ := json.Marshal(doc) docStr := string(b) if docStr == "" { return errors.New("doc marshal failed") } content := strings.NewReader(fmt.Sprintf(`{ "doc": %s }`, docStr)) res := opensearchapi.UpdateRequest{ Index: OpenSearchIndex, DocumentID: cast.ToString(req.Id), Body: content, Source: []string{"true"}, } do, err := res.Do(context.Background(), cli) if err != nil { return } defer do.Body.Close() if do.StatusCode < http.StatusOK && do.StatusCode > http.StatusIMUsed { err = errors.New(do.String()) return } body, err := io.ReadAll(do.Body) if err != nil { return } err = json.Unmarshal(body, &sources) if err != nil { return } time.Sleep(time.Second) return nil } func (a *AlertSvc) Sms(phone []string, classParentName, className, alertPoint, metricConfigName, currentValue, riskLevel string) (err error) { var smsClient *dysmsapi.Client smsClient, err = dysmsapi.NewClientWithAccessKey("cn-hangzhou", conf.Options.SmsAccessKeyId, conf.Options.SmsAccessSecret) if err != nil { conf.Logger.Error("dysmsapi client error", zap.Error(err)) } /* 您有一条${tickettype}需要处理:预警分类:${classparentname},预警对象:${classname},预警点:${alertpoint},预警指标:${metricconfigname},预警值:${currentvalue},风险程度:${risklevel},请及时前往智能运维平台处理! */ params := map[string]interface{}{ "tickettype": "预警工单", "classparentname": classParentName, "classname": className, "alertpoint": alertPoint, "metricconfigname": metricConfigName, "currentvalue": currentValue, "risklevel": riskLevel, } templateParam, err := json.Marshal(params) if err != nil { conf.Logger.Error("序列化模板失败!", zap.Error(err)) err = resp.FAIL.ErrorDetail(err) return } smsRequest := dysmsapi.CreateSendSmsRequest() smsRequest.Scheme = "https" smsRequest.PhoneNumbers = strings.Join(phone, ",") smsRequest.TemplateCode = conf.Options.SmsTemplateAlert smsRequest.SignName = conf.Options.SmsSignName smsRequest.TemplateParam = string(templateParam) req, err := smsClient.SendSms(smsRequest) if err != nil { err = resp.FAIL.ErrorDetail(err) return } conf.Logger.Info("send SMS", zap.Any("req", req)) return } // DingTalkMsg 推送钉钉消息 func (a *AlertSvc) DingTalkMsg(phone []string, classParentName, className, alertPoint, metricConfigName, currentValue, riskLevel string) (err error) { d := ding.Webhook{ AccessToken: conf.Options.OrderDingTalkAccessToken, Secret: conf.Options.OrderDingTalkSecret, } msg := "您有一条${tickettype}需要处理:预警分类:${classparentname},预警对象:${classname},预警点:${alertpoint},预警指标:${metricconfigname},预警值:${currentvalue},风险程度:${risklevel},请及时前往智能运维平台处理!" params := map[string]interface{}{ "tickettype": "预警工单", "classparentname": classParentName, "classname": className, "alertpoint": alertPoint, "metricconfigname": metricConfigName, "currentvalue": currentValue, "risklevel": riskLevel, } for key, value := range params { placeholder := "${" + key + "}" msg = strings.Replace(msg, placeholder, fmt.Sprintf("%v", value), -1) } err = d.SendMessageText(msg, phone...) if err != nil { err = resp.FAIL.ErrorDetail(err) return } return }