使用logstash从MySQL增量提取数据,传入elasticsearch中,并经过kibana作简单的图表html
logstash安装java
#下载,logstash5及以上版本须要jdk8 wget https://artifacts.elastic.co/downloads/logstash/logstash-6.1.1.zip #解压 unzip logstash-6.1.1.zip #To test your Logstash installation, run the most basic Logstash pipeline #测试logstash环境,运行以下demo(input {stdin{}}:接收终端输入;output {stdout{}}:输出到终端),出现Pipeline main started为正常 cd logstash-6.1.1 ./bin/logstash -e 'input {stdin{}} output {stdout{}}' #-----------------------------------start----------------------------------- Settings: Default pipeline workers: 24 Pipeline main started #------------------------------------end------------------------------------ #The -e flag enables you to specify a configuration directly from the command line. Specifying configurations at the command line lets you quickly test configurations without having to edit a file between iterations. The pipeline in the example takes input from the standard input, stdin, and moves that input to the standard output, stdout, in a structured format. #测试,输入hello world,而后回车 #出现以下信息即为安装成功 #-----------------------------------start----------------------------------- 2018-01-04T02:44:41.024Z hostname hello world #------------------------------------end------------------------------------
logstash-input-jdbc插件node
#logstash-6.1.1已支持logstash-input-jdbc,不须要单独安装 #老版本安装logstash-input-jdbc ./bin/plugin install logstash-input-jdbc
配置logstash增量提取MySQL数据mysql
cat mysql_pipelines.yml #-----------------------------------start----------------------------------- #输入部分 input { jdbc { #链接MySQL驱动,须要本身下载 jdbc_driver_library => "/es/mysql-connector-java-5.1.31.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://10.112.29.30:3306/mstore" #链接数据库帐号信息 jdbc_user => "MySQL_admin" jdbc_password => "password" #分页 jdbc_paging_enabled => true #分页大小 jdbc_page_size => 100000 #流式获取数据,每次取10000. jdbc_fetch_size => 10000 #Maximum number of times to try connecting to database connection_retry_attempts => 3 #Number of seconds to sleep between connection attempts connection_retry_attempts_wait_time => 1 #Connection pool configuration. The amount of seconds to wait to acquire a connection before raising a PoolTimeoutError (default 5) jdbc_pool_timeout => 5 #Whether to force the lowercasing of identifier fields lowercase_column_names => true #Whether to save state or not in last_run_metadata_path #保存上次运行记录,增量提取数据时使用 record_last_run = > true #"* * * * *"为每分钟执行一次 schedule => "* * * * *" #Use an incremental column value rather than a timestamp use_column_value => true #sql_last_value #The value used to calculate which rows to query. Before any query is run, this is set to Thursday, 1 January 1970, or 0 if use_column_value is true and tracking_column is set. It is updated accordingly after subsequent queries are run. tracking_column => "id" #查询语句 statement => "SELECT id,package_name,name,sub_name,editor_comment,high_quality,sub_category,tag,update_time FROM tbl_app WHERE id > :sql_last_value" } } #过滤部分 filter { json { source => "message" remove_field => ["message"] } date{ match => ["update_time","yyy-MM-dd HH:mm:ss"] } } #输出到elastsicearch output { elasticsearch { #elasticsearch集群地址,不用列出全部节点,默认端口号也可省略 hosts => ["10.127.92.181:9200", "10.127.92.212:9200", "10.127.92.111:9200"] #索引值,查询的时候会用到;须要先在elasticsearch中建立对应的mapping,也能够采用默认的mapping index => "store" #指定插入elasticsearch文档ID,对应input中sql字段id document_id => "%{id}" } } #------------------------------------end------------------------------------ #注:使用时请去掉此文件中的注释,否则会报错 #logstash会把执行记录默认存在帐户根目录下: /root/.logstash_jdbc_last_run #若是须要从新加载数据到elasticsearch,须要删除这个文件
启动logstashlinux
请在启动elasticsearch以后再启动logstash,否则链接elasticsearch会报错git
#后台启动logstash ./bin/logstash -f config/mysql_pipelines.yml & #若是报下面错误,说明jdk版本不支持 #-----------------------------------start----------------------------------- NameError: cannot link Java class org.logstash.RubyUtil org/logstash/RubyUtil : Unsupported major.minor version 52.0 method_missing at org/jruby/javasupport/JavaPackage.java:259 <main> at /disk2/es/logstash-6.1.1/logstash-core/lib/logstash-core/logstash-core.rb:37 require at org/jruby/RubyKernel.java:955 <main> at /disk2/es/logstash-6.1.1/logstash-core/lib/logstash/runner.rb:1 require at org/jruby/RubyKernel.java:955 <main> at /disk2/es/logstash-6.1.1/lib/bootstrap/environment.rb:66 #------------------------------------end------------------------------------ #logstash-6.1.1须要jdk1.8 #请自行下载jdk1.8版本放到/opt/jdk1.8.0_151 #编写启动脚本以下 cat exec_logstash.sh #-----------------------------------start----------------------------------- #!/bin/sh #配置jdk1.8执行环境 export JAVA_HOME=/opt/jdk1.8.0_151 export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar ./bin/logstash -f config/mysql_pipelines.yml & #------------------------------------end------------------------------------
elasticsearch安装github
#下载 wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.1.1.zip #解压 unzip elasticsearch-6.1.1.zip cd elasticsearch-6.1.1 #配置jvm内存 vim config/jvm.options #-----------------------------------start----------------------------------- -Xms8g -Xmx8g #------------------------------------end------------------------------------ #注:把内存(少于)一半给Lucene,内存对于 Elasticsearch 来讲绝对是重要的,它能够被许多内存数据结构使用来提供更快的操做。可是说到这里, 还有另一个内存消耗大户 非堆内存 (off-heap):Lucene。 #Lucene 被设计为能够利用操做系统底层机制来缓存内存数据结构。 Lucene 的段是分别存储到单个文件中的。由于段是不可变的,这些文件也都不会变化,这是对缓存友好的,同时操做系统也会把这些段文件缓存起来,以便更快的访问。 #Lucene 的性能取决于和操做系统的相互做用。若是你把全部的内存都分配给 Elasticsearch 的堆内存,那将不会有剩余的内存交给 Lucene。 这将严重地影响全文检索的性能。 #标准的建议是把 50% 的可用内存做为 Elasticsearch 的堆内存,保留剩下的 50%。固然它也不会被浪费,Lucene 会很乐意利用起余下的内存。 #若是你不须要对分词字符串作聚合计算(例如,不须要 fielddata )能够考虑下降堆内存。堆内存越小,Elasticsearch(更快的 GC)和 Lucene(更多的内存用于缓存)的性能越好。 #分配给Elasticsearch的内存不能超过32G。JVM 在内存小于 32 GB 的时候会采用一个内存对象指针压缩技术。 #在 Java 中,全部的对象都分配在堆上,并经过一个指针进行引用。 普通对象指针(OOP)指向这些对象,一般为 CPU 字长 的大小:32 位或 64 位,取决于你的处理器。指针引用的就是这个 OOP 值的字节位置。 #对于 32 位的系统,意味着堆内存大小最大为 4 GB。对于 64 位的系统, 可使用更大的内存,可是 64 位的指针意味着更大的浪费,由于你的指针自己大了。更糟糕的是, 更大的指针在主内存和各级缓存(例如 LLC,L1 等)之间移动数据的时候,会占用更多的带宽。 #Java 使用一个叫做 内存指针压缩(compressed oops)的技术来解决这个问题。 它的指针再也不表示对象在内存中的精确位置,而是表示 偏移量 。这意味着 32 位的指针能够引用 40 亿个 对象 , 而不是 40 亿个字节。最终, 也就是说堆内存增加到 32 GB 的物理内存,也能够用 32 位的指针表示。 #一旦你越过那个神奇的 ~32 GB 的边界,指针就会切回普通对象的指针。 每一个对象的指针都变长了,就会使用更多的 CPU 内存带宽,也就是说你实际上失去了更多的内存。事实上,当内存到达 40–50 GB 的时候,有效内存才至关于使用内存对象指针压缩技术时候的 32 GB 内存。 #即使你有足够的内存,也尽可能不要 超过 32 GB。由于它浪费了内存,下降了 CPU 的性能,还要让 GC 应对大内存。 #设置堆内存为 31 GB 是一个安全的选择。 另外,你能够在你的 JVM 设置里添加 -XX:+PrintFlagsFinal 用来验证 JVM 的临界值, 而且检查 UseCompressedOops 的值是否为 true。对于你本身使用的 JVM 和操做系统,这将找到最合适的堆内存临界值。 #具体请参考:https://www.elastic.co/guide/cn/elasticsearch/guide/current/heap-sizing.html #elasticsearch集群信息配置 vim config/elasticsearch.yml #-----------------------------------start----------------------------------- #配置集群名称,每一个节点集群名称请保持一致 cluster.name: my-app #配置节点名称,每一个节点须要起不一样的名称 node.name: node-1 #配置数据存储目录 path.data: /disk3/to/data,/disk4/to/data #log存储位置 path.logs: /disk3/to/logs #Set the bind address to a specific IP (IPv4 or IPv6) #0.0.0.0为不限制访问 network.host: 0.0.0.0 #端口 http.port: 9200 #配置集群 discovery.zen.ping.unicast.hosts: ["10.127.92.212", "10.127.92.181"] #Lock the memory on startup #true表示不容许内存交换(内存交换影响速度) #若是报错,使用执行ulimit -l unlimited,取消限制最大加锁内存 bootstrap.memory_lock: true #系统调用过滤器,建议禁用该项检查,由于不少检查项须要Linux 3.5以上的内核 bootstrap.system_call_filter: false #------------------------------------end------------------------------------
elasticsearch配置sql
#在其余资源可用的前提下,单个JVM能开启的最大线程数是/proc/sys/vm/max_map_count的设置数的一半 #永久生效,把vm.max_map_count=262144写入/etc/sysctl.conf中,而后执行sysctl -p #默认vm.max_map_count=65530,会报错,致使elasticsearch没法启动 sysctl -w vm.max_map_count=262144 sysctl -p #默认elasticsearch没法以root帐户启动,须要建立单独帐户 #建立用户组 groupadd elastic #建立用户es,指定附属组root useradd -g elastic -G root es #配置jdk1.8环境 vim bin/elasticsearch #-----------------------------------start----------------------------------- #最上面加入下面内容 export JAVA_HOME=/disk4/es/jdk1.8.0_151 export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar #------------------------------------end------------------------------------
中文分词插件安装数据库
#不须要中文分词搜索的请忽略这一步 #下载中文分词插件ik wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.1.1/elasticsearch-analysis-ik-6.1.1.zip #解压 unzip elasticsearch-analysis-ik-6.1.1.zip #移动解压文件到elasticsearch内plugins下便可 mv elasticsearch-analysis-ik-6.1.1 elasticsearch-6.1.1/plugins/ik #ik在github地址:https://github.com/medcl/elasticsearch-analysis-ik #Analyzer: ik_smart , ik_max_word ; Tokenizer: ik_smart , ik_max_word #ik_max_word: 会将文本作最细粒度的拆分,好比会将“中华人民共和国国歌”拆分为“中华人民共和国,中华人民,中华,华人,人民共和国,人民,人,民,共和国,共和,和,国国,国歌”,会穷尽各类可能的组合; #ik_smart: 会作最粗粒度的拆分,好比会将“中华人民共和国国歌”拆分为“中华人民共和国,国歌”。 #在elasticsearch启动以后能够测试 curl -H 'content-type: application/json' 'http://localhost:9200/store/_analyze?pretty=true' -d '{"text":"中华人民共和国国歌","analyzer":"ik_max_word"}' curl -H 'content-type: application/json' 'http://localhost:9200/store/_analyze?pretty=true' -d '{"text":"中华人民共和国国歌","analyzer":"ik_smart"}'
启动elasticsearchjson
#以用户es启动elasticsearch #-d指定后台启动 sudo -u es ./bin/elasticsearch -d #逐个启动集群节点 #检查集群 curl -XGET http://localhost:9200?pretty #-----------------------------------start----------------------------------- { "name" : "node-1", "cluster_name" : "my-app", "cluster_uuid" : "ncrtFPuhRJuv9D7R4cOp4w", "version" : { "number" : "6.1.1", "build_hash" : "bd92e7f", "build_date" : "2017-12-17T20:23:25.338Z", "build_snapshot" : false, "lucene_version" : "7.1.0", "minimum_wire_compatibility_version" : "5.6.0", "minimum_index_compatibility_version" : "5.0.0" }, "tagline" : "You Know, for Search" } #------------------------------------end------------------------------------ #查看集群 curl -XGET 'localhost:9200/_cat/nodes?v&pretty' #-----------------------------------start----------------------------------- ip heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name 10.127.92.212 25 92 0 0.00 0.00 0.00 mdi - node-2 10.127.92.181 13 99 1 0.18 0.21 0.10 mdi - node-1 10.127.92.111 12 70 3 0.14 0.04 0.01 mdi * node-3 #------------------------------------end------------------------------------ #*号表示为master节点 #查看elasticsearch安装插件 curl -XGET localhost:9200/_cat/plugins?v #-----------------------------------start----------------------------------- name component version node-2 analysis-ik 6.1.1 node-1 analysis-ik 6.1.1 node-3 analysis-ik 6.1.1 #------------------------------------end------------------------------------
建立mapping
mapping相似于关系型数据库中的表结构定义
#建立mapping文件 vim store_mapping.json #-----------------------------------start----------------------------------- { "settings": { "number_of_shards": 5,#主分片数,默认5 "number_of_replicas": 1#副本数,写1为每一个主分片有一个副本 }, "mappings": { #type类型,新版貌似不能修改了,默认就是doc。也就是说这个位置如今固定为doc了 "doc": { "properties": { #sql中对应字段信息 "id": { #支持的数据类型:https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html "type": "long", #The index option controls whether field values are indexed. It accepts true or false and defaults to true. Fields that are not indexed are not queryable. "index": false, #是否存在于_source,和source filtering使用相关,默认true "store": true }, "package_name": { "index": false, #keyword表示是不会拆解,表示准确值 #They are typically used for filtering (Find me all blog posts where status is published), for sorting, and for aggregations. Keyword fields are only searchable by their exact value. #If you need to index structured content such as email addresses, hostnames, status codes, or tags, it is likely that you should rather use a keyword field. #If you need to index full text content such as email bodies or product descriptions, it is likely that you should rather use a text field. "type": "keyword" }, "name": { #type为text会被拆解成词元 #an analyzer to convert the string into a list of individual terms before being indexed #The analysis process allows Elasticsearch to search for individual words within each full text field. Text fields are not used for sorting and seldom used for aggregations "type": "text", #指定解析者,默认standard "analyzer": "ik_max_word", #The analyzer that should be used at search time on analyzed fields. Defaults to the analyzer setting. "search_analyzer": "ik_max_word" }, "sub_name": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" }, "editor_comment": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" }, "high_quality": { "type": "integer", "store": true }, "sub_category": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" }, "tag": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" }, "update_time": { "type": "date" } } } } } #------------------------------------end------------------------------------ #新建index,这里为store(和logstash配置文件中保持一致) curl -XPUT 'localhost:9200/store' #上传mapping到建立的index中 curl -XPUT -H 'content-type: application/json' 'http://localhost:9200/store' -d @store_mapping.json #查看建立的mapping curl -XGET http://localhost:9200/store/doc/_mapping?pretty #这个时候就能够启动logstash了 #查看elasticsearch是否已经有数据 curl -XGET http://localhost:9200/store/doc/_search?pretty=true #v:显示详细信息;pretty:格式化显示信息 #来个复杂点的查询。查询name或者editor_comment包含“自由”,并以update_time降序,_score降序排序搜索结果 curl -XPOST -H 'content-type: application/json' 'http://localhost:9200/store/doc/_search?pretty=true' -d ' { "query": { "multi_match" : { "query": "自由", "fields": [ "name", "editor_comment" ] } }, "sort": [ { "update_time": { "order": "desc" }}, { "_score": { "order": "desc" }} ] }'
kibana安装配置
#下载kibana wget https://artifacts.elastic.co/downloads/kibana/kibana-6.1.1-linux-x86_64.tar.gz #解压 tar -zxvf kibana-6.1.1-linux-x86_64.tar.gz cd kibana-6.1.1-linux-x86_64 #配置 vim config/kibana.yml #-----------------------------------start----------------------------------- #配置kibana访问端口 server.port: 5601 #配置容许访问kibana的ip,0.0.0.0表示不作限制 server.host: "0.0.0.0" #elasticsearch集群链接 elasticsearch.url: "http://localhost:9200" #kibana运行ID位置 pid.file: /var/run/kibana.pid #------------------------------------end------------------------------------ #启动kibana ./bin/kibana &
访问kibana:http://ip:port
界面以下:
点击Management,而后点击Index Patterns,建立一个index过滤器
点击Next step,选择是否经过时间过滤数据
点击Create index pattern即建立
点击左侧Discover,建立一个查询
点击查询
左侧Avaliable Fields能够设置右侧显示字段信息,点击上方Save能够保存查询条件
点击左侧Visualize,点击Create a visualization
选择一个你要建立的图表,我选择了饼图
store_test为Discover中保存的搜索条件名称,点击store_test
点击页面靠左侧部分的Split Slices,而后选择Aggregation类型,配置结果以下
而后点击执行按钮,结果以下,From为>=,to为<
点击上方Save,并定义个Visualization名称
注:若是须要配置双层圆环,能够点击Add sub-buckets,操做同Split Slices
点击左侧Dashboard,点击Create a dashboard
点击Add按钮
点击visualize名称,下方出现图表以下
点击上方Save,并输入Dashbioard名称
注:建立的图表均可以以这种方式放到Dashboard中