Eswlnk Blog Eswlnk Blog
  • 资源
    • 精彩视频
    • 破解专区
      • WHMCS
      • WordPress主题
      • WordPress插件
    • 其他分享
    • 极惠VPS
    • PDF资源
  • 关于我
    • 论文阅读
    • 关于本站
    • 通知
    • 左邻右舍
    • 玩物志趣
    • 日志
    • 专题
  • 热议话题
    • 游戏资讯
  • 红黑
    • 渗透分析
    • 攻防对抗
    • 代码发布
  • 自主研发
    • 知识库
    • 插件
      • ToolBox
      • HotSpot AI 热点创作
    • 区块
    • 快乐屋
    • 卡密
  • 乱步
    • 文章榜单
    • 热门标签
  • 问答中心反馈
  • 注册
  • 登录
首页 › 代码发布 › 「代码发布」基于时间戳的日志回放引擎

「代码发布」基于时间戳的日志回放引擎

Eswlnk的头像
Eswlnk
2022-08-22 12:21:48
「代码发布」基于时间戳的日志回放引擎-Eswlnk Blog
智能摘要 AI
本文讨论了一种基于时间戳的日志回放引擎的设计与实现。最初认为日志回放只是简单地重发日志记录的请求,但通过研究`goreplay`后,作者意识到可以通过记录特定时间段的流量并基于时间戳回放流量来模拟真实场景。该引擎主要针对HTTP流量,分为日志清洗、按时间戳排序和日志回放三个步骤。核心部分包括使用`java.util.concurrent.DelayQueue`作为延迟队列,并通过线程池和连接池优化性能。生产者负责从日志中提取请求并按时间戳放入队列,消费者则从队列中取出请求并以指定倍率发送。测试结果显示,该引擎能够达到较高的QPS,验证了设计方案的有效性。

之前写过一个日志回放引擎的第一代千万级日志回放引擎设计稿,当时理解的日志回放就是把日志记录的请求重新发出去,这就是回放线上用户的流量了。可是在我最近看goreplay的过程中,重新刷新了我的认知。

查阅了一些资料,终于算是了解了一些基于时间戳的方案和思路。大体如下:通过工具把线上某段时间的流量记录下来,其中包含时间戳等信息,然后通过回放引擎把流量回放出去。

「代码发布」基于时间戳的日志回放引擎-Eswlnk Blog
基于时间戳的日志回放引擎

解决思路

目前流量回放集中于HTTP流量,所以之前写过的引擎的发压部分还是可以继续使用。所以我也有了自己的解决思路:

  1. 日志清洗,其实就是把规范化的日志解析成引擎框架可以使用的对象,通常包含HTTP请求的组成部分。
  2. 按照时间戳排序,通常使用现成的工具这一步是可以省略,但是由于日志记录是已经存在的组件,这里需要做一些兼容性工作
  3. 日志回放,通过线程池和连接池两个池化技术可以解决性能方面的问题。再结合当前的分布式方案做一些兼容功能即可。

其中最最核心的应该就是队列的选择,这里我用看java的java.util.concurrent.DelayQueue,也没找到其他更好的框架了。其实在一开始我想复用自己写之前写的日志回放框架的队列,也尝试对集中常用队列进行了性能测试:

  • Java&Go高性能队列之LinkedBlockingQueue性能测试  2022-01-10
  • Java&Go高性能队列之Disruptor性能测试  2022-02-14
  • Java&Go高性能队列之channel性能测试  2022-02-17

本来想是用多线程去读取日志的过程中,通过判断每一条日志是否到时间点,然后丢到一个线程安全的队列中,后面用线程池取队列中的对象,发送请求的。但是仔细想来太复杂了,流量过了好几手,不利于实现和拓展功能。

然后我重新对java.util.concurrent.DelayQueue进行了性能测试延迟队列DelayQueue性能测试,有了测试结果之后,就可以放心大胆地干了。关于延迟队列的基本使用可参考下单延迟10s撤单性能测试。

「代码发布」基于时间戳的日志回放引擎-Eswlnk Blog

实现

总体来说实现起来思路比较清晰,我分成三部分分享。

属性定义

  1. 我首先定义了一个com.funtester.frame.execute.ReplayConcurrent.ReplayLog日志对象,用于存储每一个请求日志
  2. 然后定义一个com.funtester.frame.execute.ReplayConcurrent#logs用来存储日志,这里旧事重提一下,千万级别的日志对象,存储在内存里面是OK的,所以我才会采用这种方式。为什么要从日志文件中转一手呢?因为日志是不按照时间戳排序的。
  3. 再定义com.funtester.frame.execute.ReplayConcurrent#logDelayQueue用来当作回放请求队列,也就是流量中转站,生产者从com.funtester.frame.execute.ReplayConcurrent#logs中取,clone之后丢到队列中;消费者从队列中取对象,丢给线程池。
  4. 定义com.funtester.frame.execute.ReplayConcurrent#handle当作是处理流量的方法,就是把流量对象包装成HttpRequestBase对象然后发送出去

