// delaymessage.go 6.82 KB

package DelayMessage

import (
	"encoding/json"
	"fmt"
	"github.com/pkg/errors"
	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
	"io/ioutil"
	"net/http"
	"strconv"
	"sync"
	"time"
)

// Package-level state shared by the delay-message scheduler. None of these
// variables is assigned in this file; they must be initialised elsewhere
// before the functions below are used.
var (
	// CDelayMessage is the collection pending messages are inserted into when
	// they are scheduled and removed from when they are handled or deleted.
	CDelayMessage *mgo.Collection

	// CDelayErrorLog is the collection Callback inserts abandoned messages
	// into.
	CDelayErrorLog *mgo.Collection

	// GlobalDM is a package-wide scheduler instance.
	GlobalDM *DelayMessage
)

// Message is one delayed job: the document inserted into CDelayMessage and the
// payload handed to the task function when the job fires.
//
// NOTE(review): check for unkeyed Message{...} composite literals before
// reordering or inserting fields; such literals depend on the field order.
type Message struct {
	// Id is the MongoDB document id; its pointer is also used as the key of
	// the task inside the scheduler's slot maps.
	Id *bson.ObjectId `bson:"_id" json:"_id"`
	// DelayTime is the absolute fire time as a Unix timestamp in seconds.
	DelayTime int64
	// CallbackUrl is the URL requested with HTTP GET when Type is 0.
	CallbackUrl string `bson:"CallbackUrl" json:"CallbackUrl"`
	// Fail is a failure counter. It is initialised to 0 by the constructors in
	// this file and not incremented anywhere in the visible code.
	Fail int

	// Type selects the kind of job: 0 = GET the callback URL, 1 = send an app
	// message. Title, Content and UDID are only filled in for Type 1
	// (UDID is presumably the target device identifier; confirm with callers).
	// UserId is stored for both kinds.
	Type    int8   `bson:"Type" json:"Type"`
	Title   string `bson:"Title" json:"Title"`
	Content string `bson:"Content" json:"Content"`
	UDID    string `bson:"UDID" json:"UDID"`
	UserId  string `bson:"UserId" json:"UserId"`
}

// AddTaskForGetUrl persists and schedules a Type 0 message whose callback URL
// is requested with HTTP GET once the delay has elapsed.
//
// delayTime is the absolute fire time as a decimal Unix timestamp in seconds
// and must lie in the future. userid is stored on the message unchanged.
// callbackUrl must be non-empty.
//
// It returns an error when an argument is invalid, when the message cannot be
// inserted into CDelayMessage, or when the scheduler rejects the task. If
// scheduling fails after a successful insert the document is left in
// CDelayMessage.
func (dm *DelayMessage) AddTaskForGetUrl(delayTime string, userid string, callbackUrl string) error {
	// ParseInt keeps the full int64 range on every platform. Reporting its
	// error matters: on a range error the parsed value is clamped to the
	// maximum, which would otherwise pass the "in the future" check below.
	fireAt, err := strconv.ParseInt(delayTime, 10, 64)
	if err != nil {
		return errors.New("delayTime error...")
	}
	now := time.Now().Unix()
	if fireAt <= now {
		return errors.New("delayTime error...")
	}
	// Delays beyond this bound would overflow the time.Duration computed for
	// the scheduler and wrap around to a bogus fire time.
	const maxDelaySeconds = int64((1<<63 - 1) / time.Second)
	delaySeconds := fireAt - now
	if delaySeconds > maxDelaySeconds {
		return errors.New("delayTime error...")
	}
	if callbackUrl == "" {
		return errors.New("callbackUrl error...")
	}

	objectID := bson.NewObjectId()
	msg := &Message{
		Id:          &objectID,
		DelayTime:   fireAt,
		CallbackUrl: callbackUrl,
		Type:        0,
		UserId:      userid,
	}

	// Do not schedule a task that could not be persisted.
	if err := CDelayMessage.Insert(msg); err != nil {
		return errors.Wrap(err, "insert delay message")
	}

	fireTime := time.Now().Add(time.Duration(delaySeconds) * time.Second)
	if err := dm.AddTask(fireTime, Callback, msg); err != nil {
		return errors.Wrap(err, "schedule delay message")
	}
	fmt.Println("增加新任务:", "当前位置", dm.curIndex, ",", delaySeconds, "秒后触发", msg.CallbackUrl)

	// The id is not returned to the caller, so it is still written to stderr
	// as before; it is the value DelTaskForId expects.
	println(objectID.Hex())
	return nil
}

