Commit 5af620a9b412be0085d7b95e7bf81e615ac2e09a

Authored by aarongao
1 parent 8a4e95da
Exists in v1.2 and in 2 other branches master, v1.1

通知

API/DealyMessage.go 0 → 100644
... ... @@ -0,0 +1,72 @@
  1 +package Api
  2 +
  3 +import (
  4 + "github.com/aarongao/tools"
  5 + "github.com/gin-gonic/gin"
  6 + "gopkg.in/mgo.v2/bson"
  7 + "letu/Lib/DelayMessage"
  8 +)
  9 +
  10 +// @Title 查询用户的定时提醒
  11 +// @Description 查询用户的定时提醒
  12 +// @Accept json
  13 +// @Produce json
  14 +// @Param UserId 5dfb03070a9ac17ac7a82054 string true "用户id"
  15 +// @Success 200 {object} tools.ResponseSeccess "DelayTime=执行时间;Type=类型(0请求url地址1发送app通知);Fail失败次数;Title=通知标题;Content=通知内容;UDID=设备id"
  16 +// @Failure 500 {object} tools.ResponseError "{"errcode":1,"errmsg":"错误原因"}"
  17 +// @Router /DealyMessage/Info? [get]
  18 +func DealyMessageInfo(c *gin.Context) {
  19 + c.Header("Access-Control-Allow-Origin", c.Request.Header.Get("Origin"))
  20 + c.Header("Access-Control-Allow-Credentials", "true")
  21 +
  22 + var aDelayMessage []DelayMessage.Message
  23 + DelayMessage.CDelayMessage.Find(bson.M{"UserId": c.Query("userid")}).All(&aDelayMessage)
  24 +
  25 + if aDelayMessage == nil {
  26 +
  27 + c.JSON(200, tools.ResponseError{
  28 + 1,
  29 + "空",
  30 + })
  31 + } else {
  32 +
  33 + c.JSON(200, tools.ResponseSeccess{
  34 + 0,
  35 + aDelayMessage,
  36 + })
  37 + }
  38 +}
  39 +
  40 +// @Title 创建提醒
  41 +// @Description 创建提醒
  42 +// @Accept json
  43 +// @Produce json
  44 +// @Param UserId 5dfb03070a9ac17ac7a82054 string true "用户id"
  45 +// @Param UDID 5dfb03070a9ac17ac7a82054 string true "设备id"
  46 +// @Param Title 表演时间提醒 string true "标题"
  47 +// @Param Content 5分钟后有表演 string true "内容"
  48 +// @Param DelayTime 1579066863 string true "到达这个时间戳就执行"
  49 +// @Success 200 {object} tools.ResponseSeccess "{"errcode":0,"result":"ok"}"
  50 +// @Failure 500 {object} tools.ResponseError "{"errcode":1,"errmsg":"错误原因"}"
  51 +// @Router /DealyMessage/Create? [post]
  52 +func CreateDealyMessage(c *gin.Context) {
  53 + c.Header("Access-Control-Allow-Origin", c.Request.Header.Get("Origin"))
  54 + c.Header("Access-Control-Allow-Credentials", "true")
  55 +
  56 + err := DelayMessage.GlobalDM.AddTaskForAppMessage(c.PostForm("DelayTime"), c.PostForm("UDID"), c.PostForm("Title"), c.PostForm("Content"), c.PostForm("UserId"))
  57 +
  58 + if err == nil {
  59 +
  60 + c.JSON(200, tools.ResponseSeccess{
  61 + 0,
  62 + "ok",
  63 + })
  64 + } else {
  65 +
  66 + c.JSON(200, tools.ResponseError{
  67 + 1,
  68 + err.Error(),
  69 + })
  70 + }
  71 +
  72 +}
... ...
DB/db.go
... ... @@ -21,6 +21,7 @@ var CAccessLog *mgo.Collection //访问记录
21 21 var CActionLog *mgo.Collection //行为记录
22 22 var DB *mgo.Database
23 23  
  24 +
