package service

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"

	"github.com/jinzhu/copier"
	json "github.com/json-iterator/go"
	"github.com/opensearch-project/opensearch-go"
	"github.com/opensearch-project/opensearch-go/opensearchapi"
	"github.com/spf13/cast"
	"github.com/tidwall/gjson"
	"xorm.io/xorm"

	"gitlab.wodcloud.com/smart-operation/so-operation-api/src/bean/entity"
	"gitlab.wodcloud.com/smart-operation/so-operation-api/src/bean/vo/request"
	"gitlab.wodcloud.com/smart-operation/so-operation-api/src/bean/vo/response"
	"gitlab.wodcloud.com/smart-operation/so-operation-api/src/common/client"
	"gitlab.wodcloud.com/smart-operation/so-operation-api/src/common/conf"
	"gitlab.wodcloud.com/smart-operation/so-operation-api/src/pkg/beagle/constant"
	"gitlab.wodcloud.com/smart-operation/so-operation-api/src/pkg/beagle/jsontime"
)

// alertDateLayout is the date format used for the default query window.
const alertDateLayout = "2006-01-02"

// AlertOverviewSvc produces the alert overview figures on behalf of User.
type AlertOverviewSvc struct {
	// User supplies the SystemAccount written to CreatedBy/UpdatedBy.
	User entity.SystemUserInfo
}

// Add builds an AlertOverview entity from req, stamped with the current user
// and time.
//
// NOTE(review): the entity is assembled but never written through session,
// and classId is always returned as zero. This looks unfinished; confirm the
// intended persistence before relying on it.
func (a *AlertOverviewSvc) Add(session *xorm.Session, req request.AddAlertOverview) (classId int, err error) {
	now := jsontime.Now()
	data := entity.AlertOverview{
		CreatedBy: a.User.SystemAccount,
		CreatedAt: now,
		UpdatedBy: a.User.SystemAccount,
		UpdatedAt: now,
	}
	if err = copier.Copy(&data, &req); err != nil {
		return 0, fmt.Errorf("copying add request into alert overview: %w", err)
	}
	return 0, nil
}

// Update builds an AlertOverview entity from req, stamped with the current
// user and time as the updater.
//
// NOTE(review): as with Add, nothing is written through session; confirm the
// intended persistence.
func (a *AlertOverviewSvc) Update(session *xorm.Session, req request.UpdateAlertOverview) error {
	data := entity.AlertOverview{
		UpdatedBy: a.User.SystemAccount,
		UpdatedAt: jsontime.Now(),
	}
	if err := copier.Copy(&data, &req); err != nil {
		return fmt.Errorf("copying update request into alert overview: %w", err)
	}
	return nil
}

// Overview assembles every section of the alert overview for req: the
// per-risk-level summary plus the risk level, status, class and frequency
// distributions.
//
// Errors from the individual sub-queries are discarded, so a failing section
// keeps whatever value its query returned and err is always nil.
// NOTE(review): confirm this best-effort behaviour is intended.
func (a *AlertOverviewSvc) Overview(req request.DetailAlertOverview) (resp response.AlertOverviewItem, err error) {
	alertOverviewList, _ := a.AlertOverview(req)

	// AlertOverview stores the metric_config_id bucket key in MetricName;
	// replace it with the configured metric name where one matches.
	metricConfigSvc := MetricConfigSvc{User: a.User}
	nameList, _ := metricConfigSvc.NameList()
	for i := range alertOverviewList {
		items := alertOverviewList[i].List
		for j := range items {
			for _, v := range nameList.List {
				if v.Id == items[j].MetricName {
					items[j].MetricName = v.MetricName
					break
				}
			}
		}
	}

	riskLevelDistributions, _ := a.RiskLevelDistribution(req)
	alertStatusDistributions, _ := a.AlertStatusDistribution(req)
	alertClassDistributions, _ := a.AlertClassDistribution(req)
	alertFrequencyDistribution, _ := a.AlertFrequencyDistribution(req)

	resp = response.AlertOverviewItem{
		AlertOverview:              alertOverviewList,
		RiskLevelDistribution:      riskLevelDistributions,
		AlertStatusDistribution:    alertStatusDistributions,
		AlertClassDistribution:     alertClassDistributions,
		AlertFrequencyDistribution: alertFrequencyDistribution,
	}
	return resp, nil
}

