redis stream用作消息队列极速入门

作者:ChesterZhang日期:2026/3/5

背景 最近做了几个需求都用了redis stream用作消息队列,感觉redis stream相当大轻量化,易于上手,且功能强大,为此特意实现了了一个极简但实用的 redis stream 的示例

redis stream 的三个概念 stream, consumer group , consumer

要想学会如何使用 redis stream, 最重要的就是理解 stream, consumer group , consumer 三者的关系。 简单来说:

  1. stream 为消息流, 类似于传送带负责传送商品
  2. consumer group 为获取消息流的一个团队
  3. consumer 为 consumer group 下的员工

一个 stream 下面可以包含多个 consumer group, 每个 consumer group下面可以包含多个 consumer .

我们在使用redis stream 之前需要提前使用XGroupCreateMkStream传入 stream名称 和 consumer group名称用来创建一个 stream 以及其对应的消费者组, 只有被注册的消费者组里面的消费者才能获取消息。

Pending Entries List (PEL) 与 二阶段提交

redis stream 也是支持二阶段提交与手动ack的, 这依赖于Pending Entries List (PEL) 。

使用 XReadGroup 传入 stream 名称,consumer group名称 和 consumer 名称 就可以获取到 消息, 被获取到的消息会进入 Pending Entries Lis (pel)里面, 这个pel相当于一个临时待确定的集合, 记录被消费者领取但是没有消费的消息,等消费者把消息 消费完成了还需要手动 ack 这条消息。 可以将pel类比成 git 的暂存区,需要被commit才算正式确认修改。

在pel的消息可以被同一个 consumer group的consumer获取,并且在一定时间内无法被其他consumer获取,这个时间叫做 MinIdle,MinIdle 保障了在这个时间范围内只能由一个消费者去二次消费消息。 使用 XAutoClaim 传入 stream 名称,consumer group名称, consumer 名称 和 MinIdle 即可从 pel 中领取近 MinIdle 时间范围内没有被领取的消息, 消息被领取后的 MinIdle 时间范围内无法被其他consumer领取

k8s 中 如何分配 consumer group 和 consumer ?

在实际开发中,一般一个类型的任务(比如消息通知系统)会用到一个 stream 和一个 consumer group. 虽然 stream 和 consumer group 是一对多的关系,但为了方便开发一般习惯于两者使用一对一的关系。 这点和工厂流水线同理,一条流水线最好还是专门传送一种商品。而consumer group 和 consumer的关系通常是一对多的,也就是真正需要干活的工作由多个 consumer 去完成。

如果一个k8s实例对应一个consumer(实际上也应该这么干,一台实例一个consumer, 一个consumer一次性可以领取多条消息),那么consumer name 就可以直接是当前机器的hostname, 如果一台只能同时消费多个消息,那么consumer内就可以直接是随机数。

考虑到k8s实例经常需要重启更新 consumer name 会越来越多,所以需要定期的去删除旧实例中的consuemr , 可以通过 XInfoConsumers 传入 stream name 和 group name 获取到当前consumer group 下的全部consumer.然后在通过每一个 consumer的 pending =0 和 Idle 大于某个值进行判断是否可以删除。

pending =0表示这个consumer没有被分片任何消息,idle会在每一次consumer与redis stream交互如 XReadGroup , XAutoClaim 被更新,大于某个值说明这个consumer很久没有任何动作了可以被判定为死掉

极简代码示例

目录结构

1redis_stream_demo
2  - internal 
3     internal.go 操作 redis stream的相关函数
4  - test 
5    push_task_test.go 模拟生产者往 redis stream 发消息 
6  main.go 服务端消费者循环消费任务, 定期清理死掉的consumer
7  go.mod 
8  go.sum
9
10

先来看单元测试文件: 内容很简单,模拟生产者往里面塞消息,再确认有没有塞进去