生产者

  1. 确定使用异步线程完成,使用Java自定义异步功能实践。
  2. 根据com.funtester.frame.execute.ReplayConcurrent#logDelayQueue性能测试数据,添加com.funtester.frame.execute.ReplayConcurrent#threadNum参数来控制。
  3. 多线程取com.funtester.frame.execute.ReplayConcurrent#logs对象,用到了几个线程安全类,用于保障多线程是顺序读取,避免了在延迟队列中进行排序操作。
  4. 使用了com.funtester.frame.execute.ReplayConcurrent#getMAX_LENGTH控制队列的长度。貌似没找到限制延迟队列长度的API。只能自己实现了,思路当添加日志数量超过最大值,存储当前队列长度。当长度大于最大长度,则在下一次添加对象前,休眠1s,然后在重置本地存储的队列长度。这样可以解决这个问题。当然最大值设置足够高,避免1s中内队列变成空。回放引擎设计50万QPS,所以我就先设置了80万的最大长度。后续可以根据实际情况调整。

消费者

  1. 依旧使用异步,生产者
  2. 使用API时java.util.concurrent.DelayQueue#poll(long, java.util.concurrent.TimeUnit),避免阻塞导致线程无法终止。
  3. 引入com.funtester.frame.execute.ReplayConcurrent#getMultiple控制流量回放的倍数。
  4. 使用com.funtester.frame.execute.ReplayConcurrent#getTotal记录回放的日志数量。
  5. 使用com.funtester.frame.execute.ReplayConcurrent#getHandle处理日志对象。

代码如下:

package com.funtester.frame.execute

import com.funtester.base.bean.AbstractBean
import com.funtester.frame.SourceCode
import com.funtester.utils.LogUtil
import com.funtester.utils.RWUtil
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger

import java.util.concurrent.DelayQueue
import java.util.concurrent.Delayed
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.LongAdder

/**
 * 回放功能执行类*/
class ReplayConcurrent extends SourceCode {

    private static Logger logger = LogManager.getLogger(ReplayConcurrent.class);

    static ThreadPoolExecutor executor

    static boolean key = true

    static int MAX_LENGTH = 800000

    int threadNum = 2

    String name

    String fileName

    int multiple

    Closure handle

    List<ReplayLog> logs

    DelayQueue<ReplayLog> logDelayQueue = new DelayQueue<ReplayLog>()

    LongAdder total = new LongAdder()

    ReplayConcurrent(String name, String fileName, int multiple, Closure handle) {
        this.name = name
        this.fileName = fileName
        this.multiple = multiple
        this.handle = handle

    }

    void start() {
        if (executor == null) executor = ThreadPoolUtil.createCachePool(THREADPOOL_MAX, "R")
        time({
            RWUtil.readFile(fileName, {
                def delay = new ReplayLog(it)
                if (delay.getTimestamp() != 0) logDelayQueue.add(delay)
            })
        }, 1, "读取日志$fileName")
        logs = logDelayQueue.toList()
        def timestamp = logs.get(0).getTimestamp()
        logDelayQueue.clear()
        AtomicInteger index = new AtomicInteger()
        AtomicInteger size = new AtomicInteger()
        def LogSize = logs.size()
        AtomicInteger diff = new AtomicInteger()
        threadNum.times {
            fun {
                while (key) {
                    if (index.get() % LogSize == 0) diff.set(getMark() - timestamp)
                    if (index.get() % MAX_LENGTH == 0) size.set(logDelayQueue.size())
                    if (size.get() > MAX_LENGTH) {
                        sleep(1.0)
                        size.set(logDelayQueue.size())
                    }
                    def replay = logs.get(index.getAndIncrement() % LogSize)
                    logDelayQueue.add(replay.clone(replay.timestamp + diff.get()))
                }
            }
        }
        threadNum.times {
            fun {
                while (key) {
                    def poll = logDelayQueue.poll(1, TimeUnit.SECONDS)
                    if (poll != null) {
                        executor.execute {
                            multiple.times {
                                handle(poll.getUrl())
                                total.add(1)
                            }
                        }

                    }
                }
            }
        }
        fun {
            while (key) {
                sleep(COUNT_INTERVAL as double)
                int real = total.sumThenReset() / COUNT_INTERVAL as int
                def active = executor.getActiveCount()
                def count = active == 0 ? 1 : active
                logger.info("{} ,实际QPS:{} 活跃线程数:{} 单线程效率:{}", name, real, active, real / count as int)
            }
        }

    }

    /**
     * 中止
     * @return
     */
    def stop() {
        key = false
        executor.shutdown()
        logger.info("replay压测关闭了!")
    }

    /**
     * 日志对象*/
    static class ReplayLog extends AbstractBean implements Delayed {

        int timestamp

        String url

