博客
关于我
openresty lua集成kafka
阅读量:664 次
发布时间:2019-03-15

本文共 4113 字,大约阅读时间需要 13 分钟。

前提

1、 安装openresty,记得安装nginx的监控模块

2、 安装kafka
3、 下载lua+kafka插件:
4、 解压插件,将lua-resty-kafka-master\lib\resty\kafka文件夹放到openresty/lualib/resty下

首先修改openresty的配置文件中localtion位置,引入外部lua文件,这样修改lua文件会比较方便

location / {           default_type text/html;        content_by_lua_file /usr/local/openresty/tmp.lua;    }

案例

1 先获取kafka的实例

2 通过实例获取连接
3设置分区发送策略
4调用send方法发送数据
5启动一个kafka消费测试,验证是否发送成功

lua代码

------ Generated by EmmyLua(https://github.com/EmmyLua)--- Created by NH55.--- DateTime: 2020/12/11 11:48------ 数据采集运行线程阈值监控,如果超过了我们设置的最大阈值,那么就等待不send数据,下个批次再次执行local DEFAULT_THRESH = 100-- 编写kafka相关配置-- 配置broker地址local BROKER_LIST = {       {    host = "192.168.xx.101", port = 9092 },    {    host = "192.168.xx.102", port = 9092 },    {    host = "192.168.xx.103", port = 9092 }}-- kafka分区数local PARTITION_NUM = 3-- kafka的topiclocal TOPIC = "csdn"-- producerConfiglocal CONNECT_PARAMS = {       producer_type = "async", socket_timeout = 30000,    flush_time = 10000, request_timeout = 20000}-- 默认分区local function default_partitioner(key, num, correlation_id)    local id = key and crc32(key) or correlation_id    -- partition_id is continuous and start from 0    return id % numend--- 我们为了让数据均匀分布到每一个分区内,这里我们使用轮询方式发送消息至Kafka分区中--- 相当于自定义分区的模式,当然你也可以不用这种方式,使用默认的分区也行-- 获取共享内存数据local shared_data = ngx.shared.shared_data-- 设置共享内存的变量(Key)local sharedKey = "shared_Key"local key_val = shared_data:get(sharedKey)if not key_val then    key_val = 1    shared_data:set(sharedKey,key_val)end-- 计算消息发送分区local partition_id = ""..tonumber(key_val%PARTITION_NUM)--每个key的value要自增shared_data:incr(sharedKey,1)-- 变量监控local isGone = true-- 进行阈值判断if tonumber(ngx.var.connections_active) > tonumber(DEFAULT_THRESH) then    isGone =falseend-- 满足条件true执行,反之不执行if isGone then    -- 获取页面数据信息    local time_local = ngx.var.time_local    if time_local == nil then        time_local = ""    end    local request = ngx.var.request    if request == nil then        request = ""    end    local request_method = ngx.var.request_method    if request_method == nil then        request_method = ""    end    local content_type = ngx.var.content_type    if content_type == nil then        content_type = ""    end    ngx.req.read_body()    local request_body = ngx.var.request_body    if request_body == nil then        request_body = ""    end    local http_referer = ngx.var.http_referer    if http_referer == nil then        http_referer = ""    end    local remote_addr = ngx.var.remote_addr    if remote_addr == nil then        remote_addr = ""    end    local http_user_agent = ngx.var.http_user_agent    if http_user_agent == nil then        http_user_agent = ""    end    local time_iso8601 = ngx.var.time_iso8601    if time_iso8601 == nil then        time_iso8601 = ""    end    local server_addr = ngx.var.server_addr    if server_addr == nil then        server_addr = ""    end    local http_cookie = ngx.var.http_cookie    if http_cookie == nil then        http_cookie = ""    end    --封装数据    local message = time_local .."#CS#".. request .."#CS#".. request_method .."#CS#"..            content_type .."#CS#".. request_body .."#CS#".. http_referer .."#CS#"..            remote_addr .."#CS#".. http_user_agent .."#CS#".. time_iso8601 .."#CS#"..            server_addr .."#CS#".. http_cookie;    -- 引入生产者模块创建实例    local producerDic = require "resty.kafka.producer"    -- 创建实例    local producer = producerDic:new(BROKER_LIST,CONNECT_PARAMS)    -- 调用发送方法send    local ok,err = producer:send(TOPIC,partition_id,message)    -- 判断发送消息是否成功打印日志    if not ok then        ngx.log("kafka send message err:",err)    endend

之后打开消费者

kafka-console-consumer.sh \--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \--topic csdn

刷新nginx监听的网页,在消费者端就可以收到内容

tt

11/Dec/2020:19:53:58 +0800#CS#GET / HTTP/1.1#CS#GET#CS##CS##CS##CS#192.168.xx.1#CS#Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36 Edg/87.0.664.57#CS#2020-12-11T19:53:58+08:00#CS#192.168.xx.101#CS#11/Dec/2020:19:54:01 +0800#CS#GET /

注意事项

kafka server.properties 需开启如下选项

集群的每台机器都需要打开

advertised.listeners=PLAINTEXT://192.168.xx.103:9092

转载地址:http://kasmz.baihongyu.com/

你可能感兴趣的文章
MYSQL sql语句针对数据记录时间范围查询的效率对比
查看>>
mysql sum 没返回,如果没有找到任何值,我如何在MySQL中获得SUM函数以返回'0'?
查看>>
mysql sysbench测试安装及命令
查看>>
mysql Timestamp时间隔了8小时
查看>>
Mysql tinyint(1)与tinyint(4)的区别
查看>>
MySQL Troubleshoting:Waiting on query cache mutex
查看>>
mysql union orderby 无效
查看>>
mysql v$session_Oracle 进程查看v$session
查看>>
mysql where中如何判断不为空
查看>>
MySQL Workbench 使用手册:从入门到精通
查看>>
MySQL Workbench 数据库建模详解:从设计到实践
查看>>
MySQL Workbench 数据建模全解析:从基础到实践
查看>>
mysql workbench6.3.5_MySQL Workbench
查看>>
MySQL Workbench安装教程以及菜单汉化
查看>>
MySQL Xtrabackup 安装、备份、恢复
查看>>
mysql [Err] 1436 - Thread stack overrun: 129464 bytes used of a 286720 byte stack, and 160000 bytes
查看>>
MySQL _ MySQL常用操作
查看>>
MySQL – 导出数据成csv
查看>>
MySQL —— 在CentOS9下安装MySQL
查看>>
MySQL —— 视图
查看>>