1package test
2
3import (
4    "context"
5    "github.com/redis/go-redis/v9"
6    "redis_stream_demo/internal"
7    "testing"
8)
9
10func TestPushTask(t *testing.T) {
11    ctx := context.Background()
12    client := redis.NewClient(&redis.Options{
13       Addr: "localhost:6379",
14    })
15
16    // 模拟生产者 向流中添加多条消息
17    for i := 0; i < 50; i++ {
18       _, err := client.XAdd(ctx, &redis.XAddArgs{
19          Stream: internal.SimpleStreamName,
20          Values: map[string]interface{}{
21             "Msg": i,
22          },
23       }).Result()
24       if err != nil {
25          t.Fatal(err)
26       }
27    }
28
29    // 查看是否将50条消息插入队列
30    queueSize, err := client.XLen(ctx, internal.SimpleStreamName).Result()
31    if err != nil {
32       t.Fatal(err)
33    }
34    if queueSize != 50 {
35       t.Fatal("queueSize != 50 ")
36    }
37    t.Log("add 50 tasks ok")
38}
39

再来看 main 函数 main 函数也很简单,创建了一个stream 和对应的 consumer group, 启动两个 consumer , 然后两个consumer 不断消费消息,同时有另一个go routine监控过期的 consumer 然后给予清理, 当所有消息被消费后退出程序

1package main
2
3import (
4    "context"
5    "fmt"
6    "github.com/redis/go-redis/v9"
7    "math/rand"
8    "redis_stream_demo/internal"
9    "time"
10)
11
12func main() {
13
14    ctx := context.Background()
15    client := redis.NewClient(&redis.Options{
16       Addr: "localhost:6379",
17    })
18
19    // 创建消费者组
20    err := internal.RegisterConsumerGroup(client)
21    if err != nil {
22       panic(err)
23    }
24
25    // 启动2个消费者
26    consumer1 := fmt.Sprintf("consumer:%d", rand.Int63())
27    consumer2 := fmt.Sprintf("consumer:%d", rand.Int63())
28
29    fmt.Printf("[%s]\n", consumer1)
30    fmt.Printf("[%s]\n", consumer2)
31
32    go internal.LoopConsume(client, consumer1)
33    go internal.LoopConsume(client, consumer2)
34
35    // 启动定时清除旧的consumer
36    go internal.LoopDeleteDeadConsumer(client)
37
38    for {
39       if internal.IsAllDone(client, ctx) {
40          fmt.Println("all task done")
41          break
42       }
43       time.Sleep(time.Second)
44    }
45
46
47}
48

仔细看看internal.go 内部如何操作 redis stream