24 25 type SItem struct {
25 26 Id *bson.ObjectId `bson:"_id" json:"Id" valid:"required"`
26 27 Name string `bson:"Name" json:"Name"`
... ...
Lib/DelayMessage/delaymessage.go 0 → 100644
... ... @@ -0,0 +1,319 @@
  1 +package DelayMessage
  2 +
  3 +import (
  4 + "encoding/json"
  5 + "fmt"
  6 + "github.com/pkg/errors"
  7 + "gopkg.in/mgo.v2"
  8 + "gopkg.in/mgo.v2/bson"
  9 + "io/ioutil"
  10 + "net/http"
  11 + "strconv"
  12 + "sync"
  13 + "time"
  14 +)
  15 +
  16 +// 延迟消息
  17 +var CDelayMessage *mgo.Collection
  18 +var CDelayErrorLog *mgo.Collection
  19 +var GlobalDM *DelayMessage
  20 +
  21 +type Message struct {
  22 + Id *bson.ObjectId `bson:"_id" json:"_id"`
  23 + //延时时间
  24 + DelayTime int64
  25 + //callbackUrl
  26 + CallbackUrl string `bson:"CallbackUrl" json:"CallbackUrl"`
  27 + //失败次数
  28 + Fail int
  29 +
  30 + // 类型0=geturl;1=发送app消息
  31 + Type int8 `bson:"Type" json:"Type"`
  32 + Title string `bson:"Title" json:"Title"`
  33 + Content string `bson:"Content" json:"Content"`
  34 + UDID string `bson:"UDID" json:"UDID"`
  35 + UserId string `bson:"UserId" json:"UserId"`
  36 +}
  37 +
  38 +// addTask
  39 +func (dm *DelayMessage) AddTaskForGetUrl(delayTime string, userid string, callbackUrl string) error {
  40 +
  41 + iTIme, _ := strconv.Atoi(delayTime)
  42 + i64Time := int64(iTIme)
  43 + nowTimeU := time.Now().Unix()
  44 + iDelayTIme := i64Time - nowTimeU
  45 +
  46 + if i64Time <= nowTimeU {
  47 + return errors.New("delayTime error...")
  48 + }
  49 + if callbackUrl == "" {
  50 + return errors.New("callbackUrl error...")
  51 + }
  52 +
  53 + objectID := bson.NewObjectId()
  54 + _Message := &Message{&objectID, i64Time, callbackUrl, 0, 0, "", "", "", userid}
  55 +
  56 + CDelayMessage.Insert(_Message)
  57 +
  58 + //添加任务
  59 + //iDelayTIme = 3
  60 + dm.AddTask(time.Now().Add(time.Second*time.Duration(iDelayTIme)), Callback, _Message)
  61 + fmt.Println("增加新任务:", "当前位置", dm.curIndex, ",", iDelayTIme, "秒后触发", _Message.CallbackUrl)
  62 +
  63 + println(objectID.Hex())
  64 + return nil
  65 +}
  66 +
  67 +func (dm *DelayMessage) AddTaskForAppMessage(delayTime string, udid string, title string, content string, userid string) error {
  68 +
  69 + iTIme, _ := strconv.Atoi(delayTime)
  70 + i64Time := int64(iTIme)
  71 + nowTimeU := time.Now().Unix()
  72 + iDelayTIme := i64Time - nowTimeU
  73 +
  74 + if i64Time <= nowTimeU {
  75 + return errors.New("delayTime error...")
  76 + }
  77 + if udid == "" {
  78 + return errors.New("udid error...")
  79 + }
  80 + if title == "" {
  81 + return errors.New("title error...")
  82 + }
  83 + if content == "" {
  84 + return errors.New("content error...")
  85 + }
  86 + if userid == "" {
  87 + return errors.New("userid error...")
  88 + }
  89 +
  90 + objectID := bson.NewObjectId()
  91 + _Message := &Message{&objectID, i64Time, "", 0, 1, title, content, udid, userid}
  92 +
  93 + CDelayMessage.Insert(_Message)
  94 +
  95 + //添加任务
  96 + //iDelayTIme = 3
  97 + dm.AddTask(time.Now().Add(time.Second*time.Duration(iDelayTIme)), Callback, _Message)
  98 + fmt.Println("增加新任务:", "当前位置", dm.curIndex, ",", iDelayTIme, "秒后触发", _Message.Title)
  99 +
  100 + //println(objectID.Hex())
  101 + return nil
  102 +}
  103 +
  104 +// delTask
  105 +func (dm *DelayMessage) DelTaskForId(id string) {
  106 +
  107 + defer func() {
  108 + if r := recover(); r != nil {
  109 + fmt.Println("[E]", r)
  110 + }
  111 + }()
  112 +
  113 + CDelayMessage.Remove(bson.M{"_id": bson.ObjectIdHex(id)})
  114 + i := dm.DelTask(id)
  115 + println(strconv.Itoa(i))
  116 +}
  117 +
  118 +func (dm *DelayMessage) Show() {
  119 +
  120 + //取出当前的槽的任务
  121 + fmt.Println("---------------------------------")
  122 + for k, _ := range dm.slots {
  123 + tasks := dm.slots[k]
  124 + for _, v2 := range tasks {
  125 + fmt.Println("当前秒数:", dm.curIndex, "下一任务:", k, "圈数:", v2.cycleNum, v2.params.CallbackUrl)
  126 + }
  127 + }
  128 + fmt.Println("---------------------------------")
  129 +
  130 +}
  131 +
  132 +func Callback(key *bson.ObjectId, message *Message) {
  133 +
  134 + var body string
  135 + var err error
  136 + if message.Type == 0 {
  137 + body, err = HttpGet(message.CallbackUrl)
  138 + if err != nil {
  139 + //fmt.Println(err,message)
  140 + }
  141 +
  142 + } else if message.Type == 1 {
  143 +
  144 + }
  145 +
  146 + json, _ := json.Marshal(message)
  147 + if body != "ok" {
  148 + CDelayMessage.Remove(bson.M{"_id": *key})
  149 +
  150 + fmt.Println("完成任务:", string(json))
  151 + } else {
  152 + //message.Fail++
  153 + //if message.Fail == 3 {
  154 + // fmt.Println(color.Red("放弃任务:"), message.CallbackUrl)
  155 + // CDelayMessage.Remove(bson.M{"_id": *key})
  156 + // dbErrorLog.Insert(message)
  157 + //} else {
  158 + // fmt.Println("重新添加任务:", message)
  159 + // dm.AddTask(time.Now().Add(time.Second*10), key, callback, message)
  160 + //}
  161 +
  162 + fmt.Println("放弃任务:", string(json))
  163 + //CDelayMessage.Remove(bson.M{"_id": *key})
  164 + CDelayErrorLog.Insert(message)
  165 + }
  166 +
  167 +}
  168 +
  169 +type DelayMessage struct {
  170 + //当前下标
  171 + curIndex int
  172 + //环形槽
  173 + sync.RWMutex
  174 + slots [3600]map[*bson.ObjectId]*Task
  175 + //启动时间
  176 + startTime time.Time
  177 +}
  178 +
  179 +//执行的任务函数
  180 +type TaskFunc func(key *bson.ObjectId, message *Message)
  181 +
  182 +//任务
  183 +type Task struct {
  184 + //循环次数
  185 + cycleNum int
  186 + //执行的函数
  187 + exec TaskFunc
  188 + params *Message
  189 +}
  190 +
  191 +//创建一个延迟消息
  192 +func NewDelayMessage() *DelayMessage {
  193 + dm := &DelayMessage{
  194 + curIndex: 0,
  195 + startTime: time.Now(),
  196 + }
  197 + for i := 0; i < 3600; i++ {
  198 + dm.slots[i] = make(map[*bson.ObjectId]*Task)
  199 + }
  200 + return dm
  201 +}
  202 +
  203 +//启动延迟消息
  204 +func (dm *DelayMessage) Start() {
  205 + //go dm.taskLoop()
  206 + go dm.timeLoop()
  207 + select {}
  208 +}
  209 +
  210 +//处理每1秒的任务
  211 +func (dm *DelayMessage) taskLoop() {
  212 +
  213 + //取出当前的槽的任务
  214 + tasks := dm.slots[dm.curIndex]
  215 + if len(tasks) > 0 {
  216 + //遍历任务,判断任务循环次数等于0,则运行任务
  217 + //否则任务循环次数减1
  218 + for k, v := range tasks {
  219 + if v.cycleNum == 0 {
  220 + go v.exec(k, v.params)
  221 + //删除运行过的任务
  222 + dm.RLock()
  223 + delete(tasks, k)
  224 + dm.RUnlock()
  225 + } else {
  226 + v.cycleNum--
  227 + }
  228 + }
  229 + }
  230 +}
  231 +
  232 +//处理每1秒移动下标
  233 +func (dm *DelayMessage) timeLoop() {
  234 + defer func() {
  235 + fmt.Println("timeLoop exit")
  236 + }()
  237 + for {
  238 +
  239 + time.Sleep(time.Second)
  240 + //fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
  241 + //判断当前下标,如果等于3599则重置为0,否则加1
  242 + dm.taskLoop()
  243 + if dm.curIndex == 3599 {
  244 + dm.curIndex = 0
  245 + } else {
  246 + dm.curIndex++
  247 + }
  248 + }
  249 +}
  250 +
  251 +//添加任务
  252 +func (dm *DelayMessage) AddTask(t time.Time, exec TaskFunc, message *Message) error {
  253 + if dm.startTime.After(t) {
  254 + return errors.New("时间错误")
  255 + }
  256 + //计算循环次数
  257 + cycleNum := int((t.Unix() - time.Now().Unix()) / 3600)
  258 + //计算任务所在的slots的下标
  259 + ix := (t.Unix() - time.Now().Unix() + int64(dm.curIndex)) % 3600
  260 +
  261 + fmt.Println("AddTask-----", cycleNum, "圈后的第", ix, "位")
  262 +
  263 + //把任务加入tasks中
  264 + tasks := dm.slots[ix]
  265 + //if _, ok := tasks[key]; ok {
  266 + // return errors.New("该slots中已存在key为" + key + "的任务")
  267 + //}
  268 + dm.Lock()
  269 + tasks[message.Id] = &Task{
  270 + cycleNum: cycleNum,
  271 + exec: exec,
  272 + params: message,
  273 + }
  274 + dm.Unlock()
  275 + return nil
  276 +}
  277 +
  278 +//删除任务
  279 +func (dm *DelayMessage) DelTask(key string) int {
  280 +
  281 + i := 0
  282 + for _, v := range dm.slots {
  283 + for k2, _ := range v {
  284 + if key == k2.Hex() {
  285 + i++
  286 + delete(v, k2)
  287 + }
  288 + }
  289 + }
  290 + return i
  291 +}
  292 +
  293 +var c = &http.Client{
  294 + Timeout: 5 * time.Second,
  295 +}
  296 +
  297 +func HttpGet(url string) (string, error) {
  298 +
  299 + //fmt.Println(color.Yellow("http request: "), color.Yellow(url))
  300 + resp, err := c.Get(url)
  301 + if err != nil {
  302 + return "", err
  303 + }
  304 +
  305 + defer func() {
  306 + if err := recover(); err == nil {
  307 + resp.Body.Close()
  308 + }
  309 + }()
  310 +
  311 + body, err := ioutil.ReadAll(resp.Body)
  312 + if err != nil {
  313 + return "", err
  314 + }
  315 +
  316 + sBody := string(body)
  317 + //fmt.Println(color.Yellow(sBody))
  318 + return sBody, nil
  319 +}
... ...
main.go
... ... @@ -5,12 +5,14 @@ import (
5 5 "github.com/aarongao/tools"
6 6 "github.com/gin-gonic/gin"
7 7 "gopkg.in/mgo.v2"
  8 + "gopkg.in/mgo.v2/bson"
8 9 "letu/Api"
9 10 "letu/Config"
10 11 "letu/DB"
11 12 "letu/Lib/Cache"
12   - "letu/Lib/Ws"
  13 + "letu/Lib/DelayMessage"
13 14 "os"
  15 + "time"
14 16 )
15 17  
16 18 // @APIVersion 1.0.0
... ... @@ -57,6 +59,8 @@ func main() {
57 59 DB.CAccessLog = DB.DB.C("AccessLog")
58 60 DB.CActionLog = DB.DB.C("ActionLog")
59 61 DB.CInvestigation = DB.DB.C("Investigation")
  62 + DelayMessage.CDelayMessage = DB.DB.C("DelayMessage")
  63 + DelayMessage.CDelayErrorLog = DB.DB.C("DelayErrorLog")
60 64  
61 65 r := gin.Default()
62 66 //r.Static("/.well-known", "./.well-known/")
... ... @@ -85,10 +89,34 @@ func main() {
85 89 r.GET("/AccessLog", Api.AccessLog)
86 90 r.POST("/Sms/Send", Api.Send)
87 91 r.POST("/Investigation/Save", Api.Save)
  92 + r.POST("/DealyMessage/Create", Api.CreateDealyMessage)
  93 + r.GET("/DealyMessage/Info", Api.DealyMessageInfo)
88 94 //r.GET("/ws", Api.WsPage)
89 95  
90 96 r.Static("/Upload", "./Upload")
91 97 r.Static("/Console", "./Console")
92   - go Ws.Manager.Start()
  98 + // go Ws.Manager.Start()
  99 +
  100 + // 创建延迟消息
  101 + DelayMessage.GlobalDM = DelayMessage.NewDelayMessage()
  102 +
  103 + go func() {
  104 + DelayMessage.GlobalDM.Start()
  105 + }()
  106 +
  107 + // -初始化数据
  108 + var aMessage []DelayMessage.Message
  109 + DelayMessage.CDelayMessage.Find(bson.M{}).All(&aMessage)
  110 + nowTimeU := time.Now().Unix()
  111 + for i := 0; i < len(aMessage); i++ {
  112 + iDelayTIme := aMessage[i].DelayTime - nowTimeU
  113 +
  114 + if iDelayTIme < 0 {
  115 + iDelayTIme = 1
  116 + }
  117 + DelayMessage.GlobalDM.AddTask(time.Now().Add(time.Second*time.Duration(iDelayTIme)), DelayMessage.Callback, &aMessage[i])
  118 + }
  119 + println("增加", len(aMessage), "条提醒任务")
  120 +
93 121 r.Run(":8080")
94 122 }
... ...