// AddTaskForAppMessage persists and schedules a Type 1 (app message) job.
//
// delayTime is the absolute fire time as a decimal Unix timestamp in seconds
// and must lie in the future. udid, title, content and userid must all be
// non-empty; they are stored on the message unchanged.
//
// It returns an error when an argument is invalid, when the message cannot be
// inserted into CDelayMessage, or when the scheduler rejects the task. If
// scheduling fails after a successful insert the document is left in
// CDelayMessage.
func (dm *DelayMessage) AddTaskForAppMessage(delayTime string, udid string, title string, content string, userid string) error {
	// ParseInt keeps the full int64 range on every platform. Reporting its
	// error matters: on a range error the parsed value is clamped to the
	// maximum, which would otherwise pass the "in the future" check below.
	fireAt, err := strconv.ParseInt(delayTime, 10, 64)
	if err != nil {
		return errors.New("delayTime error...")
	}
	now := time.Now().Unix()
	if fireAt <= now {
		return errors.New("delayTime error...")
	}
	// Delays beyond this bound would overflow the time.Duration computed for
	// the scheduler and wrap around to a bogus fire time.
	const maxDelaySeconds = int64((1<<63 - 1) / time.Second)
	delaySeconds := fireAt - now
	if delaySeconds > maxDelaySeconds {
		return errors.New("delayTime error...")
	}

	switch {
	case udid == "":
		return errors.New("udid error...")
	case title == "":
		return errors.New("title error...")
	case content == "":
		return errors.New("content error...")
	case userid == "":
		return errors.New("userid error...")
	}

	objectID := bson.NewObjectId()
	msg := &Message{
		Id:        &objectID,
		DelayTime: fireAt,
		Type:      1,
		Title:     title,
		Content:   content,
		UDID:      udid,
		UserId:    userid,
	}

	// Do not schedule a task that could not be persisted.
	if err := CDelayMessage.Insert(msg); err != nil {
		return errors.Wrap(err, "insert delay message")
	}

	fireTime := time.Now().Add(time.Duration(delaySeconds) * time.Second)
	if err := dm.AddTask(fireTime, Callback, msg); err != nil {
		return errors.Wrap(err, "schedule delay message")
	}
	fmt.Println("增加新任务:", "当前位置", dm.curIndex, ",", delaySeconds, "秒后触发", msg.Title)

	return nil
}

// DelTaskForId deletes the message with the given hex ObjectId from
// CDelayMessage and removes the matching task(s) from the in-memory wheel.
//
// Nothing is returned: problems are printed with an "[E]" prefix and the
// number of removed in-memory tasks is written to stderr. An id that is not a
// valid ObjectId hex string is reported and nothing is deleted.
func (dm *DelayMessage) DelTaskForId(id string) {
	// Last line of defence: keep any unexpected panic (for example an
	// uninitialised collection) from taking the caller down.
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("[E]", r)
		}
	}()

	// bson.ObjectIdHex panics on malformed input; validate explicitly instead
	// of using panic/recover as control flow.
	if !bson.IsObjectIdHex(id) {
		fmt.Println("[E]", "invalid ObjectId hex:", id)
		return
	}

	// A failed removal (e.g. the document is already gone) is reported but
	// does not stop the in-memory task from being dropped.
	if err := CDelayMessage.Remove(bson.M{"_id": bson.ObjectIdHex(id)}); err != nil {
		fmt.Println("[E]", err)
	}

	removed := dm.DelTask(id)
	println("删除定时任务:", strconv.Itoa(removed))
}

// Show prints every task currently held in the wheel to stdout, one line per
// task: the current index, the slot the task sits in, its remaining cycle
// count and its callback URL (empty for tasks without one).
//
// The read lock is held for the whole dump so the slot maps cannot be written
// by AddTask while they are being iterated; iterating a map during a
// concurrent write aborts the program.
func (dm *DelayMessage) Show() {
	dm.RLock()
	defer dm.RUnlock()

	fmt.Println("---------------------------------")
	// Ranging by index avoids copying the whole slot array.
	for slot := range dm.slots {
		for _, task := range dm.slots[slot] {
			fmt.Println("当前秒数:", dm.curIndex, "下一任务:", slot, "圈数:", task.cycleNum, task.params.CallbackUrl)
		}
	}
	fmt.Println("---------------------------------")
}