1package internal
2
3import (
4    "context"
5    "errors"
6    "fmt"
7    "github.com/redis/go-redis/v9"
8    "math/rand"
9    "time"
10)
11
12const (
13    SimpleStreamName    = "simple-stream"
14    SimpleGroupName     = "simple-group"
15    taskIdleTimeout     = 5 * time.Second // 5 秒后进入 pel
16    consumerIdleTimeout = time.Minute     // consumer 超过1分钟没有活动则认为死亡
17)
18
19type TaskItem struct {
20    MsgId string
21    Msg   string
22}
23
24// 创建消费者组,如果已经创建了不重复创建
25func RegisterConsumerGroup(client *redis.Client) error {
26    groupInfo, err := client.XInfoGroups(context.Background(), SimpleStreamName).Result()
27    if err != nil && err.Error() != "ERR no such key" {
28       return err
29    }
30    for _, group := range groupInfo {
31       if group.Name == SimpleGroupName {
32          return nil
33       }
34    }
35    return client.XGroupCreateMkStream(context.Background(), SimpleStreamName, SimpleGroupName, "0").Err()
36}
37
38func LoopConsume(client *redis.Client, consumerName string) {
39
40    for {
41
42       // 优先处理pel中的任务
43       task, err := pullPelTask(client, consumerName)
44       if err != nil {
45          fmt.Println(err)
46          time.Sleep(time.Second)
47          continue
48       }
49
50       if task != nil {
51          if err := processMessage(task); err != nil {
52             // 失败则执行下一条任务
53             fmt.Printf("[%s] processed pel task failed, messageid = %s\n", consumerName, task.MsgId)
54             continue
55          }
56          // 成功则 XACK
57          err = ackSuccessTask(context.Background(), client, SimpleStreamName, SimpleGroupName, task.MsgId)
58          if err != nil {
59             // 失败则执行下一条任务
60             fmt.Printf("[%s] acked pel task failed, messageid = %s\n", consumerName, task.MsgId)
61             continue
62          }
63          fmt.Printf("[%s] acked pel task ok, messageid = %s\n", consumerName, task.MsgId)
64          continue // 优先处理 pel 中积压的消息
65       }
66
67       // 拉取 1 条消息
68       task, err = pullTask(context.Background(), client, consumerName)
69       if err != nil {
70          fmt.Println(err)
71          time.Sleep(time.Second)
72          continue
73       }
74       if task != nil {
75          if err := processMessage(task); err != nil {
76             // 失败则执行下一条任务
77             fmt.Printf("[%s] processed task failed, messageid = %s\n", consumerName, task.MsgId)
78             continue
79          }
80          // 成功则 XACK
81          err = ackSuccessTask(context.Background(), client, SimpleStreamName, SimpleGroupName, task.MsgId)
82          if err != nil {
83             // 失败则执行下一条任务
84             fmt.Printf("[%s] acked task failed, messageid = %s\n", consumerName, task.MsgId)
85             continue
86          }
87          fmt.Printf("[%s] acked task ok, messageid = %s\n", consumerName, task.MsgId)
88       } else {
89          time.Sleep(3 * time.Second) // 没有任何消息
90       }
91
92    }
93}
94
95func LoopDeleteDeadConsumer(client *redis.Client) {
96    // 每10扫一次consumer
97    ticker := time.NewTicker(10 * time.Second)
98    for {
99       select {
100       case <-ticker.C:
101          consumers, err := readConsumer(context.Background(), client)
102          if err != nil {
103             fmt.Println(err)
104             break
105          }
106          if len(consumers) == 0 {
107             fmt.Println("tg robot len(consumers) == 0 ")
108             break
109          }
110          for _, consumer := range consumers {
111             // 条件: pending == 0  idle 超过阈值
112             if consumer.Pending == 0 && consumer.Idle > consumerIdleTimeout {
113                // 执行删除
114                _, err = deleteConsumer(context.Background(), client, consumer.Name)
115                if err != nil {
116                   fmt.Println(err)
117                   continue
118                }
119                fmt.Printf("[%s] (idle: %s, pending: %d) was deleted\n",
120                   consumer.Name, consumer.Idle.String(), consumer.Pending)
121             }
122          }
123       }
124    }
125}
126
127func readConsumer(ctx context.Context, client *redis.Client) ([]redis.XInfoConsumer, error) {
128    consumers, err := client.XInfoConsumers(ctx, SimpleStreamName, SimpleGroupName).Result()
129    if err != nil {
130       return nil, err
131    }
132    return consumers, nil
133}
134
135func deleteConsumer(ctx context.Context, client *redis.Client, consumerName string) (int64, error) {
136    deleted, err := client.XGroupDelConsumer(ctx, SimpleStreamName, SimpleGroupName, consumerName).Result()
137    if err != nil {
138       return 0, err
139    }
140    return deleted, err
141}
142
143func ackSuccessTask(ctx context.Context, client *redis.Client, streamName, groupName, messageID string) error {
144    tx := client.TxPipeline()
145    // ack 任务
146    tx.XAck(ctx, streamName, groupName, messageID)
147    // 删除消息
148    tx.XDel(ctx, streamName, messageID)
149    _, err := tx.Exec(ctx)
150    if err != nil {
151       return err
152    }
153    return nil
154}
155
156func pullTask(ctx context.Context, client *redis.Client, consumer string) (*TaskItem, error) {
157    streams, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
158       Group:    SimpleGroupName,
159       Consumer: consumer,
160       Streams:  []string{SimpleStreamName, ">"}, // ">" 是一个特殊 ID,表示从消费者组中尚未分配给任何消费者的新消息开始读取
161       Count:    1,
162       Block:    1000 * time.Millisecond,
163    }).Result()
164    if err != nil {
165       if err == redis.Nil {
166          return nil, nil
167       }
168       return nil, err
169    }
170    if len(streams) == 0 {
171       return nil, nil
172    }
173    if len(streams[0].Messages) == 0 {
174       return nil, nil
175    }
176
177    message := streams[0].Messages[0]
178    messageData := message.Values
179    msg, ok := messageData["Msg"].(string)
180    if !ok {
181       return nil, errors.New("messageData["Msg"].(string) !ok")
182    }
183    task := &TaskItem{
184       MsgId: message.ID,
185       Msg:   msg,
186    }
187    return task, nil
188}
189
190func pullPelTask(client *redis.Client, consumerName string) (*TaskItem, error) {
191    result, _, err := client.XAutoClaim(context.Background(), &redis.XAutoClaimArgs{
192       Stream:   SimpleStreamName,
193       Group:    SimpleGroupName,
194       Consumer: consumerName,    // 用于认领的消费者名
195       MinIdle:  taskIdleTimeout, // 5秒未确认才被领取
196       Start:    "0-0",           // 特殊的消息 ID,表示从 PEL 的最开始(最早的消息)开始扫描
197       Count:    1,               // 每次认领最多 1 
198    }).Result()
199    if err != nil && err != redis.Nil {
200       return nil, fmt.Errorf("Error in XAUTOCLAIM: %v \n", err)
201    }
202    if len(result) == 0 {
203       return nil, nil
204    }
205    messageData := result[0].Values
206    msg, ok := messageData["Msg"].(string)
207    if !ok {
208       return nil, errors.New("messageData["Msg"].(string) !ok")
209    }
210    return &TaskItem{
211       MsgId: result[0].ID,
212       Msg:   msg,
213    }, nil
214}
215
216func IsAllDone(client *redis.Client, ctx context.Context) bool {
217    queueSize, err := client.XLen(ctx, SimpleStreamName).Result()
218    if err != nil {
219       fmt.Println("client.XLen(ctx, streamName).Result() err = ", err)
220       return false
221    }
222    return queueSize == 0
223}
224
225func processMessage(item *TaskItem) error {
226    // 10%概率出错
227    num := rand.Intn(101)
228    if num >= 20 {
229       return nil
230    }
231    return errors.New("random process message error")
232}
233

