Skip to content

Commit

Permalink
时间轮启动参数从配置文件中读取
Browse files Browse the repository at this point in the history
  • Loading branch information
ouqiang committed May 16, 2017
1 parent a7fd364 commit 93bb9f9
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 13 deletions.
18 changes: 15 additions & 3 deletions cmd/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,26 @@ func initModule() {
serviceTask := new(service.Task)
serviceTask.Initialize()

// 初始化延时任务
delayTaskEnabled, err := config.Key("delay.task.enable").Bool()
if err != nil {
logger.Error("获取延时任务配置失败", err)
return
}
if !delayTaskEnabled {
return
}
delayTaskSlots, err := config.Key("delay.task.slots").Int()
if err != nil {
return
}
delayTaskTick := config.Key("delay.task.tick").String()
tick, err := time.ParseDuration(delayTaskTick)
if err != nil {
return
}

serviceDelayTask := new(service.DelayTask)
serviceDelayTask.Initialize()
serviceDelayTask.Initialize(tick, delayTaskSlots)
}

// 解析端口
Expand Down Expand Up @@ -135,12 +145,14 @@ func shutdown() {
os.Exit(0)
return
}
logger.Info("应用准备退出, 停止任务调度")
logger.Info("应用准备退出")
serviceTask := new(service.Task)
// 停止所有任务调度
logger.Info("停止定时任务调度")
serviceTask.StopAll()
delayTaskEnable, _ := app.Setting.Key("delay.task.enable").Bool()
if delayTaskEnable {
logger.Info("停止延时任务调度")
serviceDelayTask := new(service.DelayTask)
serviceDelayTask.Stop()
}
Expand Down
2 changes: 1 addition & 1 deletion gocron.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/ouqiang/gocron/cmd"
)

const AppVersion = "0.1"
const AppVersion = "0.2"

func main() {
app := cli.NewApp()
Expand Down
13 changes: 12 additions & 1 deletion modules/httpclient/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,18 @@ func Get(url string, timeout int) ResponseWrapper {
return request(req, timeout)
}

func PostBody(url string, body string, timeout int) ResponseWrapper {
func PostParams(url string,params string, timeout int) ResponseWrapper {
buf := bytes.NewBufferString(params)
req, err := http.NewRequest("POST", url, buf)
if err != nil {
return createRequestError(err)
}
req.Header.Set("Content-type", "application/x-www-form-urlencoded")

return request(req, timeout)
}

func PostJson(url string, body string, timeout int) ResponseWrapper {
buf := bytes.NewBufferString(body)
req, err := http.NewRequest("POST", url, buf)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion modules/notify/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (slack *Slack) send(msg Message, slackUrl string, channel string) {
maxTimes := 3
i := 0
for i < maxTimes {
resp := httpclient.PostBody(slackUrl, formatBody, timeout)
resp := httpclient.PostJson(slackUrl, formatBody, timeout)
if resp.StatusCode == 200 {
break;
}
Expand Down
2 changes: 1 addition & 1 deletion routers/delaytask/delay_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func Create(ctx *macaron.Context) string {
json := utils.JsonResponse{}
delayTaskEnabled, _ := app.Setting.Key("delay.task.enable").Bool()
if !delayTaskEnabled {
return json.CommonFailure("未开启延时任务")
return json.CommonFailure("系统未开启延时任务")
}
if url == "" {
return json.CommonFailure("url地址不能为空")
Expand Down
2 changes: 2 additions & 0 deletions routers/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ func writeConfig(form InstallForm) error {
"allow_ips" : "",
"app.name": "定时任务管理系统", // 应用名称
"delay.task.enable": "false", // 是否开启延时任务
"delay.task.slots": "3600", // 时间轮槽数量
"delay.task.tick": "1s", // 时间轮每次转动的时间
}

return setting.Write(dbConfig, app.AppConfig)
Expand Down
2 changes: 1 addition & 1 deletion routers/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func Register(m *macaron.Macaron) {
m.Route("/tasklog/update-status", "GET,POST", tasklog.UpdateStatus)
m.Post("/tasklog/remove/:id", tasklog.Remove)
m.Post("/delaytask/push", delaytask.Create)
m.Post("/delaytask/remove/:id", delaytask.Remove)
m.Post("/delaytask/log/remove/:id", delaytask.Remove)
});

// 404错误
Expand Down
9 changes: 4 additions & 5 deletions service/delay_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ var tw *timewheel.TimeWheel
type DelayTask struct {}

// 从数据库中取出所有延迟任务
func (task *DelayTask) Initialize() {
tw = timewheel.New(1 * time.Second, 3600)
func (task *DelayTask) Initialize(tick time.Duration, slots int) {
tw = timewheel.New(tick, slots)
tw.Start()
taskModel := new(models.DelayTask)
currentTime := time.Now()
Expand Down Expand Up @@ -80,16 +80,15 @@ func (task *DelayTask) Run(id int64, url, params string) {
success := false
logger.Infof("延迟任务开始执行#id-%d#url-%s#params-%s", id, url, params)
for i := 0; i < tryTimes; {
response := httpclient.PostBody(url, params, timeout)
response := httpclient.PostParams(url, params, timeout)
if response.StatusCode == 200 && strings.TrimSpace(response.Body) == "success"{
success = true
break;
}
i++
if i < tryTimes {
logger.Errorf("延迟任务执行失败#重试第%d次#任务Id-%d#HTTP状态码-%d#HTTP-BODY-%s",
i,id,response.StatusCode,response.Body,
)
i,id,response.StatusCode,response.Body)
time.Sleep(30 * time.Second)
}
}
Expand Down

0 comments on commit 93bb9f9

Please sign in to comment.