package core

import (
	"fmt"
	"strconv"
	"strings"
	"time"

	"bkb-seller/global"
	"bkb-seller/service"

	"github.com/go-redis/redis"
)

var (
	// constKeyspaceFormat is the key-event channel prefix for one database
	// index, e.g. "__keyevent@0__".
	constKeyspaceFormat = "__keyevent@%d__"
	// constKeyspacePatternFormat joins the channel prefix and the event
	// pattern into the final PSUBSCRIBE pattern.
	constKeyspacePatternFormat = "%s:%s"
)

// RedisEventSink subscribes to Redis key-event notifications and dispatches
// mission timing callbacks for the keys it receives.
type RedisEventSink struct {
	redisClient     *redis.Client
	keyspacePattern string
	subscriber      *redis.PubSub
}

// RedisEventSinkStep builds a sink that listens on
// "__keyevent@<database>__:<pattern>" using global.MG_REDIS, starts its
// background listener and returns the sink.
//
// It panics when global.MG_REDIS has not been initialised.
func RedisEventSinkStep(database int, pattern string) *RedisEventSink {
	if global.MG_REDIS == nil {
		panic("redis no initialization")
	}

	keyspace := fmt.Sprintf(constKeyspaceFormat, database)
	sink := &RedisEventSink{
		redisClient:     global.MG_REDIS,
		keyspacePattern: fmt.Sprintf(constKeyspacePatternFormat, keyspace, pattern),
	}
	sink.init()

	// Return the sink that was actually constructed; the previous
	// implementation returned an unassigned (nil) pointer.
	return sink
}

// init opens the pattern subscription and starts the listener goroutine.
func (sink *RedisEventSink) init() {
	sink.subscriber = sink.redisClient.PSubscribe(sink.keyspacePattern)

	// Start observing for events...
	go sink.listenForEvents()
}

// listenForEvents receives messages from the subscription until the process
// exits, handing every non-empty payload to handleEvent.
func (sink *RedisEventSink) listenForEvents() {
	// Pause between failed receives so a persistent error (for example a
	// closed subscription) does not turn into a busy loop.
	const receiveRetryDelay = time.Second

	if sink.subscriber == nil {
		// Nothing to listen on; spinning here would only burn CPU.
		return
	}

	for {
		msg, err := sink.subscriber.ReceiveMessage()
		if err != nil || msg == nil {
			time.Sleep(receiveRetryDelay)
			continue
		}
		if msg.Payload == "" {
			continue
		}
		sink.handleEvent(msg.Payload)
	}
}

// handleEvent processes one notified key. Only keys shaped like
// "mission_<start|end>_<id>" are acted upon; everything else is ignored.
func (sink *RedisEventSink) handleEvent(key string) {
	parts := strings.Split(key, "_")
	if len(parts) != 3 || parts[0] != "mission" {
		return
	}

	// Mission status: take a short-lived lock so that only one subscriber
	// handles a given key at a time.
	lockKey := "nx:_" + key
	acquired, err := global.MG_REDIS.SetNX(lockKey, "used", 2*time.Minute).Result()
	if err != nil || !acquired {
		// The lock is not ours, so it must not be deleted here.
		return
	}
	// Best-effort release; if it fails the lock still expires via its TTL.
	defer func() { _, _ = global.MG_REDIS.Del(lockKey).Result() }()

	id, err := strconv.Atoi(parts[2])
	if err != nil || id <= 0 {
		// Malformed id: skip this key but keep the listener running.
		return
	}

	// The results of the timing calls are discarded, as before: there is no
	// caller to report them to from this background goroutine.
	switch parts[1] {
	case "start":
		_ = service.StartMissionTiming(uint(id))
	case "end":
		_ = service.EndMissionTiming(uint(id))
	}
}