几个重要的函数

LoopConsume 不断循环获取消息,消费消息, 优先处理 pel 中积压的消息,pel中消息处理完了再消未被领取的消息

LoopDeleteDeadConsumer 定期清理死掉的consumer , 判断依据是pending=0和idle 大于某个值

processMessage 消费消息的函数, 为了让消息进入pel被二次获取,这里手动控制了10%的概率执行失败,这样就可以被其他consumer消费

IsAllDone 查看redis stream 中是否还有未被消费的消息

测试一下

测试过程

  1. 执行 push_task_test.go 的 TestPushTask 函数往 redis stream中手动插入50条消息
  2. 执行main.go 预期会处理完全部消息
  3. 等待两分钟,两分钟后再次执行TestPushTask往redis stream中手动插入50条消息
  4. 执行main.go 预期会处理完全部消息,且会删除掉第一次执行main.go的两个consumer(idle阈值时间是1分钟)

输出结果

第一次TestPushTask

1=== RUN   TestPushTask
2    push_task_test.go:37: add 50 tasks ok
3--- PASS: TestPushTask (0.03s)
4PASS
5

第一次执行main.go

1[consumer:4012193680385755545]
2[consumer:1114439635847877195]
3[consumer:4012193680385755545] processed task failed, messageid = 1770889629531-0
4[consumer:1114439635847877195] acked task ok, messageid = 1770889629531-1
5[consumer:4012193680385755545] acked task ok, messageid = 1770889629532-0
6[consumer:1114439635847877195] acked task ok, messageid = 1770889629532-1
7[consumer:4012193680385755545] acked task ok, messageid = 1770889629533-0
8[consumer:1114439635847877195] acked task ok, messageid = 1770889629533-1
9[consumer:4012193680385755545] acked task ok, messageid = 1770889629534-0
10[consumer:1114439635847877195] acked task ok, messageid = 1770889629534-1
11[consumer:4012193680385755545] acked task ok, messageid = 1770889629535-0
12[consumer:1114439635847877195] acked task ok, messageid = 1770889629536-0
13[consumer:1114439635847877195] processed task failed, messageid = 1770889629537-0
14[consumer:4012193680385755545] acked task ok, messageid = 1770889629536-1
15[consumer:4012193680385755545] processed task failed, messageid = 1770889629537-1
16[consumer:1114439635847877195] processed task failed, messageid = 1770889629538-0
17[consumer:4012193680385755545] processed task failed, messageid = 1770889629539-1
18[consumer:1114439635847877195] acked task ok, messageid = 1770889629539-0
19[consumer:4012193680385755545] acked task ok, messageid = 1770889629540-0
20[consumer:1114439635847877195] acked task ok, messageid = 1770889629540-1
21[consumer:4012193680385755545] processed task failed, messageid = 1770889629541-0
22[consumer:1114439635847877195] processed task failed, messageid = 1770889629541-1
23[consumer:4012193680385755545] acked task ok, messageid = 1770889629541-2
24[consumer:1114439635847877195] acked task ok, messageid = 1770889629542-0
25[consumer:4012193680385755545] acked task ok, messageid = 1770889629542-1
26[consumer:1114439635847877195] acked task ok, messageid = 1770889629543-0
27[consumer:4012193680385755545] acked task ok, messageid = 1770889629543-1
28[consumer:1114439635847877195] acked task ok, messageid = 1770889629543-2
29[consumer:4012193680385755545] acked task ok, messageid = 1770889629544-0
30[consumer:1114439635847877195] acked task ok, messageid = 1770889629544-1
31[consumer:4012193680385755545] processed task failed, messageid = 1770889629545-0
32[consumer:1114439635847877195] acked task ok, messageid = 1770889629545-1
33[consumer:4012193680385755545] acked task ok, messageid = 1770889629546-0
34[consumer:1114439635847877195] acked task ok, messageid = 1770889629546-1
35[consumer:4012193680385755545] acked task ok, messageid = 1770889629546-2
36[consumer:1114439635847877195] processed task failed, messageid = 1770889629547-0
37[consumer:4012193680385755545] processed task failed, messageid = 1770889629547-1
38[consumer:1114439635847877195] acked task ok, messageid = 1770889629547-2
39[consumer:4012193680385755545] acked task ok, messageid = 1770889629548-0
40[consumer:1114439635847877195] acked task ok, messageid = 1770889629548-1
41[consumer:4012193680385755545] acked task ok, messageid = 1770889629549-0
42[consumer:1114439635847877195] processed task failed, messageid = 1770889629549-1
43[consumer:4012193680385755545] processed task failed, messageid = 1770889629549-2
44[consumer:1114439635847877195] acked task ok, messageid = 1770889629550-0
45[consumer:4012193680385755545] acked task ok, messageid = 1770889629550-1
46[consumer:1114439635847877195] processed task failed, messageid = 1770889629550-2
47[consumer:4012193680385755545] acked task ok, messageid = 1770889629551-0
48[consumer:1114439635847877195] acked task ok, messageid = 1770889629551-1
49[consumer:4012193680385755545] acked task ok, messageid = 1770889629551-2
50[consumer:1114439635847877195] acked task ok, messageid = 1770889629552-0
51[consumer:4012193680385755545] processed task failed, messageid = 1770889629552-1
52[consumer:1114439635847877195] acked task ok, messageid = 1770889629552-2
53[consumer:4012193680385755545] processed pel task failed, messageid = 1770889629537-0
54[consumer:1114439635847877195] acked pel task ok, messageid = 1770889629531-0
55[consumer:4012193680385755545] acked pel task ok, messageid = 1770889629537-1
56[consumer:1114439635847877195] acked pel task ok, messageid = 1770889629538-0
57[consumer:4012193680385755545] acked pel task ok, messageid = 1770889629539-1
58[consumer:1114439635847877195] acked pel task ok, messageid = 1770889629541-0
59[consumer:4012193680385755545] processed pel task failed, messageid = 1770889629541-1
60[consumer:1114439635847877195] processed pel task failed, messageid = 1770889629545-0
61[consumer:4012193680385755545] acked pel task ok, messageid = 1770889629547-0
62[consumer:1114439635847877195] acked pel task ok, messageid = 1770889629547-1
63[consumer:4012193680385755545] acked pel task ok, messageid = 1770889629549-1
64[consumer:1114439635847877195] acked pel task ok, messageid = 1770889629549-2
65[consumer:4012193680385755545] processed pel task failed, messageid = 1770889629550-2
66[consumer:1114439635847877195] acked pel task ok, messageid = 1770889629552-1
67[consumer:1114439635847877195] processed pel task failed, messageid = 1770889629537-0
68[consumer:4012193680385755545] acked pel task ok, messageid = 1770889629541-1
69[consumer:1114439635847877195] acked pel task ok, messageid = 1770889629545-0
70[consumer:4012193680385755545] acked pel task ok, messageid = 1770889629550-2
71[consumer:4012193680385755545] acked pel task ok, messageid = 1770889629537-0
72all task done
73

