基于postgres和es完成关键字检索功能主要思路和步骤

需求:

导入所有政策文件,能过通过检索关键字找到对应的文件。

工具:

postgres(基本数据库,存放表数据)、eleasticsearch(全文搜索引擎)、logstash(数据收集引擎)

简要步骤:

1.导入所有政策文件,存放文件信息到postgres数据库中,文件信息包括文件名,上传时间,文件名,文件路径,文件内容等信息。

2.安装es和logstash,logstash主要是将postgres中数据同步到es库中,es主要是将关键信息(文件内容)进行分词,便于检索。

3.写一个接口,从es库中根据关键字进行检索。

重点部分:

1.postgres如何同步到ES库中?

logstash文件夹中conf.d中有一个配置文件:logstash-pg-es.conf,该文件中定义了数据来源与数据输出。值得关注的地方有3处

input {
    jdbc {  1.数据来源是jdbc连接的数据库
        # Postgres jdbc connection string to our database, mydb
        jdbc_connection_string => "jdbc:postgresql://192.168.56.33:5432/test"
        # The user we wish to execute our statement as
        jdbc_user => "test"
        jdbc_password =>"test"
        # The path to our downloaded jdbc driver
        jdbc_driver_library => "/some/config-dir/postgresql-42.2.5.jar"
    
    #处理中文乱码问题
    codec => plain { charset => "UTF-8"}
    
     #使用其它字段追踪,而不是用时间
        #use_column_value => true
     #追踪的字段
    # tracking_column => last_modified_date
    # record_last_run => true
     #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
         #last_run_metadata_path => "/some/config-dir/station_parameter.txt"
     #
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
        # The name of the driver class for Postgresql
        jdbc_driver_class => "org.postgresql.Driver"
        # our query
        statement => "select id,name as title,dealdata::json->>'data' as content,activated,enabled,created_by,created_date,last_modified_by,last_modified_date from test where last_modified_date> :sql_last_value" 2.如何触发同步以及如何同步:通过这个statement能出来,test表中last_modified_date是引发同步的关键词,每当有一条比当前时间更新的数据出现时就会引发同步,同时也能看出来,同步是通过一条sql语句查询需要的字段。-----------总结就是:每当有一条数据的last_modified_date比当前时间大时,就从test表中取这条数据的对应字段。
        schedule => "* * * * *"
    }
}
output {3.将取到的数据进行输出。这里能看到输出到配置的es库中,在es库中有一个索引叫做test,数据会存放到该索引中,至于如何存放,则看template值,通过这个json文件进行对应。
        stdout{
            codec=>rubydebug
        }
        elasticsearch{
            hosts =>["192.168.56.35:9200"]
            index =>"test"
            document_type => "test"
            document_id => "%{id}"
            template =>"/data/docker/my-logstash/template/test-template.json"
            template_name => "test"
            template_overwrite => true
        }
}

2.es如何通过json文件进行数据对应?

该json文件就是刚才template的值,是template文件夹下的test-template.json。

{
  "template": "test", 
  "settings" : {
    "index.number_of_shards" : 5,
    "number_of_replicas" : 1,
    "index.refresh_interval" : "60s"
  },
  "mappings": {
    "_default_": {
      "_all":{"enabled":true},
      "dynamic_templates" : [ {
         "string_fields" : {
           "match" : "*",
           "match_mapping_type" : "string",
           "mapping" : {
             "type" : "string", "index" : "not_analyzed", "omit_norms" : true, "doc_values": true,
               "fields" : {
                 "raw" : {"type": "string", "index" : "not_analyzed", "ignore_above" : 256,"doc_values": true}
               }
           }
         }
       } ],
      "properties": {里面各个数据项就是刚才statement里sql语句取出来的字段,一一对应
                   "id":{       
                         "type":"keyword"
                   },
                   "content":{ 这个字段就是需要分词的字段,es目前使用的是IK分词器
                         "type":"text",
                                  "analyzer": "ik_max_word",analyzer代表插入内容分词
                                  "search_analyzer": "ik_smart"  search_analyzer代表搜索内容分词
                   },

                         
                        "title":{

                  "type":"keyword"
            },
                   "activated":{
                      
                         "type":"keyword"
                   },
                   "enabled":{
                        
                         "type":"keyword"
                   },
                   "created_by":{
                      
                         "type":"keyword"
                   },
                   "created_date":{
                           "type":"date"
                   },
                   "last_modified_by":{
                        
                         "type":"keyword"
                   },
                   "last_modified_date":{
                           "type":"date"
                   } 
      }
    }
  }
}
 

