最近一段时间,我们在工作中用到了这个消息中间件,而其他小组的同事们也都作出了最后的决定。当然,我自己编写的性能测试框架也必须要访问这个消息系统。我也在努力的学习。
Redis流是最新版本5.0中的一个新的数据结构。Redis流主要在消息队列(MQ,消息 Queue)中使用, Redis发行订阅(pub/sub)来完成消息队列,但其缺点是消息不能持久,一旦网络中断、 Redis停机,这些信息都会被删除。
以前他都不知道 Redis的用法,现在看来,他已经很久没有遇到过这样的怪物了。和往常一样,我稍后会对基础特性做一些性能测试,以下是关于基础特性的演示。
准备工作
依赖
如果你想自己做,那就留意一下,我在查找数据的时候,发现 API和其他版本的 API有很大的区别,这就像是一个陷阱。当您在使用redis.cl ie nts的另一个版本时,您可以通过浏览源代码来了解有关的参数类型。
Maven依赖:
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.2.3</version>
</dependency>
Gradle依赖:
// https://mvnrepository.com/artifact/redis.clients/jedis
implementation group: 'redis.clients', name: 'jedis', version: '4.2.3'
Redis server版本:Redis 6.2.5
。
XADD – 添加消息到末尾
如果key对应的队列不存在,则会自动创建。
首先我们需要创建一个redis.clients.jedis.params.XAddParams
,顾名思义就是查询参数,这里面有重要的参数:redis.clients.jedis.params.XAddParams#maxLen
表示设置队列的长度,但是不常用。语法如下:
def len = XAddParams.xAddParams()
xadd API使用方式如下:
public static void main(String[] args) {
def base = new RedisBase("127.0.0.1", 6379)
Jedis jedis = base.getJedis()
def len = XAddParams.xAddParams()
def map = new HashMap<Integer, String>()
map.put("eswink", Time.getDate() + TAB + 325)
jedis.xadd("fun", len, map)
jedis.close()
}
XTRIM – 对流进行修剪,限制长度
这个API就是设置队列长度。使用方式也非常简单。
public static void main(String[] args) {
def base = new RedisBase("127.0.0.1", 6379)
Jedis jedis = base.getJedis()
def xtrim = jedis.xtrim("fun", XTrimParams.xTrimParams().maxLen(10))
output(xtrim)
jedis.close()
}
返回值是丢弃的消息的数量。
![实践Redis Stream与Java API互通插图1 图片](https://static.esw.eswlnk.com/2022/06/tecmyf9cmyf9.gif)
XDEL – 删除消息
这个就是删除某个消息,使用更简单了。
public static void main(String[] args) {
def base = new RedisBase("127.0.0.1", 6379)
Jedis jedis = base.getJedis()
jedis.xdel("fun",new StreamEntryID(1653129389004,1))
jedis.close()
}
XLEN – 获取流包含的元素数量,即消息长度
话不多说了,使用如下:
jedis.xlen("fun")
XREAD – 以阻塞或非阻塞方式获取消息列表
这个要着重介绍一下,因为我用的就是这个,首先我们需要创建一个redis.clients.jedis.params.XReadParams
,这里有两个参数:redis.clients.jedis.params.XReadParams#count
和redis.clients.jedis.params.XReadParams#block
。前者控制返回数量,后者控制阻塞时间,如果时间小于0则认为不阻塞,等于0则一直会阻塞,小于0会报错。不设置该参数责任无非阻塞模式。PS:数量不足不会造成阻塞。示例如下:
def block = XReadParams.xReadParams().count(3).block(1000)
还有我们需要redis.clients.jedis.Jedis#xread(redis.clients.jedis.params.XReadParams, java.util.Map<java.lang.String,redis.clients.jedis.StreamEntryID>)
第二个参数,这里常用的两种:
Map<String, StreamEntryID> entry = ["fun": new StreamEntryID()]//获取历史消息
Map<String, StreamEntryID> entry = ["fun": StreamEntryID.LAST_ENTRY]//获取在请求之后添加的消息
遍历消息:
List<Map.Entry<String, List<StreamEntry>>> xread = jedis.xread(block, entry)
output(xread.size())
Map.Entry<String, List<StreamEntry>> get = xread.get(0)
def value = get.getValue()
value.each {
println(it.getID())
println(it.getFields().get("eswink"))
}
控制台响应如下:
16:40:56.065 main redis连接池IP:127.0.0.1,端口:6379,超时设置:5000
16:40:56.280 main 1
1653725282325-0
2022-05-28 16:08:02 325
1653725282325-1
2022-05-28 16:08:02 325
1653725282325-2
2022-05-28 16:08:02 325
XRANGE – 获取消息列表,会自动过滤已经删除的消息
该 API在特定的区域中获取一个开始和结束的信息,它可以传递 String类型的信息 ID,或者发送redis.clients.je dis. StreamEntryID。
jedis.xrange("fun", "1653129389045-0", "1653129389047-0")
![实践Redis Stream与Java API互通插图2 图片](https://static.esw.eswlnk.com/2022/06/tedz02zdz02z.gif)
📮评论