Files
ArmedPolice/task/listen.go
2021-11-19 09:24:15 +08:00

78 lines
1.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package task
import (
"ArmedPolice/config"
"ArmedPolice/serve/cache"
"ArmedPolice/serve/cache/logic"
"ArmedPolice/utils"
"fmt"
"sync"
"time"
)
type TaskListen struct {
Produce chan *Task
Consume chan *Task
ticker *time.Ticker
lock *sync.Mutex
}
var TaskListenEvent *TaskListen
func (this *TaskListen) Join(task *Task) {
TaskListenEvent.Produce <- task
}
func (this *TaskListen) Quit(task *Task) {
TaskListenEvent.Consume <- task
}
func (this *TaskListen) Listen() {
go utils.TryCatch(func() {
for {
select {
case p := <-this.Produce:
err := cache.Cache.ZAdd(config.RedisKeyForTaskQueue, &logic.ScoreParams{Score: float64(p.DelayUnix()), Member: p.ID})
if err != nil {
fmt.Printf("Task Produce Redis Sadd Error【%s】", err)
} else {
_ = cache.Cache.HSet(config.RedisKeyForTaskQueueBody, p.ID, p)
}
case c := <-this.Consume:
// 监听到数据,执行后续操作
_ = c.Handle()
case <-this.ticker.C:
now := time.Now()
_cache, _ := cache.Cache.ZRangebyscore(config.RedisKeyForTaskQueue, &logic.ScoreRangeBy{Min: "0",
Max: fmt.Sprintf("%d", now.Unix())})
if len(_cache) > 0 {
for _, v := range _cache {
body, _ := cache.Cache.HGet(config.RedisKeyForTaskQueueBody, v)
// TODO有Bug读取接口数据为空
task := new(Task)
task.Body = new(Order)
_ = task.UnmarshalBinary([]byte(body))
task.Consume()
}
// 销毁信息
_ = cache.Cache.ZRem(config.RedisKeyForTaskQueue, _cache)
_ = cache.Cache.HDel(config.RedisKeyForTaskQueueBody, _cache...)
}
}
}
})
}
func NewTaskListen() *TaskListen {
TaskListenEvent = &TaskListen{
Produce: make(chan *Task, 1),
Consume: make(chan *Task, 1),
ticker: time.NewTicker(time.Second * 1),
lock: new(sync.Mutex),
}
return TaskListenEvent
}