// AlertOverview returns one entry per risk_level bucket found in the query
// window. Each entry carries the level's total and unresolved counts and a
// per-metric_config_id breakdown.
//
// Empty StartTime/EndTime default to today and tomorrow respectively.
func (a *AlertOverviewSvc) AlertOverview(req request.DetailAlertOverview) (resp []entity.AlertOverview, err error) {
	var sources response.AggAlertOverview

	startTime, endTime := alertOverviewRange(req.StartTime, req.EndTime)
	// The bounds are JSON-encoded before interpolation so request-supplied
	// text cannot alter the structure of the query.
	content := fmt.Sprintf(`{
  "size": 0,
  "query": {"range": {"created_at": {"gte": %s, "lte": %s}}},
  "aggs": {
    "group": {
      "terms": {"field": "risk_level", "size": 10},
      "aggs": {
        "group": {
          "terms": {"field": "metric_config_id", "size": 10},
          "aggs": {
            "unresolved_count": {"filter": {"term": {"status": 2}}}
          }
        }
      }
    }
  }
}`, jsonQuote(startTime), jsonQuote(endTime))

	if err = searchAggregations(content, &sources); err != nil {
		return nil, err
	}

	now := jsontime.Now()
	for _, levelBucket := range sources.Group.Buckets {
		var (
			unresolved int
			metrics    []entity.AlertArray
		)
		for _, metricBucket := range levelBucket.Group.Buckets {
			metrics = append(metrics, entity.AlertArray{
				MetricName:      metricBucket.Key,
				UnresolvedCount: metricBucket.UnresolvedCount.DocCount,
				TotalCount:      metricBucket.DocCount,
			})
			unresolved += metricBucket.UnresolvedCount.DocCount
		}
		resp = append(resp, entity.AlertOverview{
			RiskLevel:       levelBucket.Key,
			UnresolvedCount: unresolved,
			TotalCount:      levelBucket.DocCount,
			List:            metrics,
			CreatedBy:       a.User.SystemAccount,
			CreatedAt:       now,
			UpdatedBy:       a.User.SystemAccount,
			UpdatedAt:       now,
		})
	}
	return resp, nil
}

// RiskLevelDistribution returns the alert count for each known risk level
// (low, moderate, high, critical) in the query window. Levels with no
// matching bucket are reported with a zero value.
//
// Empty StartTime/EndTime default to today and tomorrow respectively.
func (a *AlertOverviewSvc) RiskLevelDistribution(req request.DetailAlertOverview) (resp []entity.AlertDistribution, err error) {
	var sources response.AlertDistributionGroup

	startTime, endTime := alertOverviewRange(req.StartTime, req.EndTime)
	if err = searchAggregations(buildAggQueryContent(startTime, endTime, "risk_level"), &sources); err != nil {
		return nil, err
	}

	riskLevels := []int{constant.RiskLevelLow, constant.RiskLevelModerate, constant.RiskLevelHigh, constant.RiskLevelCritical}
	resp = make([]entity.AlertDistribution, 0, len(riskLevels))
	for _, level := range riskLevels {
		item := entity.AlertDistribution{Name: constant.RiskLeveText(level)}
		for _, bucket := range sources.Group.Buckets {
			if level == bucket.Key {
				item.Value = bucket.DocCount
				break
			}
		}
		resp = append(resp, item)
	}
	return resp, nil
}

