本文共 2157 字,大约阅读时间需要 7 分钟。
logstash版本: 5.5.3 及以后
logstash消费阿里云kafka信息并返回到elasticsearch系统配置信息解析:
bootstrap_servers => ["kafka-cn-internet.aliyun.com:8080"] #kafka系统的连接地址 client_id => 'tt' #客户端上传到es时,新增字段 group_id => "CID-LOG" #kafka分组的信息 auto_offset_reset => "latest" #从最新的偏移量开始消费 consumer_threads => 5 #decorate_events => true #此属性会将当前topic、offset、group、partition等信息也带到message中 topics => ["alikafka-cid-log"] #//数组类型,可配置多个topic type => "bhy" #//所有插件通用属性,尤其在input里面配置多个数据源时很有用 security_protocol => "SASL_SSL" #kafka连接阿里云的协议 sasl_mechanism => "ONS" #kafka阿里云的消费机制, logstash中默认的是 GSSAPI jaas_path => "/data/logstash/config/kafka_client_jaas.conf" # ONS登录信息的路径 ssl_keystore_location => '/data/logstash/config/kafka.client.truststore.jks' #证书 ssl_truststore_location => '/data/logstash/config/kafka.client.truststore.jks'#信任证书 ssl_keystore_password => "xxxx" #证书密码 ssl_truststore_password => "xxxx" #证书密码
关键信息:
logstash使用ONS机制连接kafka时,需要需要用到一些额外的jar包,可以把开发所使用的jar包,都放到 /data/logstash/vendor/jruby/lib/ 下面。我的配置模板:
input{ kafka { bootstrap_servers => ["kafka-cn-internet.aliyun.com:8080"] client_id => 'tt' group_id => "CID-LOG" auto_offset_reset => "latest" #从最新的偏移量开始消费 consumer_threads => 5 #decorate_events => true #此属性会将当前topic、offset、group、partition等信息也带到message中 topics => ["alikafka-cid-log"] #//数组类型,可配置多个topic type => "bhy" #//所有插件通用属性,尤其在input里面配置多个数据源时很有用 security_protocol => "SASL_SSL" sasl_mechanism => "ONS" jaas_path => "/data/logstash/config/kafka_client_jaas.conf" ssl_keystore_location => '/data/logstash/config/kafka.client.truststore.jks' ssl_truststore_location => '/data/logstash/config/kafka.client.truststore.jks' ssl_keystore_password => "xx" ssl_truststore_password => "xx" }}output { elasticsearch { hosts => ["es-ip:9200"] user => ["xxxx"] password => ["xxxx"] index => ["services"] } stdout { codec=>plain }}
java参数优化路径:
config/jvm.options
转载于:https://blog.51cto.com/zhenfen/2105234