// Callback is the task function run when a scheduled message fires.
//
// key is the message's ObjectId and message is the payload. For Type 0 the
// callback URL is requested with HTTP GET and the job counts as delivered
// only when the request succeeds and the response body is exactly "ok". Any
// other Type has no delivery step in this file and is treated as delivered.
//
// A delivered message is removed from CDelayMessage. An undelivered message is
// abandoned: it is inserted into CDelayErrorLog and left in CDelayMessage.
//
// NOTE(review): the previous condition was inverted — a body of "ok" took the
// "abandoned" branch and every other outcome, including HTTP errors, took the
// "completed" branch. Confirm no deployment relied on that behaviour.
func Callback(key *bson.ObjectId, message *Message) {
	delivered := true
	var callErr error
	if message.Type == 0 {
		var body string
		body, callErr = HttpGet(message.CallbackUrl)
		delivered = callErr == nil && body == "ok"
	}

	// The JSON form is only used for the log lines below.
	encoded, err := json.Marshal(message)
	if err != nil {
		fmt.Println("[E]", err)
	}

	if delivered {
		if err := CDelayMessage.Remove(bson.M{"_id": *key}); err != nil {
			fmt.Println("[E]", err)
		}
		fmt.Println("完成任务:", string(encoded))
		return
	}

	if callErr != nil {
		fmt.Println("[E]", callErr)
	}
	fmt.Println("放弃任务:", string(encoded))
	if err := CDelayErrorLog.Insert(message); err != nil {
		fmt.Println("[E]", err)
	}
}

// DelayMessage is a timing wheel: a ring of 3600 slots that timeLoop walks one
// slot at a time. A task is placed in the slot matching its delay and carries
// a cycle count for delays longer than one full revolution.
type DelayMessage struct {
	// curIndex is the index of the slot taskLoop processes next.
	curIndex int
	// RWMutex is embedded so the wheel itself can be locked; AddTask takes the
	// write lock while inserting into a slot map.
	sync.RWMutex
	// slots is the ring of buckets. Each bucket maps a message id pointer to
	// its task; because the key is a pointer, lookups compare pointer
	// identity rather than the ObjectId value.
	slots [3600]map[*bson.ObjectId]*Task
	// startTime is when the wheel was created; AddTask rejects earlier times.
	startTime time.Time
}

// TaskFunc is the function a task runs when it fires. It receives the key the
// task was stored under (the message id pointer) and the message itself.
type TaskFunc func(key *bson.ObjectId, message *Message)

// Task is one entry in a wheel slot.
type Task struct {
	// cycleNum is the number of further visits to the slot that must pass
	// before the task runs: taskLoop runs the task when it is 0 and
	// decrements it otherwise.
	cycleNum int
	// exec is the function to run when the task fires.
	exec TaskFunc
	// params is the message passed to exec.
	params *Message
}

// NewDelayMessage returns a wheel positioned at slot 0 whose start time is the
// moment of the call and whose slots all hold an empty, ready-to-use map.
func NewDelayMessage() *DelayMessage {
	wheel := new(DelayMessage)
	wheel.curIndex = 0
	wheel.startTime = time.Now()

	// A nil map cannot be written to, so every bucket is allocated up front.
	for slot := range wheel.slots {
		wheel.slots[slot] = map[*bson.ObjectId]*Task{}
	}
	return wheel
}

// Start launches timeLoop in its own goroutine and then blocks the calling
// goroutine forever: the empty select never returns, so Start itself never
// returns either.
func (dm *DelayMessage) Start() {
	go dm.timeLoop()
	select {}
}

