// ArmedPolice/task/listen.go

package task
import (
	"fmt"
	"strconv"
	"sync"
	"time"

	"ArmedPolice/config"
	"ArmedPolice/serve/cache"
	"ArmedPolice/serve/cache/logic"
	"ArmedPolice/utils"
)
type TaskListen struct {
Produce chan *Task
Consume chan *Task
2021-11-19 09:24:15 +08:00
ticker *time.Ticker
2021-11-02 09:43:19 +08:00
lock *sync.Mutex
}
// TaskListenEvent is the package-level listener instance. NewTaskListen
// assigns it, and Join and Quit send their tasks through it.
var TaskListenEvent *TaskListen
// Join hands task to the producer channel. The send goes through the
// package-level TaskListenEvent rather than the receiver, and blocks while
// that channel cannot accept another value.
func (l *TaskListen) Join(task *Task) {
	producer := TaskListenEvent.Produce
	producer <- task
}
// Quit hands task to the consumer channel so the Listen loop can run its
// Handle method. The send goes through the package-level TaskListenEvent
// rather than the receiver, and blocks while that channel cannot accept
// another value.
func (l *TaskListen) Quit(task *Task) {
	consumer := TaskListenEvent.Consume
	consumer <- task
}
func (this *TaskListen) Listen() {
go utils.TryCatch(func() {
for {
select {
case p := <-this.Produce:
err := cache.Cache.ZAdd(config.RedisKeyForTaskQueue, &logic.ScoreParams{Score: float64(p.DelayUnix()), Member: p.ID})
if err != nil {
fmt.Printf("Task Produce Redis Sadd Error【%s】", err)
} else {
2021-11-19 09:24:15 +08:00
_ = cache.Cache.HSet(config.RedisKeyForTaskQueueBody, p.ID, p)
2021-11-02 09:43:19 +08:00
}
case c := <-this.Consume:
2021-11-19 09:24:15 +08:00
// 监听到数据,执行后续操作
_ = c.Handle()
case <-this.ticker.C:
now := time.Now()
_cache, _ := cache.Cache.ZRangebyscore(config.RedisKeyForTaskQueue, &logic.ScoreRangeBy{Min: "0",
Max: fmt.Sprintf("%d", now.Unix())})
if len(_cache) > 0 {
for _, v := range _cache {
body, _ := cache.Cache.HGet(config.RedisKeyForTaskQueueBody, v)
// TODO有Bug读取接口数据为空
task := new(Task)
task.Body = new(Order)
_ = task.UnmarshalBinary([]byte(body))
task.Consume()
}
// 销毁信息
_ = cache.Cache.ZRem(config.RedisKeyForTaskQueue, _cache)
_ = cache.Cache.HDel(config.RedisKeyForTaskQueueBody, _cache...)
2021-11-02 09:43:19 +08:00
}
}
}
})
}
func NewTaskListen() *TaskListen {
2021-11-19 09:24:15 +08:00
TaskListenEvent = &TaskListen{
Produce: make(chan *Task, 1),
Consume: make(chan *Task, 1),
ticker: time.NewTicker(time.Second * 1),
lock: new(sync.Mutex),
2021-11-02 09:43:19 +08:00
}
return TaskListenEvent
}