You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

73 lines
1.7 KiB

package core
import (
"bkb-seller/global"
"bkb-seller/service"
"fmt"
"github.com/go-redis/redis"
"strconv"
"strings"
"time"
)
var constKeyspaceFormat = "__keyevent@%d__"
var constKeyspacePatternFormat = "%s:%s"
type RedisEventSink struct {
redisClient *redis.Client
keyspacePattern string
subscriber *redis.PubSub
}
func RedisEventSinkStep(database int, pattern string) *RedisEventSink {
var sink *RedisEventSink
if global.MG_REDIS == nil {
panic("redis no initialization")
}
var keyspace = fmt.Sprintf(constKeyspaceFormat, database)
self := &RedisEventSink{
redisClient: global.MG_REDIS,
keyspacePattern: fmt.Sprintf(constKeyspacePatternFormat, keyspace, pattern),
}
self.init()
//log.Println("redis expired start...")
return sink
}
func (sink *RedisEventSink) init() {
sink.subscriber = sink.redisClient.PSubscribe(sink.keyspacePattern)
// Start observing for events...
go sink.listenForEvents()
}
func (sink *RedisEventSink) listenForEvents() {
for {
if sink.subscriber != nil {
msg, err := sink.subscriber.ReceiveMessage()
if msg != nil && err == nil {
key := msg.Payload
if key != "" {
keys := strings.Split(key, "_")
if len(keys) == 3 {
if keys[0] == "mission" { //任务状态
result, err := global.MG_REDIS.SetNX("nx:_"+key, "used", 2*time.Minute).Result()
if result && err == nil {
id, _ := strconv.Atoi(keys[2])
if id == 0 {
return
}
if keys[1] == "start" {
_ = service.StartMissionTiming(uint(id))
} else if keys[1] == "end" {
_ = service.EndMissionTiming(uint(id))
}
}
_, _ = global.MG_REDIS.Del("nx:_" + key).Result()
}
}
}
}
}
}
}