背景 最近做了几个需求都用了redis stream用作消息队列,感觉redis stream相当大轻量化,易于上手,且功能强大,为此特意实现了了一个极简但实用的 redis stream 的示例
redis stream 的三个概念 stream, consumer group , consumer
要想学会如何使用 redis stream, 最重要的就是理解 stream, consumer group , consumer 三者的关系。 简单来说:
- stream 为消息流, 类似于传送带负责传送商品
- consumer group 为获取消息流的一个团队
- consumer 为 consumer group 下的员工
一个 stream 下面可以包含多个 consumer group, 每个 consumer group下面可以包含多个 consumer .
我们在使用redis stream 之前需要提前使用XGroupCreateMkStream传入 stream名称 和 consumer group名称用来创建一个 stream 以及其对应的消费者组, 只有被注册的消费者组里面的消费者才能获取消息。
Pending Entries List (PEL) 与 二阶段提交
redis stream 也是支持二阶段提交与手动ack的, 这依赖于Pending Entries List (PEL) 。
使用 XReadGroup 传入 stream 名称,consumer group名称 和 consumer 名称 就可以获取到 消息, 被获取到的消息会进入 Pending Entries Lis (pel)里面, 这个pel相当于一个临时待确定的集合, 记录被消费者领取但是没有消费的消息,等消费者把消息 消费完成了还需要手动 ack 这条消息。 可以将pel类比成 git 的暂存区,需要被commit才算正式确认修改。
在pel的消息可以被同一个 consumer group的consumer获取,并且在一定时间内无法被其他consumer获取,这个时间叫做 MinIdle,MinIdle 保障了在这个时间范围内只能由一个消费者去二次消费消息。 使用 XAutoClaim 传入 stream 名称,consumer group名称, consumer 名称 和 MinIdle 即可从 pel 中领取近 MinIdle 时间范围内没有被领取的消息, 消息被领取后的 MinIdle 时间范围内无法被其他consumer领取
k8s 中 如何分配 consumer group 和 consumer ?
在实际开发中,一般一个类型的任务(比如消息通知系统)会用到一个 stream 和一个 consumer group. 虽然 stream 和 consumer group 是一对多的关系,但为了方便开发一般习惯于两者使用一对一的关系。 这点和工厂流水线同理,一条流水线最好还是专门传送一种商品。而consumer group 和 consumer的关系通常是一对多的,也就是真正需要干活的工作由多个 consumer 去完成。
如果一个k8s实例对应一个consumer(实际上也应该这么干,一台实例一个consumer, 一个consumer一次性可以领取多条消息),那么consumer name 就可以直接是当前机器的hostname, 如果一台只能同时消费多个消息,那么consumer内就可以直接是随机数。
考虑到k8s实例经常需要重启更新 consumer name 会越来越多,所以需要定期的去删除旧实例中的consuemr , 可以通过 XInfoConsumers 传入 stream name 和 group name 获取到当前consumer group 下的全部consumer.然后在通过每一个 consumer的 pending =0 和 Idle 大于某个值进行判断是否可以删除。
pending =0表示这个consumer没有被分片任何消息,idle会在每一次consumer与redis stream交互如 XReadGroup , XAutoClaim 被更新,大于某个值说明这个consumer很久没有任何动作了可以被判定为死掉
极简代码示例
目录结构
1redis_stream_demo 2 - internal 3 internal.go 操作 redis stream的相关函数 4 - test 5 push_task_test.go 模拟生产者往 redis stream 发消息 6 main.go 服务端消费者循环消费任务, 定期清理死掉的consumer 7 go.mod 8 go.sum 9 10
先来看单元测试文件: 内容很简单,模拟生产者往里面塞消息,再确认有没有塞进去
1package test 2 3import ( 4 "context" 5 "github.com/redis/go-redis/v9" 6 "redis_stream_demo/internal" 7 "testing" 8) 9 10func TestPushTask(t *testing.T) { 11 ctx := context.Background() 12 client := redis.NewClient(&redis.Options{ 13 Addr: "localhost:6379", 14 }) 15 16 // 模拟生产者 向流中添加多条消息 17 for i := 0; i < 50; i++ { 18 _, err := client.XAdd(ctx, &redis.XAddArgs{ 19 Stream: internal.SimpleStreamName, 20 Values: map[string]interface{}{ 21 "Msg": i, 22 }, 23 }).Result() 24 if err != nil { 25 t.Fatal(err) 26 } 27 } 28 29 // 查看是否将50条消息插入队列 30 queueSize, err := client.XLen(ctx, internal.SimpleStreamName).Result() 31 if err != nil { 32 t.Fatal(err) 33 } 34 if queueSize != 50 { 35 t.Fatal("queueSize != 50 ") 36 } 37 t.Log("add 50 tasks ok") 38} 39
再来看 main 函数 main 函数也很简单,创建了一个stream 和对应的 consumer group, 启动两个 consumer , 然后两个consumer 不断消费消息,同时有另一个go routine监控过期的 consumer 然后给予清理, 当所有消息被消费后退出程序
1package main 2 3import ( 4 "context" 5 "fmt" 6 "github.com/redis/go-redis/v9" 7 "math/rand" 8 "redis_stream_demo/internal" 9 "time" 10) 11 12func main() { 13 14 ctx := context.Background() 15 client := redis.NewClient(&redis.Options{ 16 Addr: "localhost:6379", 17 }) 18 19 // 创建消费者组 20 err := internal.RegisterConsumerGroup(client) 21 if err != nil { 22 panic(err) 23 } 24 25 // 启动2个消费者 26 consumer1 := fmt.Sprintf("consumer:%d", rand.Int63()) 27 consumer2 := fmt.Sprintf("consumer:%d", rand.Int63()) 28 29 fmt.Printf("[%s]\n", consumer1) 30 fmt.Printf("[%s]\n", consumer2) 31 32 go internal.LoopConsume(client, consumer1) 33 go internal.LoopConsume(client, consumer2) 34 35 // 启动定时清除旧的consumer 36 go internal.LoopDeleteDeadConsumer(client) 37 38 for { 39 if internal.IsAllDone(client, ctx) { 40 fmt.Println("all task done") 41 break 42 } 43 time.Sleep(time.Second) 44 } 45 46 47} 48
仔细看看internal.go 内部如何操作 redis stream
1package internal 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "github.com/redis/go-redis/v9" 8 "math/rand" 9 "time" 10) 11 12const ( 13 SimpleStreamName = "simple-stream" 14 SimpleGroupName = "simple-group" 15 taskIdleTimeout = 5 * time.Second // 5 秒后进入 pel 16 consumerIdleTimeout = time.Minute // consumer 超过1分钟没有活动则认为死亡 17) 18 19type TaskItem struct { 20 MsgId string 21 Msg string 22} 23 24// 创建消费者组,如果已经创建了不重复创建 25func RegisterConsumerGroup(client *redis.Client) error { 26 groupInfo, err := client.XInfoGroups(context.Background(), SimpleStreamName).Result() 27 if err != nil && err.Error() != "ERR no such key" { 28 return err 29 } 30 for _, group := range groupInfo { 31 if group.Name == SimpleGroupName { 32 return nil 33 } 34 } 35 return client.XGroupCreateMkStream(context.Background(), SimpleStreamName, SimpleGroupName, "0").Err() 36} 37 38func LoopConsume(client *redis.Client, consumerName string) { 39 40 for { 41 42 // 优先处理pel中的任务 43 task, err := pullPelTask(client, consumerName) 44 if err != nil { 45 fmt.Println(err) 46 time.Sleep(time.Second) 47 continue 48 } 49 50 if task != nil { 51 if err := processMessage(task); err != nil { 52 // 失败则执行下一条任务 53 fmt.Printf("[%s] processed pel task failed, messageid = %s\n", consumerName, task.MsgId) 54 continue 55 } 56 // 成功则 XACK 57 err = ackSuccessTask(context.Background(), client, SimpleStreamName, SimpleGroupName, task.MsgId) 58 if err != nil { 59 // 失败则执行下一条任务 60 fmt.Printf("[%s] acked pel task failed, messageid = %s\n", consumerName, task.MsgId) 61 continue 62 } 63 fmt.Printf("[%s] acked pel task ok, messageid = %s\n", consumerName, task.MsgId) 64 continue // 优先处理 pel 中积压的消息 65 } 66 67 // 拉取 1 条消息 68 task, err = pullTask(context.Background(), client, consumerName) 69 if err != nil { 70 fmt.Println(err) 71 time.Sleep(time.Second) 72 continue 73 } 74 if task != nil { 75 if err := processMessage(task); err != nil { 76 // 失败则执行下一条任务 77 fmt.Printf("[%s] processed task failed, messageid = %s\n", consumerName, task.MsgId) 78 continue 79 } 80 // 成功则 XACK 81 err = ackSuccessTask(context.Background(), client, SimpleStreamName, SimpleGroupName, task.MsgId) 82 if err != nil { 83 // 失败则执行下一条任务 84 fmt.Printf("[%s] acked task failed, messageid = %s\n", consumerName, task.MsgId) 85 continue 86 } 87 fmt.Printf("[%s] acked task ok, messageid = %s\n", consumerName, task.MsgId) 88 } else { 89 time.Sleep(3 * time.Second) // 没有任何消息 90 } 91 92 } 93} 94 95func LoopDeleteDeadConsumer(client *redis.Client) { 96 // 每10扫一次consumer 97 ticker := time.NewTicker(10 * time.Second) 98 for { 99 select { 100 case <-ticker.C: 101 consumers, err := readConsumer(context.Background(), client) 102 if err != nil { 103 fmt.Println(err) 104 break 105 } 106 if len(consumers) == 0 { 107 fmt.Println("tg robot len(consumers) == 0 ") 108 break 109 } 110 for _, consumer := range consumers { 111 // 条件: pending == 0 且 idle 超过阈值 112 if consumer.Pending == 0 && consumer.Idle > consumerIdleTimeout { 113 // 执行删除 114 _, err = deleteConsumer(context.Background(), client, consumer.Name) 115 if err != nil { 116 fmt.Println(err) 117 continue 118 } 119 fmt.Printf("[%s] (idle: %s, pending: %d) was deleted\n", 120 consumer.Name, consumer.Idle.String(), consumer.Pending) 121 } 122 } 123 } 124 } 125} 126 127func readConsumer(ctx context.Context, client *redis.Client) ([]redis.XInfoConsumer, error) { 128 consumers, err := client.XInfoConsumers(ctx, SimpleStreamName, SimpleGroupName).Result() 129 if err != nil { 130 return nil, err 131 } 132 return consumers, nil 133} 134 135func deleteConsumer(ctx context.Context, client *redis.Client, consumerName string) (int64, error) { 136 deleted, err := client.XGroupDelConsumer(ctx, SimpleStreamName, SimpleGroupName, consumerName).Result() 137 if err != nil { 138 return 0, err 139 } 140 return deleted, err 141} 142 143func ackSuccessTask(ctx context.Context, client *redis.Client, streamName, groupName, messageID string) error { 144 tx := client.TxPipeline() 145 // ack 任务 146 tx.XAck(ctx, streamName, groupName, messageID) 147 // 删除消息 148 tx.XDel(ctx, streamName, messageID) 149 _, err := tx.Exec(ctx) 150 if err != nil { 151 return err 152 } 153 return nil 154} 155 156func pullTask(ctx context.Context, client *redis.Client, consumer string) (*TaskItem, error) { 157 streams, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ 158 Group: SimpleGroupName, 159 Consumer: consumer, 160 Streams: []string{SimpleStreamName, ">"}, // ">" 是一个特殊 ID,表示从消费者组中尚未分配给任何消费者的新消息开始读取 161 Count: 1, 162 Block: 1000 * time.Millisecond, 163 }).Result() 164 if err != nil { 165 if err == redis.Nil { 166 return nil, nil 167 } 168 return nil, err 169 } 170 if len(streams) == 0 { 171 return nil, nil 172 } 173 if len(streams[0].Messages) == 0 { 174 return nil, nil 175 } 176 177 message := streams[0].Messages[0] 178 messageData := message.Values 179 msg, ok := messageData["Msg"].(string) 180 if !ok { 181 return nil, errors.New("messageData["Msg"].(string) !ok") 182 } 183 task := &TaskItem{ 184 MsgId: message.ID, 185 Msg: msg, 186 } 187 return task, nil 188} 189 190func pullPelTask(client *redis.Client, consumerName string) (*TaskItem, error) { 191 result, _, err := client.XAutoClaim(context.Background(), &redis.XAutoClaimArgs{ 192 Stream: SimpleStreamName, 193 Group: SimpleGroupName, 194 Consumer: consumerName, // 用于认领的消费者名 195 MinIdle: taskIdleTimeout, // 5秒未确认才被领取 196 Start: "0-0", // 特殊的消息 ID,表示从 PEL 的最开始(最早的消息)开始扫描 197 Count: 1, // 每次认领最多 1 条 198 }).Result() 199 if err != nil && err != redis.Nil { 200 return nil, fmt.Errorf("Error in XAUTOCLAIM: %v \n", err) 201 } 202 if len(result) == 0 { 203 return nil, nil 204 } 205 messageData := result[0].Values 206 msg, ok := messageData["Msg"].(string) 207 if !ok { 208 return nil, errors.New("messageData["Msg"].(string) !ok") 209 } 210 return &TaskItem{ 211 MsgId: result[0].ID, 212 Msg: msg, 213 }, nil 214} 215 216func IsAllDone(client *redis.Client, ctx context.Context) bool { 217 queueSize, err := client.XLen(ctx, SimpleStreamName).Result() 218 if err != nil { 219 fmt.Println("client.XLen(ctx, streamName).Result() err = ", err) 220 return false 221 } 222 return queueSize == 0 223} 224 225func processMessage(item *TaskItem) error { 226 // 10%概率出错 227 num := rand.Intn(101) 228 if num >= 20 { 229 return nil 230 } 231 return errors.New("random process message error") 232} 233
几个重要的函数
LoopConsume 不断循环获取消息,消费消息, 优先处理 pel 中积压的消息,pel中消息处理完了再消未被领取的消息
LoopDeleteDeadConsumer 定期清理死掉的consumer , 判断依据是pending=0和idle 大于某个值
processMessage 消费消息的函数, 为了让消息进入pel被二次获取,这里手动控制了10%的概率执行失败,这样就可以被其他consumer消费
IsAllDone 查看redis stream 中是否还有未被消费的消息
测试一下
测试过程
- 执行 push_task_test.go 的 TestPushTask 函数往 redis stream中手动插入50条消息
- 执行main.go 预期会处理完全部消息
- 等待两分钟,两分钟后再次执行TestPushTask往redis stream中手动插入50条消息
- 执行main.go 预期会处理完全部消息,且会删除掉第一次执行main.go的两个consumer(idle阈值时间是1分钟)
输出结果
第一次TestPushTask
1=== RUN TestPushTask 2 push_task_test.go:37: add 50 tasks ok 3--- PASS: TestPushTask (0.03s) 4PASS 5
第一次执行main.go
1[consumer:4012193680385755545] 2[consumer:1114439635847877195] 3[consumer:4012193680385755545] processed task failed, messageid = 1770889629531-0 4[consumer:1114439635847877195] acked task ok, messageid = 1770889629531-1 5[consumer:4012193680385755545] acked task ok, messageid = 1770889629532-0 6[consumer:1114439635847877195] acked task ok, messageid = 1770889629532-1 7[consumer:4012193680385755545] acked task ok, messageid = 1770889629533-0 8[consumer:1114439635847877195] acked task ok, messageid = 1770889629533-1 9[consumer:4012193680385755545] acked task ok, messageid = 1770889629534-0 10[consumer:1114439635847877195] acked task ok, messageid = 1770889629534-1 11[consumer:4012193680385755545] acked task ok, messageid = 1770889629535-0 12[consumer:1114439635847877195] acked task ok, messageid = 1770889629536-0 13[consumer:1114439635847877195] processed task failed, messageid = 1770889629537-0 14[consumer:4012193680385755545] acked task ok, messageid = 1770889629536-1 15[consumer:4012193680385755545] processed task failed, messageid = 1770889629537-1 16[consumer:1114439635847877195] processed task failed, messageid = 1770889629538-0 17[consumer:4012193680385755545] processed task failed, messageid = 1770889629539-1 18[consumer:1114439635847877195] acked task ok, messageid = 1770889629539-0 19[consumer:4012193680385755545] acked task ok, messageid = 1770889629540-0 20[consumer:1114439635847877195] acked task ok, messageid = 1770889629540-1 21[consumer:4012193680385755545] processed task failed, messageid = 1770889629541-0 22[consumer:1114439635847877195] processed task failed, messageid = 1770889629541-1 23[consumer:4012193680385755545] acked task ok, messageid = 1770889629541-2 24[consumer:1114439635847877195] acked task ok, messageid = 1770889629542-0 25[consumer:4012193680385755545] acked task ok, messageid = 1770889629542-1 26[consumer:1114439635847877195] acked task ok, messageid = 1770889629543-0 27[consumer:4012193680385755545] acked task ok, messageid = 1770889629543-1 28[consumer:1114439635847877195] acked task ok, messageid = 1770889629543-2 29[consumer:4012193680385755545] acked task ok, messageid = 1770889629544-0 30[consumer:1114439635847877195] acked task ok, messageid = 1770889629544-1 31[consumer:4012193680385755545] processed task failed, messageid = 1770889629545-0 32[consumer:1114439635847877195] acked task ok, messageid = 1770889629545-1 33[consumer:4012193680385755545] acked task ok, messageid = 1770889629546-0 34[consumer:1114439635847877195] acked task ok, messageid = 1770889629546-1 35[consumer:4012193680385755545] acked task ok, messageid = 1770889629546-2 36[consumer:1114439635847877195] processed task failed, messageid = 1770889629547-0 37[consumer:4012193680385755545] processed task failed, messageid = 1770889629547-1 38[consumer:1114439635847877195] acked task ok, messageid = 1770889629547-2 39[consumer:4012193680385755545] acked task ok, messageid = 1770889629548-0 40[consumer:1114439635847877195] acked task ok, messageid = 1770889629548-1 41[consumer:4012193680385755545] acked task ok, messageid = 1770889629549-0 42[consumer:1114439635847877195] processed task failed, messageid = 1770889629549-1 43[consumer:4012193680385755545] processed task failed, messageid = 1770889629549-2 44[consumer:1114439635847877195] acked task ok, messageid = 1770889629550-0 45[consumer:4012193680385755545] acked task ok, messageid = 1770889629550-1 46[consumer:1114439635847877195] processed task failed, messageid = 1770889629550-2 47[consumer:4012193680385755545] acked task ok, messageid = 1770889629551-0 48[consumer:1114439635847877195] acked task ok, messageid = 1770889629551-1 49[consumer:4012193680385755545] acked task ok, messageid = 1770889629551-2 50[consumer:1114439635847877195] acked task ok, messageid = 1770889629552-0 51[consumer:4012193680385755545] processed task failed, messageid = 1770889629552-1 52[consumer:1114439635847877195] acked task ok, messageid = 1770889629552-2 53[consumer:4012193680385755545] processed pel task failed, messageid = 1770889629537-0 54[consumer:1114439635847877195] acked pel task ok, messageid = 1770889629531-0 55[consumer:4012193680385755545] acked pel task ok, messageid = 1770889629537-1 56[consumer:1114439635847877195] acked pel task ok, messageid = 1770889629538-0 57[consumer:4012193680385755545] acked pel task ok, messageid = 1770889629539-1 58[consumer:1114439635847877195] acked pel task ok, messageid = 1770889629541-0 59[consumer:4012193680385755545] processed pel task failed, messageid = 1770889629541-1 60[consumer:1114439635847877195] processed pel task failed, messageid = 1770889629545-0 61[consumer:4012193680385755545] acked pel task ok, messageid = 1770889629547-0 62[consumer:1114439635847877195] acked pel task ok, messageid = 1770889629547-1 63[consumer:4012193680385755545] acked pel task ok, messageid = 1770889629549-1 64[consumer:1114439635847877195] acked pel task ok, messageid = 1770889629549-2 65[consumer:4012193680385755545] processed pel task failed, messageid = 1770889629550-2 66[consumer:1114439635847877195] acked pel task ok, messageid = 1770889629552-1 67[consumer:1114439635847877195] processed pel task failed, messageid = 1770889629537-0 68[consumer:4012193680385755545] acked pel task ok, messageid = 1770889629541-1 69[consumer:1114439635847877195] acked pel task ok, messageid = 1770889629545-0 70[consumer:4012193680385755545] acked pel task ok, messageid = 1770889629550-2 71[consumer:4012193680385755545] acked pel task ok, messageid = 1770889629537-0 72all task done 73
第二次TestPushTask
1=== RUN TestPushTask 2 push_task_test.go:37: add 50 tasks ok 3--- PASS: TestPushTask (0.03s) 4PASS 5
第二次执行main.go
1[consumer:7319152151140819830] 2[consumer:2792941594605994536] 3[consumer:7319152151140819830] acked task ok, messageid = 1770889711125-0 4[consumer:2792941594605994536] acked task ok, messageid = 1770889711126-0 5[consumer:2792941594605994536] processed task failed, messageid = 1770889711127-0 6[consumer:7319152151140819830] acked task ok, messageid = 1770889711126-1 7[consumer:2792941594605994536] acked task ok, messageid = 1770889711127-1 8[consumer:7319152151140819830] acked task ok, messageid = 1770889711128-0 9[consumer:2792941594605994536] acked task ok, messageid = 1770889711129-0 10[consumer:7319152151140819830] acked task ok, messageid = 1770889711129-1 11[consumer:2792941594605994536] processed task failed, messageid = 1770889711129-2 12[consumer:7319152151140819830] acked task ok, messageid = 1770889711130-0 13[consumer:2792941594605994536] acked task ok, messageid = 1770889711130-1 14[consumer:7319152151140819830] acked task ok, messageid = 1770889711131-0 15[consumer:2792941594605994536] acked task ok, messageid = 1770889711132-0 16[consumer:7319152151140819830] acked task ok, messageid = 1770889711132-1 17[consumer:2792941594605994536] processed task failed, messageid = 1770889711133-0 18[consumer:7319152151140819830] processed task failed, messageid = 1770889711133-1 19[consumer:2792941594605994536] acked task ok, messageid = 1770889711134-0 20[consumer:7319152151140819830] acked task ok, messageid = 1770889711134-1 21[consumer:2792941594605994536] processed task failed, messageid = 1770889711135-0 22[consumer:7319152151140819830] acked task ok, messageid = 1770889711135-1 23[consumer:2792941594605994536] acked task ok, messageid = 1770889711135-2 24[consumer:2792941594605994536] acked task ok, messageid = 1770889711136-1 25[consumer:7319152151140819830] acked task ok, messageid = 1770889711136-0 26[consumer:7319152151140819830] acked task ok, messageid = 1770889711137-0 27[consumer:2792941594605994536] acked task ok, messageid = 1770889711136-2 28[consumer:7319152151140819830] processed task failed, messageid = 1770889711137-2 29[consumer:2792941594605994536] acked task ok, messageid = 1770889711137-1 30[consumer:7319152151140819830] acked task ok, messageid = 1770889711138-0 31[consumer:2792941594605994536] processed task failed, messageid = 1770889711138-1 32[consumer:7319152151140819830] acked task ok, messageid = 1770889711138-2 33[consumer:2792941594605994536] acked task ok, messageid = 1770889711139-0 34[consumer:2792941594605994536] processed task failed, messageid = 1770889711139-1 35[consumer:7319152151140819830] processed task failed, messageid = 1770889711139-2 36[consumer:7319152151140819830] acked task ok, messageid = 1770889711140-0 37[consumer:2792941594605994536] acked task ok, messageid = 1770889711139-3 38[consumer:2792941594605994536] processed task failed, messageid = 1770889711140-2 39[consumer:7319152151140819830] acked task ok, messageid = 1770889711140-1 40[consumer:2792941594605994536] acked task ok, messageid = 1770889711140-3 41[consumer:7319152151140819830] acked task ok, messageid = 1770889711141-0 42[consumer:2792941594605994536] acked task ok, messageid = 1770889711141-1 43[consumer:7319152151140819830] processed task failed, messageid = 1770889711141-2 44[consumer:2792941594605994536] acked task ok, messageid = 1770889711142-0 45[consumer:7319152151140819830] acked task ok, messageid = 1770889711141-3 46[consumer:7319152151140819830] acked task ok, messageid = 1770889711142-1 47[consumer:2792941594605994536] acked task ok, messageid = 1770889711142-2 48[consumer:7319152151140819830] processed task failed, messageid = 1770889711143-0 49[consumer:2792941594605994536] acked task ok, messageid = 1770889711142-3 50[consumer:7319152151140819830] acked task ok, messageid = 1770889711143-1 51[consumer:2792941594605994536] acked task ok, messageid = 1770889711143-2 52[consumer:7319152151140819830] acked task ok, messageid = 1770889711143-3 53[consumer:2792941594605994536] processed pel task failed, messageid = 1770889711129-2 54[consumer:7319152151140819830] acked pel task ok, messageid = 1770889711127-0 55[consumer:2792941594605994536] acked pel task ok, messageid = 1770889711133-0 56[consumer:7319152151140819830] acked pel task ok, messageid = 1770889711133-1 57[consumer:2792941594605994536] processed pel task failed, messageid = 1770889711135-0 58[consumer:2792941594605994536] processed pel task failed, messageid = 1770889711137-2 59[consumer:7319152151140819830] acked pel task ok, messageid = 1770889711138-1 60[consumer:2792941594605994536] acked pel task ok, messageid = 1770889711139-1 61[consumer:7319152151140819830] acked pel task ok, messageid = 1770889711139-2 62[consumer:2792941594605994536] acked pel task ok, messageid = 1770889711140-2 63[consumer:7319152151140819830] acked pel task ok, messageid = 1770889711141-2 64[consumer:2792941594605994536] acked pel task ok, messageid = 1770889711143-0 65[consumer:1114439635847877195] (idle: 2m16.696s, pending: 0) was deleted 66[consumer:4012193680385755545] (idle: 2m16.695s, pending: 0) was deleted 67[consumer:2792941594605994536] processed pel task failed, messageid = 1770889711135-0 68[consumer:7319152151140819830] acked pel task ok, messageid = 1770889711129-2 69[consumer:2792941594605994536] acked pel task ok, messageid = 1770889711137-2 70[consumer:2792941594605994536] processed pel task failed, messageid = 1770889711135-0 71[consumer:2792941594605994536] acked pel task ok, messageid = 1770889711135-0 72all task done 73
可以看到全部task都被ack了且第一次的两个consumer: 4012193680385755545 和 consumer:1114439635847877195 都在第二次main.go中被删除掉了
《redis stream用作消息队列极速入门》 是转载文章,点击查看原文。
