目录
前言
go-redis的github:https://github.com/go-redis/redis/
安装go-redis
1 |
go get -u github.com/go-redis/redis/v8 |
go-redis支持
- 自动连接池与断路器支持
- 发布/订阅
- 事务
- 管道和TxPipeline
- 脚本
- 超时
- 哨兵模式
- 集群模式
- Ring
等特性。
创建go-redis
普通的redis客户端(redis.NewClient()
)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
r.client = redis.NewClient(&redis.Options{ //连接信息 //网络类型,tcp or unix,默认tcp Network: global.Config.GetString("redis.networkType"), //主机名+冒号+端口,默认localhost:6379 Addr: fmt.Sprintf("%s:%d",global.Config.GetString("redis.host"),global.Config.GetInt("redis.port")), Password: global.Config.GetString("redis.password"), //密码 DB: global.Config.GetInt("redis.db"), // redis数据库index //连接池容量及闲置连接数量 PoolSize: global.Config.GetInt("redis.pool.max"), // 连接池最大socket连接数,默认为4倍CPU数, 4 * runtime.NumCPU MinIdleConns: global.Config.GetInt("redis.pool.min"), //在启动阶段创建指定数量的Idle连接,并长期维持idle状态的连接数不少于指定数量;。 //超时 DialTimeout: 5 * time.Second, //连接建立超时时间,默认5秒。 ReadTimeout: 3 * time.Second, //读超时,默认3秒, -1表示取消读超时 WriteTimeout: 3 * time.Second, //写超时,默认等于读超时 PoolTimeout: 4 * time.Second, //当所有连接都处在繁忙状态时,客户端等待可用连接的最大等待时长,默认为读超时+1秒。 //闲置连接检查包括IdleTimeout,MaxConnAge IdleCheckFrequency: 60 * time.Second, //闲置连接检查的周期,默认为1分钟,-1表示不做周期性检查,只在客户端获取连接时对闲置连接进行处理。 IdleTimeout: 5 * time.Minute, //闲置超时,默认5分钟,-1表示取消闲置超时检查 MaxConnAge: 0 * time.Second, //连接存活时长,从创建开始计时,超过指定时长则关闭连接,默认为0,即不关闭存活时长较长的连接 //命令执行失败时的重试策略 MaxRetries: 0, // 命令执行失败时,最多重试多少次,默认为0即不重试 MinRetryBackoff: 8 * time.Millisecond, //每次计算重试间隔时间的下限,默认8毫秒,-1表示取消间隔 MaxRetryBackoff: 512 * time.Millisecond, //每次计算重试间隔时间的上限,默认512毫秒,-1表示取消间隔 //可自定义连接函数 /*Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) { netDialer := &net.Dialer{ Timeout: 5 * time.Second, KeepAlive: 5 * time.Minute, } return netDialer.Dial("tcp", "127.0.0.1:6379") },*/ //钩子函数 //仅当客户端执行命令时需要从连接池获取连接时,如果连接池需要新建连接时则会调用此钩子函数 /*OnConnect: func(ctx context.Context, cn *redis.Conn) error { fmt.Printf("conn=%v\n", cn) return nil },*/ }) // 关闭 redis // defer r.client.Close() |
集群模式(redis.NewClusterClient()
)
1 2 3 |
redis.NewClusterClient(&redis.ClusterOptions{ Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}, }) |
哨兵模式(redis.NewFailoverClient()
)
1 2 3 4 |
redis.NewFailoverClient(&redis.FailoverOptions{ MasterName: "master", SentinelAddrs: []string{"x.x.x.x:26379", "xx.xx.xx.xx:26379", "xxx.xxx.xxx.xxx:26379"}, }) |
使用示例
set/get
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
func redisExample() { // Set err := rdb.Set("score", 100, 0).Err() if err != nil { fmt.Printf("set score failed, err:%v\n", err) return } // Get val, err := rdb.Get("score").Result() if err != nil { fmt.Printf("get score failed, err:%v\n", err) return } fmt.Println("score", val) val2, err := rdb.Get("name").Result() if err == redis.Nil { fmt.Println("name does not exist") } else if err != nil { fmt.Printf("get name failed, err:%v\n", err) return } else { fmt.Println("name", val2) } } |
zset
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
func redisExample2() { zsetKey := "language_rank" languages := []redis.Z{ redis.Z{Score: 90.0, Member: "Golang"}, redis.Z{Score: 98.0, Member: "Java"}, redis.Z{Score: 95.0, Member: "Python"}, redis.Z{Score: 97.0, Member: "JavaScript"}, redis.Z{Score: 99.0, Member: "C/C++"}, } // ZADD num, err := rdb.ZAdd(zsetKey, languages...).Result() if err != nil { fmt.Printf("zadd failed, err:%v\n", err) return } fmt.Printf("zadd %d succ.\n", num) // 把Golang的分数加10 newScore, err := rdb.ZIncrBy(zsetKey, 10.0, "Golang").Result() if err != nil { fmt.Printf("zincrby failed, err:%v\n", err) return } fmt.Printf("Golang's score is %f now.\n", newScore) // 取分数最高的3个 ret, err := rdb.ZRevRangeWithScores(zsetKey, 0, 2).Result() if err != nil { fmt.Printf("zrevrange failed, err:%v\n", err) return } for _, z := range ret { fmt.Println(z.Member, z.Score) } // 取95~100分的 op := redis.ZRangeBy{ Min: "95", Max: "100", } ret, err = rdb.ZRangeByScoreWithScores(zsetKey, op).Result() if err != nil { fmt.Printf("zrangebyscore failed, err:%v\n", err) return } for _, z := range ret { fmt.Println(z.Member, z.Score) } } |
Pipeline
Pipeline
主要是一种网络优化。它本质上意味着客户端缓冲一堆命令并一次性将它们发送到服务器。这些命令不能保证在事务中执行。这样做的好处是节省了每个命令的网络往返时间(RTT)。
Pipeline
基本示例如下:
1 2 3 4 5 6 7 |
pipe := rdb.Pipeline() incr := pipe.Incr("pipeline_counter") pipe.Expire("pipeline_counter", time.Hour) _, err := pipe.Exec() fmt.Println(incr.Val(), err) |
上面的代码相当于将以下两个命令一次发给redis server端执行,与不使用Pipeline
相比能减少一次RTT。
1 2 |
INCR pipeline_counter EXPIRE pipeline_counts 3600 |
也可以使用Pipelined
:
1 2 3 4 5 6 7 |
var incr *redis.IntCmd _, err := rdb.Pipelined(func(pipe redis.Pipeliner) error { incr = pipe.Incr("pipelined_counter") pipe.Expire("pipelined_counter", time.Hour) return nil }) fmt.Println(incr.Val(), err) |
在某些场景下,当我们有多条命令要执行时,就可以考虑使用pipeline
来优化。
事务
Redis是单线程的,因此单个命令始终是原子的,但是来自不同客户端的两个给定命令可以依次执行,例如在它们之间交替执行。但是,Multi/exec
能够确保在multi/exec
两个语句之间的命令之间没有其他客户端正在执行命令。
在这种场景我们需要使用TxPipeline
。TxPipeline
总体上类似于上面的Pipeline
,但是它内部会使用MULTI/EXEC
包裹排队的命令。例如:
1 2 3 4 5 6 7 |
pipe := rdb.TxPipeline() incr := pipe.Incr("tx_pipeline_counter") pipe.Expire("tx_pipeline_counter", time.Hour) _, err := pipe.Exec() fmt.Println(incr.Val(), err) |
上面代码相当于在一个RTT下执行了下面的redis命令:
1 2 3 4 |
MULTI INCR pipeline_counter EXPIRE pipeline_counts 3600 EXEC |
还有一个与上文类似的TxPipelined
方法,使用方法如下:
1 2 3 4 5 6 7 |
var incr *redis.IntCmd _, err := rdb.TxPipelined(func(pipe redis.Pipeliner) error { incr = pipe.Incr("tx_pipelined_counter") pipe.Expire("tx_pipelined_counter", time.Hour) return nil }) fmt.Println(incr.Val(), err) |
Watch
在某些场景下,我们除了要使用MULTI/EXEC
命令外,还需要配合使用WATCH
命令。在用户使用WATCH
命令监视某个键之后,直到该用户执行EXEC
命令的这段时间里,如果有其他用户抢先对被监视的键进行了替换、更新、删除等操作,那么当用户尝试执行EXEC
的时候,事务将失败并返回一个错误,用户可以根据这个错误选择重试事务或者放弃事务。
1 |
Watch(fn func(*Tx) error, keys ...string) error |
Watch方法接收一个函数和一个或多个key作为参数。基本使用示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
// 监视watch_count的值,并在值不变的前提下将其值+1 key := "watch_count" err = client.Watch(func(tx *redis.Tx) error { n, err := tx.Get(key).Int() if err != nil && err != redis.Nil { return err } _, err = tx.Pipelined(func(pipe redis.Pipeliner) error { pipe.Set(key, n+1, 0) return nil }) return err }, key) |
最后看一个官方文档中使用GET和SET命令以事务方式递增Key的值的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
const routineCount = 100 increment := func(key string) error { txf := func(tx *redis.Tx) error { // 获得当前值或零值 n, err := tx.Get(key).Int() if err != nil && err != redis.Nil { return err } // 实际操作(乐观锁定中的本地操作) n++ // 仅在监视的Key保持不变的情况下运行 _, err = tx.Pipelined(func(pipe redis.Pipeliner) error { // pipe 处理错误情况 pipe.Set(key, n, 0) return nil }) return err } for retries := routineCount; retries > 0; retries-- { err := rdb.Watch(txf, key) if err != redis.TxFailedErr { return err } // 乐观锁丢失 } return errors.New("increment reached maximum number of retries") } var wg sync.WaitGroup wg.Add(routineCount) for i := 0; i < routineCount; i++ { go func() { defer wg.Done() if err := increment("counter3"); err != nil { fmt.Println("increment error:", err) } }() } wg.Wait() n, err := rdb.Get("counter3").Int() fmt.Println("ended with", n, err) |
使用原生redis命令
1 2 3 4 5 6 7 8 9 |
func RedisCmd(c *gin.Context) { // 使用原生的 redis 命令 result,err := global.Redis.Do(c,"set","name","jack").Result() if err != nil { fmt.Printf("redis命令执行错误:%s \n",err.Error()) return } fmt.Printf("执行结果:%v \n",result) } |
BloomFilter 布隆过滤器
redis 需要安装并开启
redisbloom
模块,否则,执行不了BF命令。
布隆过滤器的概念
布隆过滤器(Bloom Filter) 是由 Howard Bloom在1970年提出的二进制向量数据结构
,它具有很好的空间和时间效率
,被用来检测一个元素是不是集合中的一个成员
,即判定 “可能已存在和绝对不存在”
两种情况。如果检测结果为是,该元素不一定在集合中;但如果检测结果为否,该元素一定不在集合中,因此Bloom filter具有100%的召回率
。
布隆过滤器应用场景
- 垃圾邮件过滤
- 防止缓存击穿
- 比特币交易查询
- 爬虫的URL过滤
- IP黑名单
- 查询加速【比如基于KV结构的数据】
- 集合元素重复的判断
布隆过滤器的优缺点
1、优点:
- 有很好的
空间和时间效率
存储空间和插入/查询时间都是常数
。- Hash函数相互之间没有关系,方便由硬件并行实现。
- 不需要存储元素本身,在某些对保密要求非常严格的场合有优势。
- 布隆过滤器可以表示
全集
,其它任何数据结构都不能。
2、缺点:
误判率会随元素的增加而增加
不能从布隆过滤器中删除元素
布隆过滤器注意事项
布隆过滤器思路比较简单,但是对于布隆过滤器的随机映射函数设计,需要计算几次,向量长度设置为多少比较合适,这个才是需要认真讨论的。
如果向量长度太短,会导致误判率直线上升。
如果向量太长,会浪费大量内存。
如果计算次数过多,会占用计算资源,且很容易很快就把过滤器填满。
常用命令一览
添加单个元素
1 |
BF.ADD newFilter foo |
添加 并检查多个元素
1 2 |
//添加已存在的元素返回0 BF.MADD myFilter foo bar baz |
检查 过滤器中是否存在该元素
1 |
BF.EXISTS newFilter foo |
批量检查 过滤器中是否存在该元素
1 |
BF.MEXISTS newFilter foo bar bbb |
设置过滤器的错误率和储存量
1 |
BF.RESERVE {key} {error_rate} {capacity} [EXPANSION expansion] |
下面简单介绍一下每个参数的具体含义:
key
:布隆过滤器的名称。
error_rate
: 误报的期望概率。这应该是介于0到1之间的十进制值。例如,对于期望的误报率0.1%(1000中为1),error_rate应该设置为0.001。该数字越接近零,则每个项目的内存消耗越大,并且每个操作的CPU使用率越高。
capacity
:过滤器的容量。当实际存储的元素个数超过这个值之后,性能将开始下降。实际的降级将取决于超出限制的程度。随着过滤器元素数量呈指数增长,性能将线性下降。
可选参数:
expansion
:如果创建了一个新的子过滤器,则其大小将是当前过滤器的大小乘以expansion。默认扩展值为2。这意味着每个后续子过滤器将是前一个子过滤器的两倍。
go-redis执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
// 布隆过滤器 func BloomFilter(c *gin.Context) { // 添加 _,err := redis.Client.Do(c,"BF.ADD","newFilter","foo").Result() if err != nil { fmt.Printf("redis命令执行错误:%s \n",err.Error()) return } // 判断是否存在 exist,err := redis.Client.Do(c,"BF.EXISTS","newFilter","foo").Result() if err != nil { fmt.Printf("redis命令执行错误:%s \n",err.Error()) return } fmt.Printf("是否存在:%v \n",exist) } |
HyperLogLong
什么是HyperLogLog
Redis HyperLogLog 算法是用于基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定 的、并且是很小的。
在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基 数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。
但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。
什么是基数
比如数据集 {1, 3, 5, 7, 5, 7, 8}
, 那么这个数据集的基数集为 {1, 3, 5 ,7, 8}
, 基数(不重复元素)为5。 基数估计就是在误差可接受的范围内,快速计算基数。
常用命令
pfadd key element [element...]
:将所有元素参数添加到 HyperLogLog 数据结构中,如果至少有个元素被添加返回 1, 否则返回 0。
pfcount key [key...]
: 返回给定 HyperLogLog 的基数估算值。返回给定 HyperLogLog 的基数值,如果多个 HyperLogLog 则返回基数估值之和。
pfmerge destkey sourcekey [sourcekey...]
: 将多个 HyperLogLog 合并为一个 HyperLogLog ,合并后的 HyperLogLog 的基数估算值是通过对所有 给定 HyperLogLog 进行并集计算得出的。
使用场景
使用场景:统计网页访问量。
思考:怎么样统计网页访问量,并且一个IP一天访问多次同一个页面,只能算一次?
分析:1.首先分析该统计数,是否需要正确,其实产品只需要一个大概的,一天100W,和一天110W,其实差不多。如果使用Java的话,那个list可以去重,同时在内存等相关上要占很小的比率。
使用命令模式
1 2 3 4 5 6 |
// 添加 pfadd test:aaron:ip "191.168.1.23" pfadd test:aaron:ip "191.168.1.24" // 统计 pfcount test:aaron:ip |
go-redis使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
// HyperLogLog 基数统计 func HyperLogLog (c *gin.Context) { // 添加 redis.Client.Do(c,"PFADD","test:aaron:ip","191.168.1.23") redis.Client.Do(c,"PFADD","test:aaron:ip","191.168.1.24") // 统计 count,err := redis.Client.Do(c,"PFCOUNT","test:aaron:ip").Result() if err != nil { fmt.Printf("redis命令执行错误:%s \n",err.Error()) return } fmt.Printf("统计结果:%v \n",count) } |
GEO
GEO功能在Redis3.2版本提供,支持存储地理位置信息用来实现诸如附近位置、摇一摇这类依赖于地理位置信息的功能。
geo的实现是zset,所以可以用zset的命令去操作。
1 2 3 4 5 6 |
//遍历 zrange geo:city 0 -1 //删除 zrem geo:city suzhou //并集 ZUNIONSTORE geo:all 2 geo:city geo:user |
常用指令
geoadd key longitude(经度) latitude(纬度) member(坐标名称)
:添加地理位置坐标
1 2 |
geoadd geo:city 118.8921 31.32751 nanjing geoadd geo:city 120.21201 30.2084 hangzhou |
geopos key member [member...]
: 获取地理位置的坐标
1 |
geopos geo:city nanjing |
geodist key member1 member2 [unit]
:返回两个给定位置之间的距离
1 |
geodist geo:city nanjing hangzhou km |
如果两个位置之间的其中一个不存在,那么命令返回空值。
指定单位的参数unit必须是以下单位的其中一个:
m
表示单位为米km
表示单位为千米mi
表示单位为英里ft
表示单位为英尺
如果用户没有显式地指定单位参数,那么geodist默认使用米作为单位。
geodist命令在计算距离时会假设地球为完美的球形,在极限情况下,这一假设最大会造成0.5%的误差。
georadius key longitude latitude radius m|km|ft|mi [withcoord][withdist][withhash][asc|desc][count count]
:以给定的经纬度为中心,返回键包含的位置元素当中,与中心的距离不超过过给定最大距离的所有位置元素。
1 2 3 4 |
georadius geo:city 120 30 100 km withcoord 1) 1) "hangzhou" 2) 1) "120.21200805902481079" 2) "30.20839995425554747" |
以给定的经纬度为中心,返回键包含的位置元素当中,与中心的距离不超过给定最大距离的所有位置元素。
范围可以使用以下其中一个单位:
m
表示单位为米。km
表示单位为千米。mi
表示单位为英里。ft
表示单位为英尺。
在给定以下选项时,命令会返回额外的信息:
withdist
:在返回位置元素的同时,将位置元素与中心之间的距离也一并返回.距离的单位和用户给定的范围单位保持一致。
withcoord
:将位置元素的经度和纬度也一并返回。
withhash
:以52位有符号整数的形式,返回位置元素经过原始geohash编码的有序集合分值。这个选项主要用于底层应用或者调试,实际中的作用不大。
命令默认返回未排序的位置元素。通过以下两个参数,用户可以指定被返回位置元素的排序方式:
asc
:根据中心的位置,按照从近到远的方式返回位置元素
desc
:根据中心的位置,按照从远到近的方式返回位置元素。
在默认情况下,georadius命令会返回所有匹配的位置元素。虽然用户可以使用count选项去获取N个匹配元素,但是因为命令在内部可能会需要对所有被匹配的元素进行处理,所以在对一个非常大的区域进行搜索时,即使只使用count选项去获取少量元素,命令的执行速度也可能非常慢。但从另一方面说,使用count选项去减少需要返回的元素数量,对于减少带宽来说仍然是非常有用的。
georadiusbymember key member radius m|km|ft|mi [withcoord][withdist][withhash][asc|desc][count count]
:同georadius,指定中心为成员,必定会显示一条member本身的信息。count N
:会显示距离最近的N个地址。
1 2 3 4 5 6 7 8 9 |
georadiusbymember geo:city nanjing 200 km withcoord withdist count 2 1) 1) "hangzhou" 2) "177.2150" 3) 1) "120.21200805902481079" 2) "30.20839995425554747" 2) 1) "nanjing" 2) "0.0000" 3) 1) "118.89209836721420288" 2) "31.32750976275760735" |
geohash key member [member...]
:Redis使用geohash将二维经纬度转换为一维字符串,字符串越长表示位置更精确,两个字符串越相似表示距离越近。
1 2 |
geohash geo:city nanjing 1) "wtsd1qyxfx0" |
go-redis使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
// geo func Geo (c *gin.Context) { // 添加 redis.Client.Do(c,"geoadd","geo:city","118.8921","31.32751","nanjing") redis.Client.Do(c,"geoadd","geo:city","120.21201","30.2084","hangzhou") // 获取坐标 geopos,err := redis.Client.Do(c,"geopos","geo:city","nanjing").Result() if err != nil { fmt.Printf("redis命令执行错误:%s \n",err.Error()) return } fmt.Printf("坐标为:%v \n",geopos) // 返回两个给定位置之间的距离 geodist,err := redis.Client.Do(c,"geodist","geo:city","nanjing","hangzhou","km").Result() if err != nil { fmt.Printf("redis命令执行错误:%s \n",err.Error()) return } fmt.Printf("两地距离:%v \n",geodist) // 某点的半径内 georadius,err := redis.Client.Do(c,"georadius","geo:city","120","30","100","km","withcoord").Result() if err != nil { fmt.Printf("redis命令执行错误:%s \n",err.Error()) return } fmt.Printf("半径内:%v \n",georadius) } |
参考
Go语言操作Redis
go-redis使用之Hash字典
golang-redis教程