// Package task contains a delayed-task listener: tasks are queued in a
// score-ordered cache set keyed by their due time and dispatched once a
// periodic tick finds them due.
package task

import (
	"fmt"
	"sync"
	"time"

	"ArmedPolice/config"
	"ArmedPolice/serve/cache"
	"ArmedPolice/serve/cache/logic"
	"ArmedPolice/utils"
)

// TaskListen owns the channels and the ticker that drive the task loop
// started by Listen.
type TaskListen struct {
	// Produce receives tasks that should be stored for later execution.
	Produce chan *Task
	// Consume receives tasks whose Handle method should be run.
	Consume chan *Task
	// ticker triggers the periodic scan for due tasks.
	ticker *time.Ticker
	// lock is initialised by NewTaskListen but not used anywhere in this file.
	lock *sync.Mutex
}

// TaskListenEvent is the package-level listener. It is assigned by
// NewTaskListen and is nil until that function has been called.
var TaskListenEvent *TaskListen

// Join hands task to the listener's Produce channel. It blocks while the
// channel buffer is full.
func (l *TaskListen) Join(task *Task) {
	// Send on the receiver's own channel (not the package-level listener) so
	// the task reaches the loop that Listen started for this instance.
	l.Produce <- task
}

// Quit hands task to the listener's Consume channel. It blocks while the
// channel buffer is full.
func (l *TaskListen) Quit(task *Task) {
	l.Consume <- task
}

// Listen starts the listener loop in a new goroutine, wrapped in
// utils.TryCatch. The loop has no exit condition of its own.
//
// On each iteration it does one of:
//   - store a task received on Produce in the cache,
//   - run Handle on a task received on Consume,
//   - on a ticker tick, dispatch every stored task that is due.
//
// NOTE(review): if Task.Consume sends on the Consume channel (for example
// through Quit), then more than one due task per tick would block this
// goroutine, because it is the only receiver and the buffer holds a single
// task. Confirm against Task.Consume.
func (l *TaskListen) Listen() {
	go utils.TryCatch(func() {
		for {
			select {
			case p := <-l.Produce:
				l.enqueue(p)
			case c := <-l.Consume:
				// A task was handed over for execution; run its handler.
				if err := c.Handle(); err != nil {
					fmt.Printf("Task Consume Handle Error【%v】\n", err)
				}
			case <-l.ticker.C:
				l.dispatchDue(time.Now())
			}
		}
	})
}

// enqueue stores p in the cache: its ID goes into the score-ordered queue
// with p.DelayUnix() as the score, and p itself goes into the body hash
// under the same ID. The body is only written when the queue entry was
// added successfully.
func (l *TaskListen) enqueue(p *Task) {
	err := cache.Cache.ZAdd(config.RedisKeyForTaskQueue, &logic.ScoreParams{Score: float64(p.DelayUnix()), Member: p.ID})
	if err != nil {
		fmt.Printf("Task Produce Redis ZAdd Error【%s】\n", err)
		return
	}
	if err := cache.Cache.HSet(config.RedisKeyForTaskQueueBody, p.ID, p); err != nil {
		// The queue entry now exists without a body; dispatchDue will log
		// and drop it when it becomes due.
		fmt.Printf("Task Produce Redis HSet Error【%v】\n", err)
	}
}

// dispatchDue looks up every queued task whose score lies between 0 and
// now (as Unix seconds), decodes each stored body and calls Consume on it,
// then removes all looked-up IDs from both the queue and the body hash.
//
// A task whose body cannot be read or decoded is logged and skipped rather
// than consumed with zero-valued fields. It is still removed afterwards so
// that it is not retried on every tick.
func (l *TaskListen) dispatchDue(now time.Time) {
	due, err := cache.Cache.ZRangebyscore(config.RedisKeyForTaskQueue, &logic.ScoreRangeBy{Min: "0", Max: fmt.Sprintf("%d", now.Unix())})
	if err != nil {
		fmt.Printf("Task Consume Redis ZRangebyscore Error【%v】\n", err)
		return
	}
	if len(due) == 0 {
		return
	}

	for _, id := range due {
		body, err := cache.Cache.HGet(config.RedisKeyForTaskQueueBody, id)
		if err != nil {
			fmt.Printf("Task Consume Redis HGet Error【%v】【%v】\n", id, err)
			continue
		}
		// TODO: known bug noted by the original author — the data read into
		// the interface-typed field comes back empty. Body is pre-set to a
		// concrete *Order before decoding.
		task := new(Task)
		task.Body = new(Order)
		if err := task.UnmarshalBinary([]byte(body)); err != nil {
			fmt.Printf("Task Consume Unmarshal Error【%v】【%v】\n", id, err)
			continue
		}
		task.Consume()
	}

	// Clean up the processed entries.
	// NOTE(review): due is passed to ZRem as a single argument but expanded
	// for HDel, exactly as in the original code — confirm ZRem's signature.
	if err := cache.Cache.ZRem(config.RedisKeyForTaskQueue, due); err != nil {
		fmt.Printf("Task Consume Redis ZRem Error【%v】\n", err)
	}
	if err := cache.Cache.HDel(config.RedisKeyForTaskQueueBody, due...); err != nil {
		fmt.Printf("Task Consume Redis HDel Error【%v】\n", err)
	}
}

// NewTaskListen builds a listener with single-slot Produce and Consume
// channels and a one-second ticker, stores it in TaskListenEvent and
// returns it. Each call replaces TaskListenEvent; a previously created
// listener's ticker is not stopped here. Call Listen on the result to start
// processing.
func NewTaskListen() *TaskListen {
	TaskListenEvent = &TaskListen{
		Produce: make(chan *Task, 1),
		Consume: make(chan *Task, 1),
		ticker:  time.NewTicker(time.Second * 1),
		lock:    new(sync.Mutex),
	}
	return TaskListenEvent
}