package service import ( "bufio" "errors" "fmt" "gitlab.wodcloud.com/smart-operation/so-operation-api/src/bean/entity" "gitlab.wodcloud.com/smart-operation/so-operation-api/src/bean/vo/request" "gitlab.wodcloud.com/smart-operation/so-operation-api/src/bean/vo/response" "gitlab.wodcloud.com/smart-operation/so-operation-api/src/common/client" "gitlab.wodcloud.com/smart-operation/so-operation-api/src/common/conf" "gitlab.wodcloud.com/smart-operation/so-operation-api/src/pkg/beagle/resp" "go.uber.org/zap" "io" "os" "os/exec" "strconv" "strings" "time" ) type TaskManageSvc struct { User entity.SystemUserInfo } // AddTaskManage 新增任务 func (t *TaskManageSvc) AddTaskManage(req request.AddTaskManageReq) (id int, err error) { db, err := client.GetDbClient() if err != nil { err = resp.DbConnectError.WithError(err) return } // 校验 任务名称 是否重复 has, err := db.Table("task_manage").Where("is_delete = 0 AND task_name = ?", req.TaskName).Exist() if has { err = resp.DbDataCheckError.WithError(errors.New("任务名称重复")) return } if err != nil { err = resp.DbSelectError.WithError(err) return } taskManage := entity.TaskManage{ TaskName: req.TaskName, TaskDesc: req.TaskDesc, YamlDesc: req.YamlDesc, YamlUrl: req.YamlUrl, HostGroupId: req.HostGroupId, CreateUser: t.User.SystemAccount, CreateTime: time.Now(), UpdateUser: t.User.SystemAccount, UpdateTime: time.Now(), } _, err = db.Table("task_manage").Insert(&taskManage) if err != nil { err = resp.DbInsertError.WithError(err) return } return taskManage.Id, nil } // EditTaskManage 编辑任务 func (t *TaskManageSvc) EditTaskManage(req request.EditTaskManageReq) (id int, err error) { db, err := client.GetDbClient() if err != nil { err = resp.DbConnectError.WithError(err) return } taskManage := entity.TaskManage{ TaskDesc: req.TaskDesc, YamlDesc: req.YamlDesc, YamlUrl: req.YamlUrl, HostGroupId: req.HostGroupId, UpdateUser: t.User.SystemAccount, UpdateTime: time.Now(), } //编辑任务 _, err = db.Table("task_manage").Where("is_delete = 0 AND id = ?", req.Id). Cols("task_desc,yaml_desc,yaml_url,host_group_id,update_user,update_time").Update(&taskManage) if err != nil { err = resp.DbUpdateError.WithError(err) return } return req.Id, nil } // DelTaskManage 删除任务 func (t *TaskManageSvc) DelTaskManage(req request.DelTaskManageReq) (err error) { db, err := client.GetDbClient() if err != nil { err = resp.DbConnectError.WithError(err) return } //编辑任务 _, err = db.Table("task_manage").In("id", req.Id).Cols("is_delete").Update(&entity.TaskManage{ IsDelete: 1, }) if err != nil { err = resp.DbDeleteError.WithError(err) return } return } // DetailsTaskManage 任务详情 func (t *TaskManageSvc) DetailsTaskManage(id int) (taskManageRes response.TaskManageRes, err error) { db, err := client.GetDbClient() if err != nil { err = resp.DbConnectError.WithError(err) return } //查询任务详情 finder := db.Table("task_manage").Alias("tm"). Where("is_delete = 0 AND id = ?", id) _, err = finder.Select("tm.id,tm.task_name,tm.task_desc,tm.yaml_desc,tm.yaml_url,tm.create_user,tm.create_time,tm.host_group_id," + "(select count(1) from task_history th where th.task_id = tm.id) as exec_cnt," + "(select count(1) from task_history th where th.task_id = tm.id and th.state = 1) as success_cnt," + "(select count(1) from task_history th where th.task_id = tm.id and th.state = 2) as fail_cnt").Get(&taskManageRes) if err != nil { err = resp.DbSelectError.WithError(err) return } //查询主机列表 hostList := make([]response.HostList, 0) err = db.Table("host_manage_list").Where("is_delete = 0 AND host_group_id = ?", taskManageRes.HostGroupId). Select("string_agg(ip,',') as ip,port,voucher_type,user_name,password,host_file_url"). GroupBy("ip_group,port,voucher_type,user_name,PASSWORD,host_file_url").Find(&hostList) if err != nil { err = resp.DbSelectError.WithError(err) return } for _, v := range hostList { if v.HostFileUrl != "" { taskManageRes.HostFileUrl = v.HostFileUrl } } if taskManageRes.HostFileUrl == "" { taskManageRes.HostList = hostList } return } // ListTaskManage 任务列表 func (t *TaskManageSvc) ListTaskManage(req request.ListTaskManageReq) (total int64, taskManageListRes []response.TaskManageListRes, err error) { db, err := client.GetDbClient() if err != nil { err = resp.DbConnectError.WithError(err) return } finder := db.Table("task_manage").Alias("tm").Where("tm.is_delete = 0") if req.Search != "" { finder.Where(fmt.Sprintf("(tm.task_name LIKE '%s' OR tm.task_desc LIKE '%s' OR tm.create_user LIKE '%s')", "%"+req.Search+"%", "%"+req.Search+"%", "%"+req.Search+"%")) } if req.CreateDateFrom != "" { finder.Where("tm.create_time >= ?", req.CreateDateFrom) } if req.CreateDateTo != "" { finder.Where("tm.create_time <= ?", req.CreateDateTo) } if req.HostGroupId != 0 { finder.Where("tm.host_group_id = ?", req.HostGroupId) } finder.OrderBy("tm.id") //查询任务 total, err = finder.Select("tm.id,tm.task_name,tm.task_desc,(select count(*) from task_history th "+ "where th.task_id = tm.id) as exec_cnt,tm.create_user,tm.create_time").OrderBy("tm.create_time"). Limit(req.PageSize, (req.Page-1)*req.PageSize).FindAndCount(&taskManageListRes) if err != nil { err = resp.DbSelectError.WithError(err) return } return } // DetailsTaskManage 任务详情 func (t *TaskManageSvc) GetTaskManage(id int) (getTaskManage response.GetTaskManage, err error) { db, err := client.GetDbClient() if err != nil { err = resp.DbConnectError.WithError(err) return } //查询任务详情 finder := db.Table("task_manage"). Where("is_delete = 0 AND id = ?", id) _, err = finder.Get(&getTaskManage) if err != nil { err = resp.DbSelectError.WithError(err) return } return } func (t *TaskManageSvc) ExecScript(req request.ExecScriptReq, script string) (id int, err error) { //查询主机信息 db, err := client.GetDbClient() if err != nil { err = resp.DbConnectError.WithError(err) return } //查询任务详情 hostManageList := make([]response.HostManageListRes, 0) finder := db.Table("task_manage").Alias("tm"). Join("INNER", "host_manage hm", "tm.host_group_id = hm.id"). Join("INNER", "host_manage_list hml", "hm.id = hml.host_group_id"). Where("tm.is_delete = 0 AND hm.is_delete = 0 AND hml.is_delete = 0 AND tm.id = ?", req.TaskId) err = finder.Select("hml.ip,hml.ip,hml.port,hml.voucher_type,hml.user_name,hml.password").Find(&hostManageList) if err != nil { err = resp.DbSelectError.WithError(err) return } //新增主机分组列表 var hosts []string for _, v := range hostManageList { hostsIp := "" if v.VoucherType == 0 { hostsIp = fmt.Sprintf("%s:%s ansible_ssh_user=\"%s\" ansible_ssh_pass=\"%s\" ansible_host_key_checking=false", v.Ip, v.Port, v.UserName, v.Password) } else { hostsIp = fmt.Sprintf("%s:%s ansible_ssh_user=\"%s\" ansible_ssh_private_key_file=/root/.ssh/id_rsa ansible_host_key_checking=false", v.Ip, v.Port, v.UserName) } hosts = append(hosts, hostsIp) } //写入主机信息 //hostsIp := strings.Replace(strings.Trim(fmt.Sprint(hosts), "[]"), " ", " ", -1) hostsGroup, err := os.Create("/etc/ansible/hosts_" + fmt.Sprintf("%d", req.TaskId)) if err != nil { err = resp.FileExecError.WithError(err) return } defer hostsGroup.Close() _, err = hostsGroup.Write([]byte(strings.Join(hosts, "\n"))) if err != nil { err = resp.FileExecError.WithError(err) return } //写入执行脚本 f2, err := os.Create("/etc/ansible/ansible_" + fmt.Sprintf("%d", req.TaskId) + ".yml") if err != nil { err = resp.FileExecError.WithError(err) return } defer f2.Close() _, err = f2.Write([]byte(script)) if err != nil { err = resp.FileExecError.WithError(err) return } //新增任务历史 id, err = AddExecHistory(request.AddExecHistory{ TaskId: req.TaskId, CreateUser: t.User.SystemAccount, }) if err != nil { return } go ExecAnsible(id, req.TaskId, req.Value) ////执行ansible命令 //var cmd *exec.Cmd //if req.Value != "" { // cmd = exec.Command("ansible-playbook", "-i", "/etc/ansible/hosts_"+fmt.Sprintf("%d", req.TaskId), "/etc/ansible/ansible_"+fmt.Sprintf("%d", req.TaskId)+".yml", "--extra-vars", req.Value) //} else { // cmd = exec.Command("ansible-playbook", "-i", "/etc/ansible/hosts_"+fmt.Sprintf("%d", req.TaskId), "/etc/ansible/ansible_"+fmt.Sprintf("%d", req.TaskId)+".yml") //} ////ansible-playbook -i /tmp/hosts --list-hosts debug.yml ////捕获正常日志 //stdout, err := cmd.StdoutPipe() //if err != nil { // err = resp.CmdExecError.WithError(err) // return //} ////捕获异常日志 //stderr, err := cmd.StderrPipe() //if err != nil { // err = resp.CmdExecError.WithError(err) // return //} ////执行cmd命令 //if err = cmd.Start(); err != nil { // err = resp.CmdExecError.WithError(err) // return //} ////获取 正常/异常 输出流 //outputBuf := bufio.NewReader(stdout) //readerr := bufio.NewReader(stderr) // //var out, outErr int //var execLog string //for { // // //逐行输出日志 // lineOut, err1 := outputBuf.ReadString('\n') // if (err1 != nil || io.EOF == err1) && out == 0 { // out = 1 // } else if out == 0 { // //存储执行日志 // execLog = execLog + lineOut + " \n " // UpdateExecHistory(request.UpdateExecHistory{ // TaskHistoryId: id, // ExecLog: execLog, // }) // } // // lineErr, err2 := readerr.ReadString('\n') // if (err2 != nil || io.EOF == err2) && outErr == 0 { // outErr = 1 // } else if outErr == 0 { // //存储异常执行日志 // execLog = execLog + lineErr + " \n " // UpdateExecHistory(request.UpdateExecHistory{ // TaskHistoryId: id, // ExecLog: execLog, // }) // } // // if out == 1 && outErr == 1 { // break // } //} //cmd.Wait() // //if cmd.ProcessState.Success() { // //任务执行成功 // UpdateExecHistory(request.UpdateExecHistory{ // TaskHistoryId: id, // ExecLog: execLog, // State: 1, // }) //} else { // //任务执行失败 // UpdateExecHistory(request.UpdateExecHistory{ // TaskHistoryId: id, // ExecLog: execLog, // State: 2, // }) //} return } // ExecAnsible 执行ansible命令 func ExecAnsible(id, taskId int, value string) { var cmd *exec.Cmd if value != "" { cmd = exec.Command("ansible-playbook", "-i", "/etc/ansible/hosts_"+fmt.Sprintf("%d", taskId), "/etc/ansible/ansible_"+fmt.Sprintf("%d", taskId)+".yml", "--extra-vars", value) } else { cmd = exec.Command("ansible-playbook", "-i", "/etc/ansible/hosts_"+fmt.Sprintf("%d", taskId), "/etc/ansible/ansible_"+fmt.Sprintf("%d", taskId)+".yml") } //ansible-playbook -i /tmp/hosts --list-hosts debug.yml //捕获正常日志 stdout, err := cmd.StdoutPipe() if err != nil { err = resp.CmdExecError.WithError(err) conf.Logger.Error("Capture normal logs", zap.Error(err)) //return } //捕获异常日志 stderr, err := cmd.StderrPipe() if err != nil { err = resp.CmdExecError.WithError(err) conf.Logger.Error("Capture exception logs", zap.Error(err)) //return } //执行cmd命令 if err = cmd.Start(); err != nil { err = resp.CmdExecError.WithError(err) conf.Logger.Error("Execute cmd command", zap.Error(err)) //return } //获取 正常/异常 输出流 outputBuf := bufio.NewReader(stdout) readerr := bufio.NewReader(stderr) var out, outErr int var execLog string redis, err := client.GetRedisClient() if err != nil { zap.L().Error(err.Error()) } for { //逐行输出日志 lineOut, err1 := outputBuf.ReadString('\n') if (err1 != nil || io.EOF == err1) && out == 0 { out = 1 } else if out == 0 { //存储执行日志 execLog = execLog + lineOut + " \n " //err = UpdateExecHistory(request.UpdateExecHistory{ // TaskHistoryId: id, // ExecLog: execLog, //}) //if err != nil { // conf.Logger.Error("Store Execution Log", zap.Error(err)) // //return //} // TODO 写缓存 err = redis.HSet(conf.AutoExecHistory, strconv.Itoa(id), execLog) if err != nil { conf.Logger.Error("Store Execution Log", zap.Error(err)) //return } } lineErr, err2 := readerr.ReadString('\n') if (err2 != nil || io.EOF == err2) && outErr == 0 { outErr = 1 } else if outErr == 0 { //存储异常执行日志 execLog = execLog + lineErr + " \n " //err = UpdateExecHistory(request.UpdateExecHistory{ // TaskHistoryId: id, // ExecLog: execLog, //}) //if err != nil { // conf.Logger.Error("Store abnormal execution logs", zap.Error(err)) // //return //} // TODO 写缓存 err = redis.HSet(conf.AutoExecHistory, strconv.Itoa(id), execLog) if err != nil { conf.Logger.Error("Store Execution Log", zap.Error(err)) //return } } if out == 1 && outErr == 1 { break } } cmd.Wait() if cmd.ProcessState.Success() { //任务执行成功 err = UpdateExecHistory(request.UpdateExecHistory{ TaskHistoryId: id, ExecLog: execLog, State: 1, }) if err != nil { conf.Logger.Error("Modify Execution Status", zap.Error(err)) //return } redis.Del(strconv.Itoa(id)) } else { //任务执行失败 err = UpdateExecHistory(request.UpdateExecHistory{ TaskHistoryId: id, ExecLog: execLog, State: 2, }) if err != nil { conf.Logger.Error("Modify Execution Status", zap.Error(err)) //return } redis.Del(strconv.Itoa(id)) } //return }