package task

import (
	"fmt"
	"sync"

	"Edu/config"
	"Edu/serve/cache"
	"Edu/serve/cache/logic"
	"Edu/utils"
)

// TaskListen routes tasks through two channels: Produce carries tasks that
// should be stored in the Redis-backed queue, and Consume carries tasks that
// should be executed via their Handle method. The lock field is allocated by
// NewTaskListen but is not used by any method in this file.
type TaskListen struct {
	Produce chan *Task
	Consume chan *Task
	lock    *sync.Mutex
}

// TaskListenEvent is the package-level listener instance. It is nil until
// NewTaskListen has been called.
var TaskListenEvent *TaskListen

// Join submits task for enqueueing by sending it on the Produce channel.
//
// NOTE: the send targets the package-level TaskListenEvent rather than the
// receiver, so it panics if NewTaskListen has not been called yet, and it
// blocks while the channel buffer is full.
func (*TaskListen) Join(task *Task) {
	TaskListenEvent.Produce <- task
}

// Quit submits task for handling by sending it on the Consume channel.
//
// NOTE: the send targets the package-level TaskListenEvent rather than the
// receiver, so it panics if NewTaskListen has not been called yet, and it
// blocks while the channel buffer is full.
func (*TaskListen) Quit(task *Task) {
	TaskListenEvent.Consume <- task
}

// Listen starts a goroutine that loops forever over the receiver's Produce
// and Consume channels. The loop runs inside utils.TryCatch (presumably a
// panic guard; confirm against its implementation).
//
// A produced task is added to the sorted set config.RedisKeyForTaskQueue with
// its DelayUnix value as score and its ID as member; if that succeeds, the
// task is also stored in the hash config.RedisKeyForTaskQueueBody under its
// ID. A consumed task has its Handle method invoked. Failures are printed to
// standard output and the loop keeps running.
func (tl *TaskListen) Listen() {
	go utils.TryCatch(func() {
		for {
			select {
			case p := <-tl.Produce:
				entry := &logic.ScoreParams{Score: float64(p.DelayUnix()), Member: p.ID}
				if err := cache.Cache.ZAdd(config.RedisKeyForTaskQueue, entry); err != nil {
					fmt.Printf("Task Produce Redis ZAdd Error【%s】\n", err)
					// Skip storing the body when the queue entry was not written.
					continue
				}
				// Report only real failures; a successful write stays silent.
				if err := cache.Cache.HSet(config.RedisKeyForTaskQueueBody, p.ID, p); err != nil {
					fmt.Printf("err:%v\n", err)
				}
			case c := <-tl.Consume:
				if err := c.Handle(); err != nil {
					fmt.Printf("err:%v\n", err)
				}
			}
		}
	})
}

// NewTaskListen returns the package-level TaskListenEvent, creating it on the
// first call with Produce and Consume channels of buffer size 1.
//
// The nil check is not synchronized, so the first call should not race with
// other calls.
func NewTaskListen() *TaskListen {
	if TaskListenEvent != nil {
		return TaskListenEvent
	}
	TaskListenEvent = &TaskListen{
		Produce: make(chan *Task, 1),
		Consume: make(chan *Task, 1),
		lock:    new(sync.Mutex),
	}
	return TaskListenEvent
}