package DelayMessage import ( "encoding/json" "fmt" "github.com/pkg/errors" "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" "io/ioutil" "net/http" "strconv" "sync" "time" ) // 延迟消息 var CDelayMessage *mgo.Collection var CDelayErrorLog *mgo.Collection var GlobalDM *DelayMessage type Message struct { Id *bson.ObjectId `bson:"_id" json:"_id"` //延时时间 DelayTime int64 //callbackUrl CallbackUrl string `bson:"CallbackUrl" json:"CallbackUrl"` //失败次数 Fail int // 类型0=geturl;1=发送app消息 Type int8 `bson:"Type" json:"Type"` Title string `bson:"Title" json:"Title"` Content string `bson:"Content" json:"Content"` UDID string `bson:"UDID" json:"UDID"` UserId string `bson:"UserId" json:"UserId"` } // addTask func (dm *DelayMessage) AddTaskForGetUrl(delayTime string, userid string, callbackUrl string) error { iTIme, _ := strconv.Atoi(delayTime) i64Time := int64(iTIme) nowTimeU := time.Now().Unix() iDelayTIme := i64Time - nowTimeU if i64Time <= nowTimeU { return errors.New("delayTime error...") } if callbackUrl == "" { return errors.New("callbackUrl error...") } objectID := bson.NewObjectId() _Message := &Message{&objectID, i64Time, callbackUrl, 0, 0, "", "", "", userid} CDelayMessage.Insert(_Message) //添加任务 //iDelayTIme = 3 dm.AddTask(time.Now().Add(time.Second*time.Duration(iDelayTIme)), Callback, _Message) fmt.Println("增加新任务:", "当前位置", dm.curIndex, ",", iDelayTIme, "秒后触发", _Message.CallbackUrl) println(objectID.Hex()) return nil } func (dm *DelayMessage) AddTaskForAppMessage(delayTime string, udid string, title string, content string, userid string) error { iTIme, _ := strconv.Atoi(delayTime) i64Time := int64(iTIme) nowTimeU := time.Now().Unix() iDelayTIme := i64Time - nowTimeU if i64Time <= nowTimeU { return errors.New("delayTime error...") } if udid == "" { return errors.New("udid error...") } if title == "" { return errors.New("title error...") } if content == "" { return errors.New("content error...") } if userid == "" { return errors.New("userid error...") } objectID := bson.NewObjectId() _Message := &Message{&objectID, i64Time, "", 0, 1, title, content, udid, userid} CDelayMessage.Insert(_Message) //添加任务 //iDelayTIme = 3 dm.AddTask(time.Now().Add(time.Second*time.Duration(iDelayTIme)), Callback, _Message) fmt.Println("增加新任务:", "当前位置", dm.curIndex, ",", iDelayTIme, "秒后触发", _Message.Title) //println(objectID.Hex()) return nil } // delTask func (dm *DelayMessage) DelTaskForId(id string) { defer func() { if r := recover(); r != nil { fmt.Println("[E]", r) } }() CDelayMessage.Remove(bson.M{"_id": bson.ObjectIdHex(id)}) i := dm.DelTask(id) println("删除定时任务:",strconv.Itoa(i)) } func (dm *DelayMessage) Show() { //取出当前的槽的任务 fmt.Println("---------------------------------") for k, _ := range dm.slots { tasks := dm.slots[k] for _, v2 := range tasks { fmt.Println("当前秒数:", dm.curIndex, "下一任务:", k, "圈数:", v2.cycleNum, v2.params.CallbackUrl) } } fmt.Println("---------------------------------") } func Callback(key *bson.ObjectId, message *Message) { var body string var err error if message.Type == 0 { body, err = HttpGet(message.CallbackUrl) if err != nil { //fmt.Println(err,message) } } else if message.Type == 1 { } json, _ := json.Marshal(message) if body != "ok" { CDelayMessage.Remove(bson.M{"_id": *key}) fmt.Println("完成任务:", string(json)) } else { //message.Fail++ //if message.Fail == 3 { // fmt.Println(color.Red("放弃任务:"), message.CallbackUrl) // CDelayMessage.Remove(bson.M{"_id": *key}) // dbErrorLog.Insert(message) //} else { // fmt.Println("重新添加任务:", message) // dm.AddTask(time.Now().Add(time.Second*10), key, callback, message) //} fmt.Println("放弃任务:", string(json)) //CDelayMessage.Remove(bson.M{"_id": *key}) CDelayErrorLog.Insert(message) } } type DelayMessage struct { //当前下标 curIndex int //环形槽 sync.RWMutex slots [3600]map[*bson.ObjectId]*Task //启动时间 startTime time.Time } //执行的任务函数 type TaskFunc func(key *bson.ObjectId, message *Message) //任务 type Task struct { //循环次数 cycleNum int //执行的函数 exec TaskFunc params *Message } //创建一个延迟消息 func NewDelayMessage() *DelayMessage { dm := &DelayMessage{ curIndex: 0, startTime: time.Now(), } for i := 0; i < 3600; i++ { dm.slots[i] = make(map[*bson.ObjectId]*Task) } return dm } //启动延迟消息 func (dm *DelayMessage) Start() { //go dm.taskLoop() go dm.timeLoop() select {} } //处理每1秒的任务 func (dm *DelayMessage) taskLoop() { //取出当前的槽的任务 tasks := dm.slots[dm.curIndex] if len(tasks) > 0 { //遍历任务,判断任务循环次数等于0,则运行任务 //否则任务循环次数减1 for k, v := range tasks { if v.cycleNum == 0 { go v.exec(k, v.params) //删除运行过的任务 dm.RLock() delete(tasks, k) dm.RUnlock() } else { v.cycleNum-- } } } } //处理每1秒移动下标 func (dm *DelayMessage) timeLoop() { defer func() { fmt.Println("timeLoop exit") }() for { time.Sleep(time.Second) //fmt.Println(time.Now().Format("2006-01-02 15:04:05")) //判断当前下标,如果等于3599则重置为0,否则加1 dm.taskLoop() if dm.curIndex == 3599 { dm.curIndex = 0 } else { dm.curIndex++ } } } //添加任务 func (dm *DelayMessage) AddTask(t time.Time, exec TaskFunc, message *Message) error { if dm.startTime.After(t) { return errors.New("时间错误") } //计算循环次数 cycleNum := int((t.Unix() - time.Now().Unix()) / 3600) //计算任务所在的slots的下标 ix := (t.Unix() - time.Now().Unix() + int64(dm.curIndex)) % 3600 fmt.Println("AddTask-----", cycleNum, "圈后的第", ix, "位") //把任务加入tasks中 tasks := dm.slots[ix] //if _, ok := tasks[key]; ok { // return errors.New("该slots中已存在key为" + key + "的任务") //} dm.Lock() tasks[message.Id] = &Task{ cycleNum: cycleNum, exec: exec, params: message, } dm.Unlock() return nil } //删除任务 func (dm *DelayMessage) DelTask(key string) int { i := 0 for _, v := range dm.slots { for k2, _ := range v { if key == k2.Hex() { i++ delete(v, k2) } } } return i } var c = &http.Client{ Timeout: 5 * time.Second, } func HttpGet(url string) (string, error) { //fmt.Println(color.Yellow("http request: "), color.Yellow(url)) resp, err := c.Get(url) if err != nil { return "", err } defer func() { if err := recover(); err == nil { resp.Body.Close() } }() body, err := ioutil.ReadAll(resp.Body) if err != nil { return "", err } sBody := string(body) //fmt.Println(color.Yellow(sBody)) return sBody, nil }