// AlertStatusDistribution returns the alert count for each known status
// (recovered, not recovered, closed) in the query window. Statuses with no
// matching bucket are reported with a zero value.
//
// Empty StartTime/EndTime default to today and tomorrow respectively.
func (a *AlertOverviewSvc) AlertStatusDistribution(req request.DetailAlertOverview) (resp []entity.AlertDistribution, err error) {
	var sources response.AlertDistributionGroup

	startTime, endTime := alertOverviewRange(req.StartTime, req.EndTime)
	if err = searchAggregations(buildAggQueryContent(startTime, endTime, "status"), &sources); err != nil {
		return nil, err
	}

	alertStatusList := []int{constant.AlertRecovered, constant.AlertNotRecovered, constant.AlertClosed}
	resp = make([]entity.AlertDistribution, 0, len(alertStatusList))
	for _, status := range alertStatusList {
		item := entity.AlertDistribution{Name: constant.AlertStatusText(status)}
		for _, bucket := range sources.Group.Buckets {
			if status == bucket.Key {
				item.Value = bucket.DocCount
				break
			}
		}
		resp = append(resp, item)
	}
	return resp, nil
}

// AlertClassDistribution returns the alert count per class_id bucket in the
// query window, labelled with the class name from AlertClassSvc. Buckets
// whose class_id has no matching alert object keep an empty name.
//
// Empty StartTime/EndTime default to today and tomorrow respectively.
func (a *AlertOverviewSvc) AlertClassDistribution(req request.DetailAlertOverview) (resp []entity.AlertDistribution, err error) {
	var sources response.AlertDistributionGroup

	startTime, endTime := alertOverviewRange(req.StartTime, req.EndTime)
	if err = searchAggregations(buildAggQueryContent(startTime, endTime, "class_id"), &sources); err != nil {
		return nil, err
	}

	alertClassSvc := AlertClassSvc{User: a.User}
	alertObjectList, err := alertClassSvc.AlertObjectList()
	if err != nil {
		// Do not touch alertObjectList or hand back half-labelled data when
		// the class lookup failed.
		return nil, err
	}

	for _, bucket := range sources.Group.Buckets {
		item := entity.AlertDistribution{Value: bucket.DocCount}
		for _, object := range alertObjectList.List {
			if bucket.Key == object.ClassId {
				item.Name = object.ClassName
			}
		}
		resp = append(resp, item)
	}
	return resp, nil
}

// AlertFrequencyDistribution returns today's alert counts in eight 3-hour
// slots (0-3 through 21-24).
//
// The query window is fixed to the current day ("now/d" to "now+1d/d") and
// buckets are computed with time_zone "+00:00"; req.StartTime and req.EndTime
// are not used by this query.
func (a *AlertOverviewSvc) AlertFrequencyDistribution(req request.DetailAlertOverview) (resp entity.AlertFrequencyDistribution, err error) {
	var sources response.DateHistogramGroup

	const content = `{
  "size": 0,
  "query": {"range": {"created_at": {"gte": "now/d", "lt": "now+1d/d"}}},
  "aggs": {
    "group": {
      "date_histogram": {
        "field": "created_at",
        "interval": "3h",
        "format": "HH",
        "time_zone": "+00:00"
      }
    }
  }
}`

	if err = searchAggregations(content, &sources); err != nil {
		return resp, err
	}

	xAxis := []string{"0-3时", "3-6时", "6-9时", "9-12时", "12-15时", "15-18时", "18-21时", "21-24时"}
	resp = entity.AlertFrequencyDistribution{
		XAxis: xAxis,
		Data:  make([]int, len(xAxis)),
	}
	for i := range resp.XAxis {
		for _, bucket := range sources.Group.Buckets {
			// Bucket keys are zero-padded hours ("00", "03", ..., "21").
			// Strip the padding before conversion: cast.ToInt appears to
			// parse strings with base auto-detection, which would reject "09"
			// as invalid octal and yield 0 (TODO confirm against the pinned
			// cast version). "00" becomes "" and still converts to 0.
			hour := cast.ToInt(strings.TrimLeft(cast.ToString(bucket.KeyAsString), "0"))
			if hour == i*3 {
				resp.Data[i] = bucket.DocCount
				break
			}
		}
	}
	return resp, nil
}