3.如何从ES中进行关键字查询?

以一个java接口进行代码展示:

用到的包:

    

public Map<String,Object> searchKeyword(String keyword, Pageable pageable){

    // 1. 校验
    if (keyword == null || "".equals(keyword)){
        throw new BusinessException("请输入查询的关键字!");
    }

    // 2. 查询条件
    //  关键词查询  content是ES库里的字段,也就是分词的那个字段
    MatchQueryBuilder keywordMust = QueryBuilders.matchQuery("content",keyword);//关键字查询
    //MatchPhraseQueryBuilder keywordMust = QueryBuilders.matchPhraseQuery("content",inputDTO.getKeyword());


    // 综合查询条件--过滤  由于没写删除,这里用的是软删除,筛选结果是enabled为true
    TermsQueryBuilder enabledMustNot = QueryBuilders.termsQuery("enabled", "true");
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    boolQueryBuilder.must(enabledMustNot).must(keywordMust);


    // 3. 高亮
    HighlightBuilder highlightBuilder  = new HighlightBuilder().field("*").requireFieldMatch(false);
    highlightBuilder.preTags("<span style=\"color:"+highlight+"\">");
    highlightBuilder.postTags("</span>");


    // 3. 分页
    Integer pageSize = (pageable==null || pageable.getPageSize()==0) ? size: pageable.getPageSize();
    Integer from = (pageable==null )? 0: pageable.getPageNumber()* pageSize;

    //String sort = (pageable==null || pageable.getSort()==null)?"last_modified_date":pageable.getSort().toString();
    SortBuilder sortBuilder = SortBuilders.fieldSort("last_modified_date").order(SortOrder.DESC);


    // 4. 查询
    SearchResponse response     索引 索引类型
        = client.prepareSearch("test").setTypes("test")
        .setQuery(boolQueryBuilder)
        .highlighter(highlightBuilder)
        .setFrom(from).setSize(pageSize)
        .addSort(SortBuilders.scoreSort().order(SortOrder.DESC))
        .addSort(sortBuilder)
        .execute().actionGet();

    List<KeywordSearchOutputDTO> list = new ArrayList<>();

    for(SearchHit hit: response.getHits())
    {
        // 处理高亮
        Map<String, HighlightField> highlightFields = hit.getHighlightFields();

        HighlightField highlightField = highlightFields.get("content");
        Text[] arrayText = highlightField.getFragments();
        StringBuffer sb = new StringBuffer();
        for (Text str:arrayText){
            sb.append(str.toString());
        }

        //
        KeywordSearchOutputDTO outputDTO = new KeywordSearchOutputDTO();

        Map<String,Object>  source = hit.getSource();

        outputDTO.setId(source.get("id").toString());
        outputDTO.setTitle(source.get("title").toString());
        outputDTO.setContent(sb.toString());
        outputDTO.setCreatedBy(source.get("created_by").toString());
        outputDTO.setCreatedDate(TimeUtils.String2LocalDateTime(source.get("created_date").toString()));
        outputDTO.setLastModifiedBy(source.get("last_modified_by").toString());
        outputDTO.setLastModifiedDate(TimeUtils.String2LocalDateTime(source.get("created_date").toString()));

        list.add(outputDTO);
    }

    // 总记录数
    Long total = response.getHits().getTotalHits();

    Map<String,Object> result = new HashMap<>();
    result.put("total",total);
    result.put("size",pageSize);
    result.put("data",list);
    return result;
}