第二次TestPushTask

1=== RUN   TestPushTask
2    push_task_test.go:37: add 50 tasks ok
3--- PASS: TestPushTask (0.03s)
4PASS
5

第二次执行main.go

1[consumer:7319152151140819830]
2[consumer:2792941594605994536]
3[consumer:7319152151140819830] acked task ok, messageid = 1770889711125-0
4[consumer:2792941594605994536] acked task ok, messageid = 1770889711126-0
5[consumer:2792941594605994536] processed task failed, messageid = 1770889711127-0
6[consumer:7319152151140819830] acked task ok, messageid = 1770889711126-1
7[consumer:2792941594605994536] acked task ok, messageid = 1770889711127-1
8[consumer:7319152151140819830] acked task ok, messageid = 1770889711128-0
9[consumer:2792941594605994536] acked task ok, messageid = 1770889711129-0
10[consumer:7319152151140819830] acked task ok, messageid = 1770889711129-1
11[consumer:2792941594605994536] processed task failed, messageid = 1770889711129-2
12[consumer:7319152151140819830] acked task ok, messageid = 1770889711130-0
13[consumer:2792941594605994536] acked task ok, messageid = 1770889711130-1
14[consumer:7319152151140819830] acked task ok, messageid = 1770889711131-0
15[consumer:2792941594605994536] acked task ok, messageid = 1770889711132-0
16[consumer:7319152151140819830] acked task ok, messageid = 1770889711132-1
17[consumer:2792941594605994536] processed task failed, messageid = 1770889711133-0
18[consumer:7319152151140819830] processed task failed, messageid = 1770889711133-1
19[consumer:2792941594605994536] acked task ok, messageid = 1770889711134-0
20[consumer:7319152151140819830] acked task ok, messageid = 1770889711134-1
21[consumer:2792941594605994536] processed task failed, messageid = 1770889711135-0
22[consumer:7319152151140819830] acked task ok, messageid = 1770889711135-1
23[consumer:2792941594605994536] acked task ok, messageid = 1770889711135-2
24[consumer:2792941594605994536] acked task ok, messageid = 1770889711136-1
25[consumer:7319152151140819830] acked task ok, messageid = 1770889711136-0
26[consumer:7319152151140819830] acked task ok, messageid = 1770889711137-0
27[consumer:2792941594605994536] acked task ok, messageid = 1770889711136-2
28[consumer:7319152151140819830] processed task failed, messageid = 1770889711137-2
29[consumer:2792941594605994536] acked task ok, messageid = 1770889711137-1
30[consumer:7319152151140819830] acked task ok, messageid = 1770889711138-0
31[consumer:2792941594605994536] processed task failed, messageid = 1770889711138-1
32[consumer:7319152151140819830] acked task ok, messageid = 1770889711138-2
33[consumer:2792941594605994536] acked task ok, messageid = 1770889711139-0
34[consumer:2792941594605994536] processed task failed, messageid = 1770889711139-1
35[consumer:7319152151140819830] processed task failed, messageid = 1770889711139-2
36[consumer:7319152151140819830] acked task ok, messageid = 1770889711140-0
37[consumer:2792941594605994536] acked task ok, messageid = 1770889711139-3
38[consumer:2792941594605994536] processed task failed, messageid = 1770889711140-2
39[consumer:7319152151140819830] acked task ok, messageid = 1770889711140-1
40[consumer:2792941594605994536] acked task ok, messageid = 1770889711140-3
41[consumer:7319152151140819830] acked task ok, messageid = 1770889711141-0
42[consumer:2792941594605994536] acked task ok, messageid = 1770889711141-1
43[consumer:7319152151140819830] processed task failed, messageid = 1770889711141-2
44[consumer:2792941594605994536] acked task ok, messageid = 1770889711142-0
45[consumer:7319152151140819830] acked task ok, messageid = 1770889711141-3
46[consumer:7319152151140819830] acked task ok, messageid = 1770889711142-1
47[consumer:2792941594605994536] acked task ok, messageid = 1770889711142-2
48[consumer:7319152151140819830] processed task failed, messageid = 1770889711143-0
49[consumer:2792941594605994536] acked task ok, messageid = 1770889711142-3
50[consumer:7319152151140819830] acked task ok, messageid = 1770889711143-1
51[consumer:2792941594605994536] acked task ok, messageid = 1770889711143-2
52[consumer:7319152151140819830] acked task ok, messageid = 1770889711143-3
53[consumer:2792941594605994536] processed pel task failed, messageid = 1770889711129-2
54[consumer:7319152151140819830] acked pel task ok, messageid = 1770889711127-0
55[consumer:2792941594605994536] acked pel task ok, messageid = 1770889711133-0
56[consumer:7319152151140819830] acked pel task ok, messageid = 1770889711133-1
57[consumer:2792941594605994536] processed pel task failed, messageid = 1770889711135-0
58[consumer:2792941594605994536] processed pel task failed, messageid = 1770889711137-2
59[consumer:7319152151140819830] acked pel task ok, messageid = 1770889711138-1
60[consumer:2792941594605994536] acked pel task ok, messageid = 1770889711139-1
61[consumer:7319152151140819830] acked pel task ok, messageid = 1770889711139-2
62[consumer:2792941594605994536] acked pel task ok, messageid = 1770889711140-2
63[consumer:7319152151140819830] acked pel task ok, messageid = 1770889711141-2
64[consumer:2792941594605994536] acked pel task ok, messageid = 1770889711143-0
65[consumer:1114439635847877195] (idle: 2m16.696s, pending: 0) was deleted
66[consumer:4012193680385755545] (idle: 2m16.695s, pending: 0) was deleted
67[consumer:2792941594605994536] processed pel task failed, messageid = 1770889711135-0
68[consumer:7319152151140819830] acked pel task ok, messageid = 1770889711129-2
69[consumer:2792941594605994536] acked pel task ok, messageid = 1770889711137-2
70[consumer:2792941594605994536] processed pel task failed, messageid = 1770889711135-0
71[consumer:2792941594605994536] acked pel task ok, messageid = 1770889711135-0
72all task done
73

