diff --git a/API/DealyMessage.go b/API/DealyMessage.go new file mode 100644 index 0000000..6c223ed --- /dev/null +++ b/API/DealyMessage.go @@ -0,0 +1,72 @@ +package Api + +import ( + "github.com/aarongao/tools" + "github.com/gin-gonic/gin" + "gopkg.in/mgo.v2/bson" + "letu/Lib/DelayMessage" +) + +// @Title 查询用户的定时提醒 +// @Description 查询用户的定时提醒 +// @Accept json +// @Produce json +// @Param UserId 5dfb03070a9ac17ac7a82054 string true "用户id" +// @Success 200 {object} tools.ResponseSeccess "DelayTime=执行时间;Type=类型(0请求url地址1发送app通知);Fail失败次数;Title=通知标题;Content=通知内容;UDID=设备id" +// @Failure 500 {object} tools.ResponseError "{"errcode":1,"errmsg":"错误原因"}" +// @Router /DealyMessage/Info? [get] +func DealyMessageInfo(c *gin.Context) { + c.Header("Access-Control-Allow-Origin", c.Request.Header.Get("Origin")) + c.Header("Access-Control-Allow-Credentials", "true") + + var aDelayMessage []DelayMessage.Message + DelayMessage.CDelayMessage.Find(bson.M{"UserId": c.Query("userid")}).All(&aDelayMessage) + + if aDelayMessage == nil { + + c.JSON(200, tools.ResponseError{ + 1, + "空", + }) + } else { + + c.JSON(200, tools.ResponseSeccess{ + 0, + aDelayMessage, + }) + } +} + +// @Title 创建提醒 +// @Description 创建提醒 +// @Accept json +// @Produce json +// @Param UserId 5dfb03070a9ac17ac7a82054 string true "用户id" +// @Param UDID 5dfb03070a9ac17ac7a82054 string true "设备id" +// @Param Title 表演时间提醒 string true "标题" +// @Param Content 5分钟后有表演 string true "内容" +// @Param DelayTime 1579066863 string true "到达这个时间戳就执行" +// @Success 200 {object} tools.ResponseSeccess "{"errcode":0,"result":"ok"}" +// @Failure 500 {object} tools.ResponseError "{"errcode":1,"errmsg":"错误原因"}" +// @Router /DealyMessage/Create? [post] +func CreateDealyMessage(c *gin.Context) { + c.Header("Access-Control-Allow-Origin", c.Request.Header.Get("Origin")) + c.Header("Access-Control-Allow-Credentials", "true") + + err := DelayMessage.GlobalDM.AddTaskForAppMessage(c.PostForm("DelayTime"), c.PostForm("UDID"), c.PostForm("Title"), c.PostForm("Content"), c.PostForm("UserId")) + + if err == nil { + + c.JSON(200, tools.ResponseSeccess{ + 0, + "ok", + }) + } else { + + c.JSON(200, tools.ResponseError{ + 1, + err.Error(), + }) + } + +} diff --git a/DB/db.go b/DB/db.go index 98c05ba..87c46d0 100644 --- a/DB/db.go +++ b/DB/db.go @@ -21,6 +21,7 @@ var CAccessLog *mgo.Collection //访问记录 var CActionLog *mgo.Collection //行为记录 var DB *mgo.Database + type SItem struct { Id *bson.ObjectId `bson:"_id" json:"Id" valid:"required"` Name string `bson:"Name" json:"Name"` diff --git a/Lib/DelayMessage/delaymessage.go b/Lib/DelayMessage/delaymessage.go new file mode 100644 index 0000000..95e85ab --- /dev/null +++ b/Lib/DelayMessage/delaymessage.go @@ -0,0 +1,319 @@ +package DelayMessage + +import ( + "encoding/json" + "fmt" + "github.com/pkg/errors" + "gopkg.in/mgo.v2" + "gopkg.in/mgo.v2/bson" + "io/ioutil" + "net/http" + "strconv" + "sync" + "time" +) + +// 延迟消息 +var CDelayMessage *mgo.Collection +var CDelayErrorLog *mgo.Collection +var GlobalDM *DelayMessage + +type Message struct { + Id *bson.ObjectId `bson:"_id" json:"_id"` + //延时时间 + DelayTime int64 + //callbackUrl + CallbackUrl string `bson:"CallbackUrl" json:"CallbackUrl"` + //失败次数 + Fail int + + // 类型0=geturl;1=发送app消息 + Type int8 `bson:"Type" json:"Type"` + Title string `bson:"Title" json:"Title"` + Content string `bson:"Content" json:"Content"` + UDID string `bson:"UDID" json:"UDID"` + UserId string `bson:"UserId" json:"UserId"` +} + +// addTask +func (dm *DelayMessage) AddTaskForGetUrl(delayTime string, userid string, callbackUrl string) error { + + iTIme, _ := strconv.Atoi(delayTime) + i64Time := int64(iTIme) + nowTimeU := time.Now().Unix() + iDelayTIme := i64Time - nowTimeU + + if i64Time <= nowTimeU { + return errors.New("delayTime error...") + } + if callbackUrl == "" { + return errors.New("callbackUrl error...") + } + + objectID := bson.NewObjectId() + _Message := &Message{&objectID, i64Time, callbackUrl, 0, 0, "", "", "", userid} + + CDelayMessage.Insert(_Message) + + //添加任务 + //iDelayTIme = 3 + dm.AddTask(time.Now().Add(time.Second*time.Duration(iDelayTIme)), Callback, _Message) + fmt.Println("增加新任务:", "当前位置", dm.curIndex, ",", iDelayTIme, "秒后触发", _Message.CallbackUrl) + + println(objectID.Hex()) + return nil +} + +func (dm *DelayMessage) AddTaskForAppMessage(delayTime string, udid string, title string, content string, userid string) error { + + iTIme, _ := strconv.Atoi(delayTime) + i64Time := int64(iTIme) + nowTimeU := time.Now().Unix() + iDelayTIme := i64Time - nowTimeU + + if i64Time <= nowTimeU { + return errors.New("delayTime error...") + } + if udid == "" { + return errors.New("udid error...") + } + if title == "" { + return errors.New("title error...") + } + if content == "" { + return errors.New("content error...") + } + if userid == "" { + return errors.New("userid error...") + } + + objectID := bson.NewObjectId() + _Message := &Message{&objectID, i64Time, "", 0, 1, title, content, udid, userid} + + CDelayMessage.Insert(_Message) + + //添加任务 + //iDelayTIme = 3 + dm.AddTask(time.Now().Add(time.Second*time.Duration(iDelayTIme)), Callback, _Message) + fmt.Println("增加新任务:", "当前位置", dm.curIndex, ",", iDelayTIme, "秒后触发", _Message.Title) + + //println(objectID.Hex()) + return nil +} + +// delTask +func (dm *DelayMessage) DelTaskForId(id string) { + + defer func() { + if r := recover(); r != nil { + fmt.Println("[E]", r) + } + }() + + CDelayMessage.Remove(bson.M{"_id": bson.ObjectIdHex(id)}) + i := dm.DelTask(id) + println(strconv.Itoa(i)) +} + +func (dm *DelayMessage) Show() { + + //取出当前的槽的任务 + fmt.Println("---------------------------------") + for k, _ := range dm.slots { + tasks := dm.slots[k] + for _, v2 := range tasks { + fmt.Println("当前秒数:", dm.curIndex, "下一任务:", k, "圈数:", v2.cycleNum, v2.params.CallbackUrl) + } + } + fmt.Println("---------------------------------") + +} + +func Callback(key *bson.ObjectId, message *Message) { + + var body string + var err error + if message.Type == 0 { + body, err = HttpGet(message.CallbackUrl) + if err != nil { + //fmt.Println(err,message) + } + + } else if message.Type == 1 { + + } + + json, _ := json.Marshal(message) + if body != "ok" { + CDelayMessage.Remove(bson.M{"_id": *key}) + + fmt.Println("完成任务:", string(json)) + } else { + //message.Fail++ + //if message.Fail == 3 { + // fmt.Println(color.Red("放弃任务:"), message.CallbackUrl) + // CDelayMessage.Remove(bson.M{"_id": *key}) + // dbErrorLog.Insert(message) + //} else { + // fmt.Println("重新添加任务:", message) + // dm.AddTask(time.Now().Add(time.Second*10), key, callback, message) + //} + + fmt.Println("放弃任务:", string(json)) + //CDelayMessage.Remove(bson.M{"_id": *key}) + CDelayErrorLog.Insert(message) + } + +} + +type DelayMessage struct { + //当前下标 + curIndex int + //环形槽 + sync.RWMutex + slots [3600]map[*bson.ObjectId]*Task + //启动时间 + startTime time.Time +} + +//执行的任务函数 +type TaskFunc func(key *bson.ObjectId, message *Message) + +//任务 +type Task struct { + //循环次数 + cycleNum int + //执行的函数 + exec TaskFunc + params *Message +} + +//创建一个延迟消息 +func NewDelayMessage() *DelayMessage { + dm := &DelayMessage{ + curIndex: 0, + startTime: time.Now(), + } + for i := 0; i < 3600; i++ { + dm.slots[i] = make(map[*bson.ObjectId]*Task) + } + return dm +} + +//启动延迟消息 +func (dm *DelayMessage) Start() { + //go dm.taskLoop() + go dm.timeLoop() + select {} +} + +//处理每1秒的任务 +func (dm *DelayMessage) taskLoop() { + + //取出当前的槽的任务 + tasks := dm.slots[dm.curIndex] + if len(tasks) > 0 { + //遍历任务,判断任务循环次数等于0,则运行任务 + //否则任务循环次数减1 + for k, v := range tasks { + if v.cycleNum == 0 { + go v.exec(k, v.params) + //删除运行过的任务 + dm.RLock() + delete(tasks, k) + dm.RUnlock() + } else { + v.cycleNum-- + } + } + } +} + +//处理每1秒移动下标 +func (dm *DelayMessage) timeLoop() { + defer func() { + fmt.Println("timeLoop exit") + }() + for { + + time.Sleep(time.Second) + //fmt.Println(time.Now().Format("2006-01-02 15:04:05")) + //判断当前下标,如果等于3599则重置为0,否则加1 + dm.taskLoop() + if dm.curIndex == 3599 { + dm.curIndex = 0 + } else { + dm.curIndex++ + } + } +} + +//添加任务 +func (dm *DelayMessage) AddTask(t time.Time, exec TaskFunc, message *Message) error { + if dm.startTime.After(t) { + return errors.New("时间错误") + } + //计算循环次数 + cycleNum := int((t.Unix() - time.Now().Unix()) / 3600) + //计算任务所在的slots的下标 + ix := (t.Unix() - time.Now().Unix() + int64(dm.curIndex)) % 3600 + + fmt.Println("AddTask-----", cycleNum, "圈后的第", ix, "位") + + //把任务加入tasks中 + tasks := dm.slots[ix] + //if _, ok := tasks[key]; ok { + // return errors.New("该slots中已存在key为" + key + "的任务") + //} + dm.Lock() + tasks[message.Id] = &Task{ + cycleNum: cycleNum, + exec: exec, + params: message, + } + dm.Unlock() + return nil +} + +//删除任务 +func (dm *DelayMessage) DelTask(key string) int { + + i := 0 + for _, v := range dm.slots { + for k2, _ := range v { + if key == k2.Hex() { + i++ + delete(v, k2) + } + } + } + return i +} + +var c = &http.Client{ + Timeout: 5 * time.Second, +} + +func HttpGet(url string) (string, error) { + + //fmt.Println(color.Yellow("http request: "), color.Yellow(url)) + resp, err := c.Get(url) + if err != nil { + return "", err + } + + defer func() { + if err := recover(); err == nil { + resp.Body.Close() + } + }() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", err + } + + sBody := string(body) + //fmt.Println(color.Yellow(sBody)) + return sBody, nil +} diff --git a/main.go b/main.go index 739341b..da43690 100644 --- a/main.go +++ b/main.go @@ -5,12 +5,14 @@ import ( "github.com/aarongao/tools" "github.com/gin-gonic/gin" "gopkg.in/mgo.v2" + "gopkg.in/mgo.v2/bson" "letu/Api" "letu/Config" "letu/DB" "letu/Lib/Cache" - "letu/Lib/Ws" + "letu/Lib/DelayMessage" "os" + "time" ) // @APIVersion 1.0.0 @@ -57,6 +59,8 @@ func main() { DB.CAccessLog = DB.DB.C("AccessLog") DB.CActionLog = DB.DB.C("ActionLog") DB.CInvestigation = DB.DB.C("Investigation") + DelayMessage.CDelayMessage = DB.DB.C("DelayMessage") + DelayMessage.CDelayErrorLog = DB.DB.C("DelayErrorLog") r := gin.Default() //r.Static("/.well-known", "./.well-known/") @@ -85,10 +89,34 @@ func main() { r.GET("/AccessLog", Api.AccessLog) r.POST("/Sms/Send", Api.Send) r.POST("/Investigation/Save", Api.Save) + r.POST("/DealyMessage/Create", Api.CreateDealyMessage) + r.GET("/DealyMessage/Info", Api.DealyMessageInfo) //r.GET("/ws", Api.WsPage) r.Static("/Upload", "./Upload") r.Static("/Console", "./Console") - go Ws.Manager.Start() + // go Ws.Manager.Start() + + // 创建延迟消息 + DelayMessage.GlobalDM = DelayMessage.NewDelayMessage() + + go func() { + DelayMessage.GlobalDM.Start() + }() + + // -初始化数据 + var aMessage []DelayMessage.Message + DelayMessage.CDelayMessage.Find(bson.M{}).All(&aMessage) + nowTimeU := time.Now().Unix() + for i := 0; i < len(aMessage); i++ { + iDelayTIme := aMessage[i].DelayTime - nowTimeU + + if iDelayTIme < 0 { + iDelayTIme = 1 + } + DelayMessage.GlobalDM.AddTask(time.Now().Add(time.Second*time.Duration(iDelayTIme)), DelayMessage.Callback, &aMessage[i]) + } + println("增加", len(aMessage), "条提醒任务") + r.Run(":8080") } -- libgit2 0.21.0