// alertOverviewRange applies the default query window: an empty startTime
// becomes today's date and an empty endTime becomes tomorrow's date, both
// taken from the local clock and formatted as YYYY-MM-DD.
func alertOverviewRange(startTime, endTime string) (string, string) {
	if startTime == "" {
		startTime = time.Now().Format(alertDateLayout)
	}
	if endTime == "" {
		endTime = time.Now().Add(24 * time.Hour).Format(alertDateLayout)
	}
	return startTime, endTime
}

// jsonQuote returns s encoded as a JSON string literal, including the
// surrounding quotes, so it can be spliced into a hand-written JSON document.
func jsonQuote(s string) string {
	quoted, err := json.Marshal(s)
	if err != nil {
		// Encoding a plain string is not expected to fail; fall back to Go
		// quoting so the value is still escaped and not inserted raw.
		return fmt.Sprintf("%q", s)
	}
	return string(quoted)
}

// searchAggregations runs content against the configured OpenSearch index
// and decodes the "aggregations" member of the response into out, which must
// be a pointer to the target structure.
func searchAggregations(content string, out interface{}) error {
	cli, err := client.GetOpenSearch()
	if err != nil {
		return err
	}
	body, err := executeQuery(cli, conf.Options.OpenSearchIndex, content)
	if err != nil {
		return err
	}
	aggregations := gjson.GetBytes(body, "aggregations")
	return json.Unmarshal([]byte(aggregations.String()), out)
}

// buildAggQueryContent returns a search body that counts documents whose
// created_at lies within [startTime, endTime], grouped by a terms aggregation
// on field (at most 10 buckets). All three arguments are JSON-encoded before
// being placed in the body.
func buildAggQueryContent(startTime, endTime string, field string) string {
	return fmt.Sprintf(`{
  "size": 0,
  "query": {"range": {"created_at": {"gte": %s, "lte": %s}}},
  "aggs": {"group": {"terms": {"field": %s, "size": 10}}}
}`, jsonQuote(startTime), jsonQuote(endTime), jsonQuote(field))
}

// executeQuery sends content as a search request against index and returns
// the raw response body. A status code outside 200-226 is reported as an
// error carrying the response's string form.
func executeQuery(cli *opensearch.Client, index string, content string) ([]byte, error) {
	search := opensearchapi.SearchRequest{
		Index: []string{index},
		Body:  strings.NewReader(content),
	}
	res, err := search.Do(context.Background(), cli)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	// Either bound being violated is a failure; requiring both at once (as a
	// logical AND would) can never be true.
	if res.StatusCode < http.StatusOK || res.StatusCode > http.StatusIMUsed {
		return nil, errors.New(res.String())
	}

	body, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}
	return body, nil
}

// List returns one page of rows with source = 1, optionally filtered by
// class_id (when req.ClassId is non-zero) and by a class_name substring (when
// req.ClassName is non-empty), ordered by sort_order. TotalCount holds the
// number of matching rows as reported by FindAndCount.
func (a *AlertOverviewSvc) List(req request.ListAlertOverview) (resp response.AlertOverviewList, err error) {
	db, err := client.GetDbClient()
	if err != nil {
		return resp, err
	}
	session := db.NewSession()
	defer session.Close()

	session.Where("source = 1")
	if req.ClassId != 0 {
		session.Where("class_id = ?", req.ClassId)
	}
	if req.ClassName != "" {
		session.Where("class_name LIKE ?", "%"+req.ClassName+"%")
	}

	pageSize := req.GetPageSize()
	offset := (req.GetPage() - 1) * pageSize
	resp.TotalCount, err = session.Limit(pageSize, offset).
		OrderBy("sort_order").FindAndCount(&resp.List)
	return resp, err
}

// Delete removes the AlertOverview rows whose class_id is in ids.
func (a *AlertOverviewSvc) Delete(ids []int) (err error) {
	db, err := client.GetDbClient()
	if err != nil {
		return err
	}
	// Close the session once the statement has run so it is not leaked.
	session := db.NewSession()
	defer session.Close()

	_, err = session.In("class_id", ids).Delete(&entity.AlertOverview{})
	return err
}