        ReplayLog(String logLine) {
            def log = LogUtil.getLog(logLine)
            this.url = log.getUrl()
            this.timestamp = log.getTime()
        }

        ReplayLog(int timestamp, String url) {
            this.timestamp = timestamp
            this.url = url
        }

        @Override
        long getDelay(TimeUnit unit) {
            return this.timestamp - getMark()
        }

        @Override
        int compareTo(Delayed o) {
            return this.timestamp - o.timestamp
        }

        protected Object clone(int timestamp) {
            return new ReplayLog(timestamp, this.url)
        }
    }
}

自测

下面是我的测试用例:

package com.okcoin.hickwall.presses

import com.okcoin.hickwall.presses.funtester.frame.execute.ReplayConcurrent
import com.okcoin.hickwall.presses.funtester.httpclient.FunHttp

class RplayT extends FunHttp {

    static String HOST = "http://localhost:12345"

    public static void main(String[] args) {
        def fileName = "api.log"
        new ReplayConcurrent("测试回放功能", fileName, 1, {String url ->
            getHttpResponse(getHttpGet(HOST + url))
        }).start()

    }
}

测试结果如下:

22:45:43.510 main 
  
10:56:18 F-5 测试回放功能, 实际QPS:23162 活跃线程数:0单线程效率:23162
10:56:23 F-5 测试回放功能, 实际QPS:36575 活跃线程数:6单线程效率:6095
10:56:28 F-5 测试回放功能, 实际QPS:38974 活跃线程数:21单线程效率:1855 
10:56:33 F-5 测试回放功能, 实际QPS:32798 活跃线程数:8单线程效率:4099
10:56:38 F-5 测试回放功能,实际QPS:35224 活跃线程数:4单线程效率:8806
10:56:43 F-5 测试回放功能,实际QPS:28426 活跃线程数:0单线程效率:28426
10:56:48 F-5 测试回放功能, 实际QPS:33607 活跃线程数:6单线程效率:5601
10:56:53 F-5 测试回放功能,实际QPS:34392 活跃线程数:0单线程效率:34392
本站默认网盘访问密码:1166
本站默认网盘访问密码:1166
qpsstring回放引擎模块日志时间戳
0
0
Eswlnk的头像
Eswlnk
一个有点倒霉的研究牲站长
赞赏
「代码发布」实现博客或第三方网站嵌入bilibili视频
上一篇
「代码发布」使用Python制作IOTQQ管理插件
下一篇

评论 (0)

请登录以参与评论
现在登录
    发表评论

猜你喜欢

  • 「日志」IG无缘S15总决赛
  • 来自谷歌27岁的生日涂鸦
  • 科研记录:ecCodes处理grib文件问题
  • 本站上线邀请码免费兑换系统
  • 开发日志:解决Windows平台无法使用Metview解析数据的难题
Eswlnk的头像

Eswlnk

一个有点倒霉的研究牲站长
1108
文章
319
评论
679
获赞

随便看看

「代码发布」MapStruct 拷贝类属性
2022-08-19 0:37:22
Killer | 一款专为逃避AV、EDR或安全工具所开发的工具
2023-06-15 20:43:45
「Python」中国移动云盘自动签到领云朵脚本
2022-09-18 23:02:29

文章目录

专题展示

WordPress53

工程实践37

热门标签

360 AI API CDN java linux Nginx PDF PHP python SEO Windows WordPress 云服务器 云服务器知识 代码 免费 安全 安卓 工具 开发日志 微信 微软 手机 插件 攻防 攻防对抗 教程 日志 渗透分析 源码 漏洞 电脑 破解 系统 编程 网站优化 网络 网络安全 脚本 苹果 谷歌 软件 运维 逆向
  • 首页
  • 知识库
  • 地图
Copyright © 2023-2025 Eswlnk Blog. Designed by XiaoWu.
本站CDN由 壹盾安全 提供高防CDN安全防护服务
蜀ICP备20002650号-10
页面生成用时 1.126 秒   |  SQL查询 32 次
本站勉强运行:
友情链接: Eswlnk Blog 网站渗透 倦意博客 特资啦!个人资源分享站 祭夜博客 iBAAO壹宝头条
  • WordPress142
  • 网络安全64
  • 漏洞52
  • 软件52
  • 安全48
现在登录
  • 资源
    • 精彩视频
    • 破解专区
      • WHMCS
      • WordPress主题
      • WordPress插件
    • 其他分享
    • 极惠VPS
    • PDF资源
  • 关于我
    • 论文阅读
    • 关于本站
    • 通知
    • 左邻右舍
    • 玩物志趣
    • 日志
    • 专题
  • 热议话题
    • 游戏资讯
  • 红黑
    • 渗透分析
    • 攻防对抗
    • 代码发布
  • 自主研发
    • 知识库
    • 插件
      • ToolBox
      • HotSpot AI 热点创作
    • 区块
    • 快乐屋
    • 卡密
  • 乱步
    • 文章榜单
    • 热门标签
  • 问答中心反馈