可以看到全部task都被ack了且第一次的两个consumer: 4012193680385755545 和 consumer:1114439635847877195 都在第二次main.go中被删除掉了


redis stream用作消息队列极速入门》 是转载文章,点击查看原文


相关推荐


React Native 开发环境准备
zh_xuan2026/2/24

一、环境准备 我的环境: 二、建立独立RN工程 1、初始化创建工程 npx react-native init RNApp --version 0.73.4 --skip-install 这个命令提示: ��️ The `init` command is deprecated. E:\android\projects\RNDemo4>cd RNApp - Switch to npx @react-native-community/cli init f


【C++】模拟实现 红黑树(RBTree)
yuuki2332332026/2/16

前言: 在掌握 AVL 树的严格平衡机制后,我们发现其虽能将树高严格控制在 O(logN),但「高度差≤1」的强约束也带来了明显代价:插入 / 删除操作中频繁的旋转(最多两次双旋)大幅增加了写操作的开销,且每个节点需额外存储平衡因子和父指针,空间利用率较低。 为解决这一问题,红黑树(Red-Black Tree)作为一种近似平衡的二叉搜索树应运而生 —— 它放弃了 AVL 树 “严格平衡” 的要求,转而通过「节点颜色标记 + 5 条核心规则」实现 “黑高一致” 的弱平衡,将任意根到叶子的路径


