How to update existing Elasticsearch records with Logstash

Anyone who works with Elasticsearch regularly will run into this situation: data already stored in ES has to be modified, either its content or its structure, to keep up with changing requirements. Hand-rolling a script that updates documents one by one feels clumsy, and once a large amount of data is involved it is simply not practical, so you need a proper tool. I don't know how popular Logstash is outside of logging, but in an ELK-based log analysis stack Logstash is the ETL component: it handles extracting, transforming, and enriching data. It can pull raw data from other sources, transform it, and index it into ES in whatever shape we need, and it can just as well read existing data out of ES, reprocess it, and write it back. That makes updating existing data painless. Of course, if you still have all of the raw data, you don't need anything this fancy: just delete the index, restructure the raw data to match the new requirements, and re-import it into ES. But if you want something a bit more refined, read on (and give this post a like) and I'll show you how it's done.

Overview of the approach

  1. Examine your ES data and decide what needs to change, for example:
    • @timestamp does not line up with the timestamp inside your log lines
    • the url in the data needs to be split into finer-grained fields such as verb, request, host, device...
  2. Extract some sample data into a local ES instance
  3. Configure Logstash's elasticsearch input plugin to read the sample data from the local ES, and use filter plugins to restructure it
  4. Iterate until Logstash handles every sample correctly
  5. Point Logstash at the records on the server that need to be modified (this requires the right query; see the sketch after this list) and restructure them.
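
For steps 1 and 5, a quick way to see which documents (and how many) will be touched is to run the query you intend to give the elasticsearch input plugin directly against ES. A minimal sketch with curl, assuming, as in the example later in this post, that the affected documents are the ones tagged _grokparsefailure and that the elastic/changeme credentials apply:

curl -s -u elastic:changeme 'localhost:9200/test-*/_count?pretty' \
  -H 'Content-Type: application/json' \
  -d '{ "query": { "term": { "tags": "_grokparsefailure" } } }'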

That is the whole approach in a nutshell; it really is that simple...

Software requirements

There is really only one requirement: upgrade Logstash's elasticsearch input plugin to version 5.3.* or later. On 5.2.2 I ran into the following bug:

Error: [400] {"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"Failed to parse request body"}],"type":"illegal_argument_exception","reason":"Failed to parse request body","caused_by":{"type":"json_parse_exception","reason":"Unrecognized token 'DnF1ZXJ5VGhlbkZldGNoBQAAAAAAAAgJFmlmNnYtNlZkU1B5TmlhRjU4QkJLZkEAAAAAAAAIChZpZjZ2LTZWZFNQeU5pYUY1OEJCS2ZBAAAAAAAACAsWaWY2di02VmRTUHlOaWFGNThCQktmQQAAAAAAAAgMFmlmNnYtNlZkU1B5TmlhRjU4QkJLZkEAAAAAAAAIDRZpZjZ2LTZWZFNQeU5pYUY1OEJCS2ZB': was expecting ('true', 'false' or 'null')\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@7b31ce02; line: 1, column: 457]"}},"status":400}

This problem shows up when Logstash reads data from ES; once it occurs, Logstash keeps querying ES over and over and never stops. For what it's worth, my whole cluster runs the latest (5.5.*) versions of ES, Kibana, and Logstash; I can't guarantee that older versions will behave the same way.
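
You can check which version of the plugin is installed, and upgrade it if necessary, with the logstash-plugin tool (a minimal sketch, assuming the commands are run from the Logstash installation directory):

# show the installed version of the elasticsearch input plugin
bin/logstash-plugin list --verbose logstash-input-elasticsearch

# upgrade it to the latest published release
bin/logstash-plugin update logstash-input-elasticsearch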

Configuring Logstash

input {
  # When loading samples, you can first use the stdin plugin to write sample data into ES
  # stdin {
  #   type => "test"
  # }
  # Use the following configuration to read data back out of ES
  elasticsearch {
    hosts => "localhost:9200"
    index => "test-*"
    query => '{ "query": { "query_string": { "query": "tags:_grokparsefailure" } } }'
    size => 500
    scroll => "5m"
    docinfo => true
    codec => json
  }
}

# The filter below is just an example; the details don't matter here
filter {
  mutate {
    remove_tag => ["_grokparsefailure"]
  }
  grok {
    patterns_dir => ["./patterns"]
    match => { "message" => "%{PLATFORM_SYSLOG}" }
  }
  date {
    match => ["timestamp", "MMM dd HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601"]
  }
  if "_grokparsefailure" not in [tags] {
    mutate {
      remove_field => ["message"]
    }
  }
}

output {
  # stdout {
  #   codec => json
  # }
  # In my own workflow I delete the original record from ES first and then create a new one;
  # of course, you can also update it in place
  elasticsearch {
    hosts => ["localhost:9200"]
    # action => "update"
    action => "delete"
    document_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
    index => "%{[@metadata][_index]}"
    user => "elastic"
    password => "changeme"
  }
  elasticsearch {
    hosts => ["localhost:9200"]
    #index => "test-%{+YYYY.MM.dd}"
    index => "%{[@metadata][_index]}"
    user => "elastic"
    password => "changeme"
  }
}
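
To run the pipeline, point Logstash at the configuration file (the file name below is just a placeholder):

bin/logstash -f update-existing-records.conf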

Step-by-step walkthrough with screenshots

The problem document

[Screenshot: 2017-09-15 4.25.25 PM]
As you can see, this record was not parsed correctly, and other records like it have the same problem; we need to re-parse it.

Loading the sample data

Put this unparseable record into the local ES instance for testing: configure Logstash's input as stdin, then import it.

Configuration

input {
  stdin {
    type => "test"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "test-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "changeme"
  }
}

Import

Start Logstash and feed it the unparseable record:

[2017-09-15T17:13:20,609][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
[2017-09-15T17:13:20,620][INFO ][logstash.pipeline ] Pipeline main started
The stdin plugin is now waiting for input:
[2017-09-15T17:13:20,662][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
,Sep 15 03:29:02,HostName=sz190034,IP=10.60.22.117,Tag=run-parts(/etc/cron.daily)[19239,ProgramName=run-parts(,Procid=,Facility=cron,Sev=notice,AppName=run-parts(,Msg= finished prelink

You can see that the record is now in ES:

[Screenshot: 2017-09-15 5.15.20 PM]
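
If you prefer the command line to Kibana, you can confirm the document landed in the index with a quick search (a sketch; the index pattern and the elastic/changeme credentials are the ones from the configuration above):

curl -s -u elastic:changeme 'localhost:9200/test-*/_search?pretty'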

Reading data from ES and modifying it on the fly

Still in the local environment, verify the changes: using the Logstash configuration shown earlier, read the data from ES, update its content with the filters, and write it back to ES. Note that you can either modify the original record in place, or delete the original record and index a new one. When updating or deleting existing ES records, remember that the following three settings are mandatory:

index => "%{[@metadata][_index]}"
document_type => "%{[@metadata][_type]}"
document_id => "%{[@metadata][_id]}"

"%{[@metadata][_index]}"是原记录所在的索引,"%{[@metadata][_type]}"是原记录的类型,"%{[@metadata][_id]}"是原记录的id。这三条缺一不可,特别是索引(其余两项是必填,不填logstash的插件会报错),由于它不会报错,可是update会不成功。 使用以下的配置重启logstash:测试

input {
  elasticsearch {
    hosts => "localhost:9200"
    index => "test-*"
    query => '{ "query": { "query_string": { "query": "*" } } }'
    size => 500
    scroll => "5m"
    docinfo => true
    codec => json
  }
}
filter {
  mutate {
    # remove_field => ["@timestamp"]
    remove_tag => ["_grokparsefailure"]
  }
  grok {
    patterns_dir => ["./patterns"]
    match => { "message" => "%{PLATFORM_SYSLOG}" }
  }
  date {
    match => ["timestamp", "MMM dd HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601"]
  }
  if "_grokparsefailure" not in [tags] {
    mutate {
      remove_field => ["message"]
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    action => "delete"
    document_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
    index => "%{[@metadata][_index]}"
    user => "elastic"
    password => "changeme"
  }
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "test-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "changeme"
  }
}

Logstash will then pull the data down from the local ES, hand it to the filter, and push the processed result back into ES. The result:

[Screenshot: 2017-09-15 5.32.56 PM]
The data is now parsed correctly and the timestamp has been updated.
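
A debugging tip for this step: the @metadata fields that the output plugins rely on are not part of the document itself, so they won't show up in ES or in a plain stdout output. If the delete or update seems to target the wrong index, you can temporarily dump them with the rubydebug codec's metadata option (a sketch to add alongside, or in place of, the elasticsearch outputs while testing):

output {
  stdout {
    codec => rubydebug { metadata => true }
  }
}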

Updating on the ES server

Once everything checks out locally, change the addresses in Logstash's elasticsearch input plugin and output plugin to the server's address, and adjust the query so it selects exactly the records that need updating, for example:

elasticsearch {
  hosts => "10.60.47.168:9200"
  index => "platform-*"
  query => '{ "query": { "term": { "tags": "_grokparsefailure" } } }'
  size => 500
  scroll => "5m"
  docinfo => true
  codec => json
}
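
The output side changes in the same way. A sketch of what the delete-and-reinsert outputs from the earlier configuration might look like once they point at the server (the host, user, and password are the same placeholders used throughout this post):

output {
  elasticsearch {
    hosts => ["10.60.47.168:9200"]
    action => "delete"
    document_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
    index => "%{[@metadata][_index]}"
    user => "elastic"
    password => "changeme"
  }
  elasticsearch {
    hosts => ["10.60.47.168:9200"]
    index => "%{[@metadata][_index]}"
    user => "elastic"
    password => "changeme"
  }
}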

Then run the update.

Summary

As we have seen, with two Logstash plugins:

  • elasticsearch input plugin
  • elasticsearch output plugin

we can easily select the ES data we want to update, modify it, and put it back into ES. The whole process is simple, but in practice you should test thoroughly beforehand and aim to update the data in a single pass. In the example above, if you find after the first pass that the rules still need to change, be careful: some new fields have already been added to ES by Logstash and some old fields have already been removed, so the filter from the first pass can no longer be reused, the whole thing has to be rewritten, and since the original data has already been modified, you may end up with irreversible data loss. So proceed with caution!