package com.walker.support.es.impl; //import co.elastic.clients.elasticsearch.ElasticsearchClient; import com.walker.infrastructure.utils.JsonUtils; import com.walker.infrastructure.utils.StringUtils; import com.walker.support.es.Constants; import com.walker.support.es.SaveData; import com.walker.support.es.SearchService; import com.walker.support.es.util.FullTextUtils; import org.apache.http.HttpHost; import org.apache.http.client.config.RequestConfig; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.GetPipelineRequest; import org.elasticsearch.action.ingest.GetPipelineResponse; import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.core.TermVectorsRequest; import org.elasticsearch.client.core.TermVectorsResponse; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.MultiMatchQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.WrapperQueryBuilder; import org.elasticsearch.ingest.PipelineConfiguration; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * 描述:全文检索使用的服务对象 * @author 时克英 * @date 2020年7月4日 上午11:19:21 */ public class FullTextSearchService implements SearchService, InitializingBean { protected final transient Logger logger = LoggerFactory.getLogger(this.getClass()); // @Autowired // private ElasticsearchTemplate elasticsearchTemplate; private RestHighLevelClient restHighClient = null; // private ElasticsearchClient elasticsearchClient = null; private String tokenType = Constants.TOKEN_TYPE_SMART; // @Autowired // private ElasticSearchConfig elasticSearchConfig; @Override public Object createAttachmentProcessor(String description) { logger.warn("暂不支持代码创建附件处理器(Attachment)"); return null; } @Override public boolean existIndex(String name, String type) throws Exception{ GetIndexRequest getIndexRequest = new GetIndexRequest(name); return restHighClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT); // return elasticsearchClient.execute(); } @Override public boolean createIndex(String name, String type, String json){ // type = Constants.TYPE_DEFAULT; try { boolean existIndex = this.existIndex(name, type); if(existIndex){ logger.error("索引存在,无法重复创建:" + name); return false; } } catch (Exception e) { logger.error("查询索引失败:" + name, e); return false; } CreateIndexRequest request = new CreateIndexRequest(name); request.settings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0)); // 设置映射字段 try { // XContentBuilder builder = FullTextUtils.createFullTextIndexJson(type); // request.mapping(builder); // String json = FullTextUtils.createIndexJson(type, tokenType); logger.info(json); request.mapping(json, XContentType.JSON); CreateIndexResponse createIndexResponse = restHighClient.indices().create(request, RequestOptions.DEFAULT); if(createIndexResponse != null && createIndexResponse.isAcknowledged()){ logger.info("创建索引表成功:" + name + ", " + type); return true; } logger.error("创建索引表失败:" + createIndexResponse.toString()); return false; } catch (Exception e) { logger.error("创建索引失败:" + name, e); return false; } } // public boolean createIndexFile(String name, String type){ // CreateIndexRequest request = new CreateIndexRequest(name); // request.settings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0)); // // // 设置映射字段 // try { // String json = FullTextUtils.createXmFileJson(type, tokenType); // logger.info(json); // request.mapping(json, XContentType.JSON); // CreateIndexResponse createIndexResponse = restHighClient.indices().create(request, RequestOptions.DEFAULT); // if(createIndexResponse != null && createIndexResponse.isAcknowledged()){ // logger.info("创建索引表成功:" + name + ", " + type); // return true; // } // logger.error("创建索引表失败:" + createIndexResponse.toString()); // return false; // } catch (Exception e) { // logger.error("创建索引失败:" + name, e); // return false; // } // } @Override public void removeIndex(String index) throws Exception{ // boolean existIndex = this.existIndex(index, Constants.TYPE_DEFAULT); // if(existIndex){ // DeleteIndexRequest request = new DeleteIndexRequest(index); // restHighClient.indices().delete(request, RequestOptions.DEFAULT); // logger.warn("删除索引库成功:" + index); // } DeleteIndexRequest request = new DeleteIndexRequest(index); restHighClient.indices().delete(request, RequestOptions.DEFAULT); logger.warn("删除索引库成功:" + index); } /** * type表类型在7.0之后会被废弃,但6.0还必须使用,后续需要微调,去掉type * @date 2020-07-15 */ @Override public void putContent(String index, List datas) throws Exception{ if(datas == null || datas.size() == 0){ return; } BulkRequest request = new BulkRequest(); IndexRequest indexRequest = null; for(SaveData d : datas){ indexRequest = new IndexRequest(index, Constants.TYPE_DEFAULT); indexRequest.setPipeline(Constants.PIPELINE_DEFAULT); // indexRequest.opType("create").id(d.getId()).source(FullTextUtils.translate(d)); // indexRequest.opType(DocWriteRequest.OpType.CREATE); // indexRequest.version(2);//设置版本 // indexRequest.versionType(VersionType.EXTERNAL);//设置版本类型 // 这里设置es中的id,可以避免重复写入数据 indexRequest.id(d.getId()); indexRequest.source(FullTextUtils.translate(d)); request.add(indexRequest); logger.info("saveData = " + d.getTitle() + ", " + d.getShowDataSummary()); } BulkResponse bulkResponse = restHighClient.bulk(request, RequestOptions.DEFAULT); if(bulkResponse.hasFailures()){ throw new Exception("(更新过)批量写入es数据出现错误: " + bulkResponse.buildFailureMessage()); } logger.info("批量写入es数据成功"); } @Override public String search(String index, String queryText, int dataSecurity, String securityInfo , int pageIndex, int pageSize) throws Exception{ SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // 1-基本参数 //将请求限制为一个类型。 // searchRequest.types("doc"); // 默认情况下,搜索请求会返回文档_source的内容,你可以完全关闭_source检索 // sourceBuilder.fetchSource(false); // String[] includeFields = new String[] {"id", "title", "innerObject.*"}; String[] includeFields = new String[] {"id", "title", "create_time", "path", "user_id","user_name","xmid","xmname"}; String[] excludeFields = new String[] {"data", "attachment.content", "text_html","text_db"}; sourceBuilder.fetchSource(includeFields, excludeFields); // 2-设置查询语句 String queryString = FullTextUtils.acquireSearchQueryJson(queryText, dataSecurity, securityInfo); logger.info(queryString); WrapperQueryBuilder wqb = QueryBuilders.wrapperQuery(queryString); sourceBuilder.query(wqb); // 3-设置高亮结果 HighlightBuilder highlightBuilder = new HighlightBuilder(); highlightBuilder.preTags("").postTags(""); highlightBuilder.field("title").field("text_db").field("attachment.content"); sourceBuilder.highlighter(highlightBuilder); sourceBuilder.from(pageIndex).size(pageSize); // 按创建时间倒序排列 // sourceBuilder.sort("create_time", SortOrder.DESC); SearchRequest searchRequest = new SearchRequest(index); //索引 searchRequest.source(sourceBuilder); SearchResponse response = restHighClient.search(searchRequest, RequestOptions.DEFAULT); Map result = new HashMap<>(); result.put("pageIndex", pageIndex); result.put("pageSize", pageSize); // JSONArray datas = new JSONArray(); List> datas = new ArrayList<>(); SearchHits hits = response.getHits(); long total = hits.getTotalHits().value; result.put("total", total); if(total > 0){ SearchHit[] searchHits = hits.getHits(); Map one = null; for(SearchHit sh : searchHits){ // one = new JSONObject(sh.getSourceAsMap()); one = sh.getSourceAsMap(); one.put("highlight", FullTextUtils.getOneFieldHighlightText(sh.getHighlightFields())); datas.add(one); } } result.put("datas", datas); return JsonUtils.objectToJsonString(result); } @Override public String searchDataById(String index, String _id) throws Exception{ SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); String[] includeFields = new String[] {"id", "title", "create_time", "path", "text_html"}; String[] excludeFields = new String[] {"data", "attachment.content","text_db"}; sourceBuilder.fetchSource(includeFields, excludeFields); // 2-设置查询语句 String queryString = FullTextUtils.acquireSearchOneById(_id); logger.info(queryString); WrapperQueryBuilder wqb = QueryBuilders.wrapperQuery(queryString); sourceBuilder.query(wqb); SearchRequest searchRequest = new SearchRequest(index); //索引 searchRequest.source(sourceBuilder); SearchResponse response = restHighClient.search(searchRequest, RequestOptions.DEFAULT); SearchHits hits = response.getHits(); if(hits.getTotalHits().value > 0){ SearchHit hit = hits.getHits()[0]; // JSONObject one = new JSONObject(hit.getSourceAsMap()); Map one = hit.getSourceAsMap(); return JsonUtils.objectToJsonString(one); } return null; } /** * 废弃方法,下载文件还走业务系统方式,这种大文件直接内存拷贝太耗资源 * @param index * @param _id * @return * @throws Exception */ @Deprecated @Override public String searchFileById(String index, String _id) throws Exception{ SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); String[] includeFields = new String[] {"id", "title", "create_time" , "data", "attachment.content_length", "attachment.content_type", "attachment.title", "attachment.date"}; String[] excludeFields = new String[] {"attachment.content","text_db"}; sourceBuilder.fetchSource(includeFields, excludeFields); // 2-设置查询语句 String queryString = FullTextUtils.acquireSearchOneById(_id); logger.info(queryString); WrapperQueryBuilder wqb = QueryBuilders.wrapperQuery(queryString); sourceBuilder.query(wqb); SearchRequest searchRequest = new SearchRequest(index); //索引 searchRequest.source(sourceBuilder); SearchResponse response = restHighClient.search(searchRequest, RequestOptions.DEFAULT); SearchHits hits = response.getHits(); if(hits.getTotalHits().value > 0){ SearchHit hit = hits.getHits()[0]; Map one = hit.getSourceAsMap(); return JsonUtils.objectToJsonString(one); } return null; } /** * 查询最近搜索关联集合 * @param index * @param keyWord * @return * @throws Exception */ public List searchRecent(String index, String keyWord) throws Exception{ SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); String[] includeFields = new String[] {"id", "title", "create_time"}; // String[] excludeFields = new String[] {"data", "attachment.content", "text_html","text_db"}; sourceBuilder.fetchSource(includeFields, null); // 2-设置查询语句 MultiMatchQueryBuilder query = QueryBuilders.multiMatchQuery(keyWord, "title"); // String queryString = FullTextUtils.acquireSearchQueryJson(queryText, dataSecurity, securityInfo); // logger.info(queryString); // WrapperQueryBuilder wqb = QueryBuilders.wrapperQuery(queryString); sourceBuilder.query(query); sourceBuilder.from(0).size(10); SearchRequest searchRequest = new SearchRequest(index); //索引 searchRequest.source(sourceBuilder); SearchResponse response = restHighClient.search(searchRequest, RequestOptions.DEFAULT); List result = new ArrayList<>(); SearchHits hits = response.getHits(); if(hits.getTotalHits().value > 0){ SearchHit[] searchHits = hits.getHits(); for(SearchHit sh : searchHits){ result.add(sh.getSourceAsMap().get("title").toString()); } } return result; } /** * 返回表中某个记录的分词字段数据 * @param index * @param _id 记录id * @return 给定字段分词集合 * @throws Exception */ public List getKeyWordList(String index, String _id, String field) throws Exception{ // TermVectorsRequest request = new TermVectorsRequest(index, Constants.TYPE_DEFAULT, _id); TermVectorsRequest request = new TermVectorsRequest(index, _id); // request.setFields("attachment.content"); request.setFields(field); request.setOffsets(false); request.setPositions(false); request.setPayloads(false); request.setTermStatistics(false); TermVectorsResponse response = restHighClient.termvectors(request, RequestOptions.DEFAULT); List result = new ArrayList<>(); boolean found = response.getFound(); if(found){ for (TermVectorsResponse.TermVector tv : response.getTermVectorsList()) { String fieldname = tv.getFieldName(); //当前字段的名称 logger.info("找到字段下的分词集合:" + fieldname); int docCount = tv.getFieldStatistics().getDocCount(); //当前字段-文档计数的字段统计 long sumTotalTermFreq = tv.getFieldStatistics().getSumTotalTermFreq(); //当前字段的字段统计信息-总术语频率之和 long sumDocFreq = tv.getFieldStatistics().getSumDocFreq(); //当前字段的字段统计信息-文档频率的总和 if (tv.getTerms() != null) { List terms = tv.getTerms(); for (TermVectorsResponse.TermVector.Term term : terms) { String termStr = term.getTerm(); //术语的名称 int termFreq = term.getTermFreq(); //术语的术语频率 result.add(termStr); } } } } return result; } /** * 自动检测附件pipeline,如果不存在则创建,一般名字定义为:attachment * @param pipeline * @return * @throws Exception * @author 时克英 * @date 2020-08-25 */ public boolean autoCreatePipelineAttachment(String pipeline) throws Exception{ GetPipelineRequest request = new GetPipelineRequest(pipeline); GetPipelineResponse response = restHighClient.ingest().getPipeline(request, RequestOptions.DEFAULT); if(response == null){ logger.error("查找pipeline失败:" + pipeline); return false; } boolean existPipeline = false; if(response.isFound()){ for(PipelineConfiguration pc : response.pipelines()){ if(pc.getId().equals(pipeline)){ existPipeline = true; logger.info("找到pipeline:" + pipeline); break; } else { continue; } } } if(!existPipeline){ // 创建pipeline logger.info("不存在pipeline,开始创建:" + pipeline); XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); { builder.field("description","解析附件文档"); builder.startArray("processors"); { builder.startObject(); { builder.startObject(pipeline); builder.field("field","data"); builder.field("ignore_missing",true); builder.endObject(); } builder.endObject(); builder.startObject(); { builder.startObject("remove"); builder.field("field","data"); builder.endObject(); } builder.endObject(); } builder.endArray(); } builder.endObject(); logger.info(builder.toString()); BytesReference source = BytesReference.bytes(builder); PutPipelineRequest createRequest = new PutPipelineRequest(pipeline, source, XContentType.JSON); AcknowledgedResponse response1 = restHighClient.ingest().putPipeline(createRequest, RequestOptions.DEFAULT); if(response1 != null && response1.isAcknowledged()){ logger.info("创建pipeline成功:" + pipeline); return true; } else { logger.error("创建pipeline失败:" + pipeline); return false; } } return true; } @Override public void afterPropertiesSet() throws Exception { // System.out.println("++++++" + this.elasticSearchConfig); // this.setNodes(this.elasticSearchConfig.getCluster_Nnodes()); } /** * 初始化service对象,由外部config调用初始化。 * @param nodes * @date 2022-10-20 */ public void setNodes(String nodes){ if(StringUtils.isEmpty(nodes)){ throw new IllegalArgumentException("not found: elasticsearch nodes config"); } String[] nodesInfo = nodes.split(","); HttpHost[] httpHosts = new HttpHost[nodesInfo.length]; int i = 0; for(String oneNode : nodesInfo){ String[] node = oneNode.split(":"); if(node.length != 2){ throw new IllegalArgumentException("elasticsearch nodes config error: " + oneNode); } HttpHost host = new HttpHost(node[0], Integer.parseInt(node[1]), "http"); httpHosts[i] = host; logger.info(oneNode); i++; } // RestClientBuilder restClientBuilder = RestClient.builder(httpHosts).setMaxRetryTimeoutMillis(300*1000); RestClientBuilder restClientBuilder = RestClient.builder(httpHosts).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { @Override public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) { // httpAsyncClientBuilder.set return httpAsyncClientBuilder; } }); restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() { @Override public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) { // 设置连接超时大一点,测试上传大文件 return requestConfigBuilder.setSocketTimeout(600 * 1000) // .setConnectionRequestTimeout(600 * 1000) // 设置写入超时时间 .setConnectTimeout(5000); } }); restHighClient = new RestHighLevelClient(restClientBuilder); logger.info("创建 RestHighLevelClient 完成"); } @Override public void setTokenType(String tokenType) { if(StringUtils.isNotEmpty(tokenType)){ if(!tokenType.equals(Constants.TOKEN_TYPE_MAX) && !tokenType.equals(Constants.TOKEN_TYPE_SMART)){ throw new IllegalArgumentException("tokenType设置错误,只能是:ik_max_word | ik_smart"); } this.tokenType = tokenType; } } }