Git常用操作指令
stu_kk2026/2/7

最近给公司小伙伴安排了一下git培训,写了个常用指令,记录一下 一、配置与初始化(首次使用/新建仓库) 指令 功能说明 git config --global user.name "你的姓名" 配置全局用户名(会显示在提交记录中) git config --global user.email "你的公司邮箱" 配置全局用户邮箱 `git config --list 查看配置


Prometheus+Grafana构建云原生分布式监控系统(十)_prometheus的服务发现机制(一)
牛奶咖啡132026/1/29

Prometheus+Grafana构建云原生分布式监控系统(九)_pushgateway的使用https://blog.csdn.net/xiaochenXIHUA/article/details/157392956 一、prometheus的服务发现机制  1.1、prometheus的服务发现机制概述         prometheus是基于拉(pull)模式抓取监控数据,首先要能够发现需要监控的目标对象target,那么prometheus如何获监控目标呢?有两种方式【静态手动配


Polyfill方式解决前端兼容性问题:core-js包结构与各种配置策略
漂流瓶jz2026/1/20

简介 在之前我介绍过Babel:解锁Babel核心功能:从转义语法到插件开发,Babel是一个使用AST转义JavaScript语法,提高代码在浏览器兼容性的工具。但有些ECMAScript并不是新的语法,而是一些新对象,新方法等等,这些并不能使用AST抽象语法树来转义。因此Babel利用core-js实现这些代码的兼容性。 core-js是一个知名的前端工具库,里面包含了ECMAScript标准中提供的新对象/新方法等,而且是使用旧版本支持的语法来实现这些新的API。这样即使浏览器没有实现标准


一文搞懂机器学习中的特征降维!
aicoting2026/1/12

推荐直接网站在线阅读:aicoting AI算法面试学习在线网站 特征工程(Feature Engineering) 是机器学习流程中将原始数据转换为适合模型学习的特征的关键步骤。它直接决定了模型能否高效捕捉数据中的规律。好的特征可以显著提升模型性能,而差的特征即使模型再复杂也难以取得好效果。 特征工程的核心目标是: 提取有效信息:将原始数据中有价值的信号转化为模型可以理解的特征; 减少冗余与噪声:去掉无关或多余的特征,使模型更简洁、更泛化; 增强表达能力:通过构造、组合或降维生成新的特征,


Day 12:Git配置详解:用户信息、编辑器、颜色等配置
CNRio2026/1/4

“你有没有遇到过这样的尴尬:提交代码时,Git显示’Author: Unknown’,然后你发现是自己写的代码,却不知道是谁提交的?别担心,这就像你写了一封信,却没写署名一样!” 🌟 为什么说Git配置是"代码身份证"? 想象一下,你正在写一本小说,每章都署名"匿名作者"。读者会怎么想?他们可能会怀疑这本书是不是真的由你写的。Git配置就是你的"代码身份证",它告诉世界"这代码是我写的"。 正如《Pro Git》中所说: “Git的配置系统是分层的,有三个层次:系统级、全局级和本地级。系统


LeetCode 热题100 --- 双指针专区
谎言西西里2025/12/26

283. 移动零 - 力扣(LeetCode) 题目分析: 题目要求将数组 nums 中所有 0 移动至数组末尾,同时保持其他非零元素的相对顺序不变,并且要求在原数组上进行操作。 核心要求: 0 要移动至数组末尾 非零元素相对位置不变 在原数组上进行操作 解法一(暴力使用数组方法) 遍历数组将其中所有为 0 的数直接使用splice删除并且记录 0 的个数,最后通过push填入“移动”的 0 var moveZeroes = function(nums) { let n = 0;


【大前端】【Android】 Android 手机上导出已安装 App 的 APK
柯南二号2025/12/17

根据是否有 root / adb / 仅手机操作,常见有 4 种靠谱方式。按「实用度 + 成本」整理👇 一、最推荐:ADB 导出(无需 Root,最稳定)⭐️ 适合开发者、抓包、逆向、分析三方 APK 1️⃣ 开启 USB 调试 设置 → 关于手机 → 连续点击“版本号” → 开发者模式 开发者选项 → USB 调试 2️⃣ 找到 APK 路径 adb shell pm list packages | grep wechat 例如: package:com.tence


Agent 入门科普:从"人工智障"到"数字打工人"的进化史
无限大62025/12/9

🤖 Agent 入门科普:从"人工智障"到"数字打工人"的进化史 大家好,欢迎来到无限大的博客,这个专栏是新开的,打算讲一讲Agent,其实早就有学习的打算了 近期在逛github的时候看到一个高star项目,叫做Hello-Agents,项目地址是[github.com/datawhalech…] 我的文章也是参考了这个内容写的,这个系列更新比较慢,因为我也是边学边写的,所以会比较慢,但是我会尽量写的详细一些,用更多贴近生活的抽象案例来讲解,希望能帮助到大家 引言:当 AI 开始自己"打

首页编辑器站点地图

本站内容在 CC BY-SA 4.0 协议下发布

Copyright © 2026 XYZ博客