// taskLoop processes the slot at curIndex once. A task whose cycle count is 0
// is started in its own goroutine and removed from the slot; any other task
// has its cycle count decremented and stays for a later revolution.
//
// The write lock is held for the whole pass because the pass both deletes
// from the slot map and mutates the tasks in it. A read lock is not enough
// for that, and an unlocked pass races with AddTask inserting into the same
// map. Task functions run in separate goroutines, so they never execute
// while the lock is held.
func (dm *DelayMessage) taskLoop() {
	dm.Lock()
	defer dm.Unlock()

	due := dm.slots[dm.curIndex]
	for id, task := range due {
		if task.cycleNum != 0 {
			task.cycleNum--
			continue
		}
		go task.exec(id, task.params)
		// Deleting the current key while ranging over a map is safe in Go.
		delete(due, id)
	}
}

// timeLoop drives the wheel: once per second it processes the current slot
// and then advances curIndex, wrapping to 0 after the last slot. It does not
// return in normal operation; "timeLoop exit" is printed if it ever does.
func (dm *DelayMessage) timeLoop() {
	defer func() {
		fmt.Println("timeLoop exit")
	}()

	// A ticker keeps a steady one-second cadence. Sleeping for a second after
	// each pass adds the processing time to every period, so the wheel slowly
	// falls behind the wall clock that AddTask measures delays against.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for range ticker.C {
		dm.taskLoop()

		// curIndex is read by AddTask from other goroutines, so publish the
		// new value under the lock. The lock is not held across taskLoop,
		// which acquires it itself.
		dm.Lock()
		dm.curIndex = (dm.curIndex + 1) % len(dm.slots)
		dm.Unlock()
	}
}

// AddTask schedules exec to be called with message at time t.
//
// The delay is measured in whole seconds from now. The task is stored under
// message.Id in the slot that lies that many steps ahead of the current
// index, together with the number of full wheel revolutions that have to pass
// first.
//
// It returns an error when t lies before the wheel's start time or before the
// current second.
func (dm *DelayMessage) AddTask(t time.Time, exec TaskFunc, message *Message) error {
	if dm.startTime.After(t) {
		return errors.New("时间错误")
	}

	// Read the clock once so the cycle count and the slot index are derived
	// from the same delay.
	delay := t.Unix() - time.Now().Unix()
	if delay < 0 {
		// A negative delay would yield a negative slot index (Go's % keeps
		// the sign) and panic, or land in a slot that has already passed.
		return errors.New("时间错误")
	}

	wheelSize := int64(len(dm.slots))
	// Number of full revolutions before the task is due.
	cycleNum := int(delay / wheelSize)

	dm.Lock()
	// curIndex is advanced by the timing loop, so read it under the lock.
	ix := (delay + int64(dm.curIndex)) % wheelSize
	dm.slots[ix][message.Id] = &Task{
		cycleNum: cycleNum,
		exec:     exec,
		params:   message,
	}
	dm.Unlock()

	fmt.Println("AddTask-----", cycleNum, "圈后的第", ix, "位")
	return nil
}

// DelTask removes every task whose id has the hex representation key and
// returns how many tasks were removed. All slots are scanned because the
// slot a task lives in cannot be derived from its id.
//
// The write lock is held for the scan since it deletes from the slot maps,
// which AddTask writes to under the same lock.
func (dm *DelayMessage) DelTask(key string) int {
	dm.Lock()
	defer dm.Unlock()

	removed := 0
	// Ranging by index avoids copying the whole slot array.
	for slot := range dm.slots {
		bucket := dm.slots[slot]
		for id := range bucket {
			// Hex has a value receiver, so a nil key would panic; skip it.
			if id == nil || id.Hex() != key {
				continue
			}
			// Deleting the current key while ranging over a map is safe.
			delete(bucket, id)
			removed++
		}
	}
	return removed
}

// c is the HTTP client used by HttpGet. Every request made through it is
// limited to five seconds in total.
var c = &http.Client{Timeout: 5 * time.Second}

// HttpGet requests url with HTTP GET through the package client c and returns
// the whole response body as a string. The status code is not inspected, so a
// non-2xx response still yields its body with a nil error.
//
// It returns an empty string and the error when the request fails or the body
// cannot be read in full.
func HttpGet(url string) (string, error) {
	resp, err := c.Get(url)
	if err != nil {
		return "", err
	}
	// Always release the body. Closing it only when recover() reported no
	// panic leaked the connection on a panic and silently swallowed the
	// panic as well.
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return string(body), nil
}