package com.walker.support.es.impl;
|
|
//import co.elastic.clients.elasticsearch.ElasticsearchClient;
|
|
import com.walker.infrastructure.utils.JsonUtils;
|
import com.walker.infrastructure.utils.StringUtils;
|
import com.walker.support.es.Constants;
|
import com.walker.support.es.SaveData;
|
import com.walker.support.es.SearchService;
|
import com.walker.support.es.util.FullTextUtils;
|
import org.apache.http.HttpHost;
|
import org.apache.http.client.config.RequestConfig;
|
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
|
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
import org.elasticsearch.action.bulk.BulkRequest;
|
import org.elasticsearch.action.bulk.BulkResponse;
|
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.ingest.GetPipelineRequest;
|
import org.elasticsearch.action.ingest.GetPipelineResponse;
|
import org.elasticsearch.action.ingest.PutPipelineRequest;
|
import org.elasticsearch.action.search.SearchRequest;
|
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
import org.elasticsearch.client.RequestOptions;
|
import org.elasticsearch.client.RestClient;
|
import org.elasticsearch.client.RestClientBuilder;
|
import org.elasticsearch.client.RestHighLevelClient;
|
import org.elasticsearch.client.core.TermVectorsRequest;
|
import org.elasticsearch.client.core.TermVectorsResponse;
|
import org.elasticsearch.client.indices.CreateIndexRequest;
|
import org.elasticsearch.client.indices.CreateIndexResponse;
|
import org.elasticsearch.client.indices.GetIndexRequest;
|
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.index.query.MultiMatchQueryBuilder;
|
import org.elasticsearch.index.query.QueryBuilders;
|
import org.elasticsearch.index.query.WrapperQueryBuilder;
|
import org.elasticsearch.ingest.PipelineConfiguration;
|
import org.elasticsearch.search.SearchHit;
|
import org.elasticsearch.search.SearchHits;
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
|
import org.elasticsearch.xcontent.XContentBuilder;
|
import org.elasticsearch.xcontent.XContentFactory;
|
import org.elasticsearch.xcontent.XContentType;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.InitializingBean;
|
|
import java.util.ArrayList;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
|
/**
|
* 描述:全文检索使用的服务对象
|
* @author 时克英
|
* @date 2020年7月4日 上午11:19:21
|
*/
|
|
public class FullTextSearchService implements SearchService, InitializingBean {
|
|
/** Logger named after the runtime class (this.getClass()), so subclasses log under their own name. */
protected final transient Logger logger = LoggerFactory.getLogger(this.getClass());

// Legacy Spring Data injection, kept for reference only; not used by this class.
// @Autowired
// private ElasticsearchTemplate elasticsearchTemplate;

/**
 * High-level REST client used for every Elasticsearch call in this class.
 * It is only assigned in setNodes(String); until that is called it is null, and any method
 * that dereferences it will throw a NullPointerException.
 */
private RestHighLevelClient restHighClient = null;

// Alternative (newer Java API) client, commented out; not used.
// private ElasticsearchClient elasticsearchClient = null;

/**
 * Analyzer token type, validated by setTokenType(String); defaults to Constants.TOKEN_TYPE_SMART.
 * NOTE(review): only read in commented-out code within this class — confirm it is still needed.
 */
private String tokenType = Constants.TOKEN_TYPE_SMART;

// Former injected configuration; node setup now appears to happen via external calls to setNodes(String).
// @Autowired
// private ElasticSearchConfig elasticSearchConfig;
|
|
@Override
|
public Object createAttachmentProcessor(String description) {
|
|
logger.warn("暂不支持代码创建附件处理器(Attachment)");
|
return null;
|
}
|
|
@Override
|
public boolean existIndex(String name, String type) throws Exception{
|
GetIndexRequest getIndexRequest = new GetIndexRequest(name);
|
return restHighClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
|
// return elasticsearchClient.execute();
|
}
|
|
@Override
|
public boolean createIndex(String name, String type, String json){
|
// type = Constants.TYPE_DEFAULT;
|
try {
|
boolean existIndex = this.existIndex(name, type);
|
if(existIndex){
|
logger.error("索引存在,无法重复创建:" + name);
|
return false;
|
}
|
} catch (Exception e) {
|
logger.error("查询索引失败:" + name, e);
|
return false;
|
}
|
|
CreateIndexRequest request = new CreateIndexRequest(name);
|
request.settings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0));
|
|
// 设置映射字段
|
try {
|
// XContentBuilder builder = FullTextUtils.createFullTextIndexJson(type);
|
// request.mapping(builder);
|
// String json = FullTextUtils.createIndexJson(type, tokenType);
|
logger.info(json);
|
request.mapping(json, XContentType.JSON);
|
CreateIndexResponse createIndexResponse = restHighClient.indices().create(request, RequestOptions.DEFAULT);
|
if(createIndexResponse != null && createIndexResponse.isAcknowledged()){
|
logger.info("创建索引表成功:" + name + ", " + type);
|
return true;
|
}
|
logger.error("创建索引表失败:" + createIndexResponse.toString());
|
return false;
|
} catch (Exception e) {
|
logger.error("创建索引失败:" + name, e);
|
return false;
|
}
|
}
|
|
// public boolean createIndexFile(String name, String type){
|
// CreateIndexRequest request = new CreateIndexRequest(name);
|
// request.settings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0));
|
//
|
// // 设置映射字段
|
// try {
|
// String json = FullTextUtils.createXmFileJson(type, tokenType);
|
// logger.info(json);
|
// request.mapping(json, XContentType.JSON);
|
// CreateIndexResponse createIndexResponse = restHighClient.indices().create(request, RequestOptions.DEFAULT);
|
// if(createIndexResponse != null && createIndexResponse.isAcknowledged()){
|
// logger.info("创建索引表成功:" + name + ", " + type);
|
// return true;
|
// }
|
// logger.error("创建索引表失败:" + createIndexResponse.toString());
|
// return false;
|
// } catch (Exception e) {
|
// logger.error("创建索引失败:" + name, e);
|
// return false;
|
// }
|
// }
|
|
@Override
|
public void removeIndex(String index) throws Exception{
|
// boolean existIndex = this.existIndex(index, Constants.TYPE_DEFAULT);
|
// if(existIndex){
|
// DeleteIndexRequest request = new DeleteIndexRequest(index);
|
// restHighClient.indices().delete(request, RequestOptions.DEFAULT);
|
// logger.warn("删除索引库成功:" + index);
|
// }
|
DeleteIndexRequest request = new DeleteIndexRequest(index);
|
restHighClient.indices().delete(request, RequestOptions.DEFAULT);
|
logger.warn("删除索引库成功:" + index);
|
}
|
|
/**
|
* type表类型在7.0之后会被废弃,但6.0还必须使用,后续需要微调,去掉type
|
* @date 2020-07-15
|
*/
|
@Override
|
public void putContent(String index, List<SaveData> datas) throws Exception{
|
if(datas == null || datas.size() == 0){
|
return;
|
}
|
|
BulkRequest request = new BulkRequest();
|
IndexRequest indexRequest = null;
|
for(SaveData d : datas){
|
indexRequest = new IndexRequest(index, Constants.TYPE_DEFAULT);
|
indexRequest.setPipeline(Constants.PIPELINE_DEFAULT);
|
// indexRequest.opType("create").id(d.getId()).source(FullTextUtils.translate(d));
|
// indexRequest.opType(DocWriteRequest.OpType.CREATE);
|
// indexRequest.version(2);//设置版本
|
// indexRequest.versionType(VersionType.EXTERNAL);//设置版本类型
|
// 这里设置es中的id,可以避免重复写入数据
|
indexRequest.id(d.getId());
|
indexRequest.source(FullTextUtils.translate(d));
|
request.add(indexRequest);
|
logger.info("saveData = " + d.getTitle() + ", " + d.getShowDataSummary());
|
}
|
BulkResponse bulkResponse = restHighClient.bulk(request, RequestOptions.DEFAULT);
|
if(bulkResponse.hasFailures()){
|
throw new Exception("(更新过)批量写入es数据出现错误: " + bulkResponse.buildFailureMessage());
|
}
|
logger.info("批量写入es数据成功");
|
}
|
|
@Override
|
public String search(String index, String queryText, int dataSecurity, String securityInfo
|
, int pageIndex, int pageSize) throws Exception{
|
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
|
// 1-基本参数
|
//将请求限制为一个类型。
|
// searchRequest.types("doc");
|
// 默认情况下,搜索请求会返回文档_source的内容,你可以完全关闭_source检索
|
// sourceBuilder.fetchSource(false);
|
// String[] includeFields = new String[] {"id", "title", "innerObject.*"};
|
String[] includeFields = new String[] {"id", "title", "create_time", "path", "user_id","user_name","xmid","xmname"};
|
String[] excludeFields = new String[] {"data", "attachment.content", "text_html","text_db"};
|
sourceBuilder.fetchSource(includeFields, excludeFields);
|
|
// 2-设置查询语句
|
String queryString = FullTextUtils.acquireSearchQueryJson(queryText, dataSecurity, securityInfo);
|
logger.info(queryString);
|
WrapperQueryBuilder wqb = QueryBuilders.wrapperQuery(queryString);
|
sourceBuilder.query(wqb);
|
|
// 3-设置高亮结果
|
HighlightBuilder highlightBuilder = new HighlightBuilder();
|
highlightBuilder.preTags("<em class=\"red\">").postTags("</em>");
|
highlightBuilder.field("title").field("text_db").field("attachment.content");
|
sourceBuilder.highlighter(highlightBuilder);
|
|
sourceBuilder.from(pageIndex).size(pageSize);
|
|
// 按创建时间倒序排列
|
// sourceBuilder.sort("create_time", SortOrder.DESC);
|
|
SearchRequest searchRequest = new SearchRequest(index); //索引
|
searchRequest.source(sourceBuilder);
|
SearchResponse response = restHighClient.search(searchRequest, RequestOptions.DEFAULT);
|
|
Map<String, Object> result = new HashMap<>();
|
result.put("pageIndex", pageIndex);
|
result.put("pageSize", pageSize);
|
// JSONArray datas = new JSONArray();
|
List<Map<String, Object>> datas = new ArrayList<>();
|
|
SearchHits hits = response.getHits();
|
long total = hits.getTotalHits().value;
|
result.put("total", total);
|
if(total > 0){
|
SearchHit[] searchHits = hits.getHits();
|
Map<String, Object> one = null;
|
for(SearchHit sh : searchHits){
|
// one = new JSONObject(sh.getSourceAsMap());
|
one = sh.getSourceAsMap();
|
one.put("highlight", FullTextUtils.getOneFieldHighlightText(sh.getHighlightFields()));
|
datas.add(one);
|
}
|
}
|
result.put("datas", datas);
|
return JsonUtils.objectToJsonString(result);
|
}
|
|
@Override
|
public String searchDataById(String index, String _id) throws Exception{
|
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
|
String[] includeFields = new String[] {"id", "title", "create_time", "path", "text_html"};
|
String[] excludeFields = new String[] {"data", "attachment.content","text_db"};
|
sourceBuilder.fetchSource(includeFields, excludeFields);
|
|
// 2-设置查询语句
|
String queryString = FullTextUtils.acquireSearchOneById(_id);
|
logger.info(queryString);
|
WrapperQueryBuilder wqb = QueryBuilders.wrapperQuery(queryString);
|
sourceBuilder.query(wqb);
|
|
SearchRequest searchRequest = new SearchRequest(index); //索引
|
searchRequest.source(sourceBuilder);
|
SearchResponse response = restHighClient.search(searchRequest, RequestOptions.DEFAULT);
|
|
SearchHits hits = response.getHits();
|
if(hits.getTotalHits().value > 0){
|
SearchHit hit = hits.getHits()[0];
|
// JSONObject one = new JSONObject(hit.getSourceAsMap());
|
Map<String, Object> one = hit.getSourceAsMap();
|
return JsonUtils.objectToJsonString(one);
|
}
|
return null;
|
}
|
|
/**
|
* 废弃方法,下载文件还走业务系统方式,这种大文件直接内存拷贝太耗资源
|
* @param index
|
* @param _id
|
* @return
|
* @throws Exception
|
*/
|
@Deprecated
|
@Override
|
public String searchFileById(String index, String _id) throws Exception{
|
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
|
String[] includeFields = new String[] {"id", "title", "create_time"
|
, "data", "attachment.content_length", "attachment.content_type", "attachment.title", "attachment.date"};
|
String[] excludeFields = new String[] {"attachment.content","text_db"};
|
sourceBuilder.fetchSource(includeFields, excludeFields);
|
|
// 2-设置查询语句
|
String queryString = FullTextUtils.acquireSearchOneById(_id);
|
logger.info(queryString);
|
WrapperQueryBuilder wqb = QueryBuilders.wrapperQuery(queryString);
|
sourceBuilder.query(wqb);
|
|
SearchRequest searchRequest = new SearchRequest(index); //索引
|
searchRequest.source(sourceBuilder);
|
SearchResponse response = restHighClient.search(searchRequest, RequestOptions.DEFAULT);
|
|
SearchHits hits = response.getHits();
|
if(hits.getTotalHits().value > 0){
|
SearchHit hit = hits.getHits()[0];
|
Map<String, Object> one = hit.getSourceAsMap();
|
return JsonUtils.objectToJsonString(one);
|
}
|
return null;
|
}
|
|
/**
|
* 查询最近搜索关联集合
|
* @param index
|
* @param keyWord
|
* @return
|
* @throws Exception
|
*/
|
public List<String> searchRecent(String index, String keyWord) throws Exception{
|
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
|
String[] includeFields = new String[] {"id", "title", "create_time"};
|
// String[] excludeFields = new String[] {"data", "attachment.content", "text_html","text_db"};
|
sourceBuilder.fetchSource(includeFields, null);
|
|
// 2-设置查询语句
|
MultiMatchQueryBuilder query = QueryBuilders.multiMatchQuery(keyWord, "title");
|
// String queryString = FullTextUtils.acquireSearchQueryJson(queryText, dataSecurity, securityInfo);
|
// logger.info(queryString);
|
// WrapperQueryBuilder wqb = QueryBuilders.wrapperQuery(queryString);
|
sourceBuilder.query(query);
|
|
sourceBuilder.from(0).size(10);
|
|
SearchRequest searchRequest = new SearchRequest(index); //索引
|
searchRequest.source(sourceBuilder);
|
SearchResponse response = restHighClient.search(searchRequest, RequestOptions.DEFAULT);
|
|
List<String> result = new ArrayList<>();
|
SearchHits hits = response.getHits();
|
if(hits.getTotalHits().value > 0){
|
SearchHit[] searchHits = hits.getHits();
|
for(SearchHit sh : searchHits){
|
result.add(sh.getSourceAsMap().get("title").toString());
|
}
|
}
|
return result;
|
}
|
|
/**
|
* 返回表中某个记录的分词字段数据
|
* @param index
|
* @param _id 记录id
|
* @return 给定字段分词集合
|
* @throws Exception
|
*/
|
public List<String> getKeyWordList(String index, String _id, String field) throws Exception{
|
// TermVectorsRequest request = new TermVectorsRequest(index, Constants.TYPE_DEFAULT, _id);
|
TermVectorsRequest request = new TermVectorsRequest(index, _id);
|
// request.setFields("attachment.content");
|
request.setFields(field);
|
request.setOffsets(false);
|
request.setPositions(false);
|
request.setPayloads(false);
|
request.setTermStatistics(false);
|
TermVectorsResponse response = restHighClient.termvectors(request, RequestOptions.DEFAULT);
|
|
List<String> result = new ArrayList<>();
|
|
boolean found = response.getFound();
|
if(found){
|
for (TermVectorsResponse.TermVector tv : response.getTermVectorsList()) {
|
String fieldname = tv.getFieldName(); //当前字段的名称
|
logger.info("找到字段下的分词集合:" + fieldname);
|
int docCount = tv.getFieldStatistics().getDocCount(); //当前字段-文档计数的字段统计
|
long sumTotalTermFreq =
|
tv.getFieldStatistics().getSumTotalTermFreq(); //当前字段的字段统计信息-总术语频率之和
|
long sumDocFreq = tv.getFieldStatistics().getSumDocFreq(); //当前字段的字段统计信息-文档频率的总和
|
if (tv.getTerms() != null) {
|
List<TermVectorsResponse.TermVector.Term> terms = tv.getTerms();
|
for (TermVectorsResponse.TermVector.Term term : terms) {
|
String termStr = term.getTerm(); //术语的名称
|
int termFreq = term.getTermFreq(); //术语的术语频率
|
result.add(termStr);
|
}
|
}
|
}
|
}
|
return result;
|
}
|
|
/**
|
* 自动检测附件pipeline,如果不存在则创建,一般名字定义为:attachment
|
* @param pipeline
|
* @return
|
* @throws Exception
|
* @author 时克英
|
* @date 2020-08-25
|
*/
|
public boolean autoCreatePipelineAttachment(String pipeline) throws Exception{
|
GetPipelineRequest request = new GetPipelineRequest(pipeline);
|
GetPipelineResponse response = restHighClient.ingest().getPipeline(request, RequestOptions.DEFAULT);
|
if(response == null){
|
logger.error("查找pipeline失败:" + pipeline);
|
return false;
|
}
|
|
boolean existPipeline = false;
|
if(response.isFound()){
|
for(PipelineConfiguration pc : response.pipelines()){
|
if(pc.getId().equals(pipeline)){
|
existPipeline = true;
|
logger.info("找到pipeline:" + pipeline);
|
break;
|
} else {
|
continue;
|
}
|
}
|
}
|
|
if(!existPipeline){
|
// 创建pipeline
|
logger.info("不存在pipeline,开始创建:" + pipeline);
|
|
XContentBuilder builder = XContentFactory.jsonBuilder();
|
builder.startObject();
|
{
|
builder.field("description","解析附件文档");
|
builder.startArray("processors");
|
{
|
builder.startObject();
|
{
|
builder.startObject(pipeline);
|
builder.field("field","data");
|
builder.field("ignore_missing",true);
|
builder.endObject();
|
}
|
builder.endObject();
|
|
builder.startObject();
|
{
|
builder.startObject("remove");
|
builder.field("field","data");
|
builder.endObject();
|
}
|
builder.endObject();
|
}
|
builder.endArray();
|
}
|
builder.endObject();
|
logger.info(builder.toString());
|
|
BytesReference source = BytesReference.bytes(builder);
|
PutPipelineRequest createRequest = new PutPipelineRequest(pipeline, source, XContentType.JSON);
|
AcknowledgedResponse response1 = restHighClient.ingest().putPipeline(createRequest, RequestOptions.DEFAULT);
|
if(response1 != null && response1.isAcknowledged()){
|
logger.info("创建pipeline成功:" + pipeline);
|
return true;
|
} else {
|
logger.error("创建pipeline失败:" + pipeline);
|
return false;
|
}
|
}
|
return true;
|
}
|
|
@Override
|
public void afterPropertiesSet() throws Exception {
|
// System.out.println("++++++" + this.elasticSearchConfig);
|
// this.setNodes(this.elasticSearchConfig.getCluster_Nnodes());
|
}
|
|
/**
|
* 初始化service对象,由外部config调用初始化。
|
* @param nodes
|
* @date 2022-10-20
|
*/
|
public void setNodes(String nodes){
|
if(StringUtils.isEmpty(nodes)){
|
throw new IllegalArgumentException("not found: elasticsearch nodes config");
|
}
|
|
String[] nodesInfo = nodes.split(",");
|
HttpHost[] httpHosts = new HttpHost[nodesInfo.length];
|
|
int i = 0;
|
for(String oneNode : nodesInfo){
|
String[] node = oneNode.split(":");
|
if(node.length != 2){
|
throw new IllegalArgumentException("elasticsearch nodes config error: " + oneNode);
|
}
|
HttpHost host = new HttpHost(node[0], Integer.parseInt(node[1]), "http");
|
httpHosts[i] = host;
|
logger.info(oneNode);
|
i++;
|
}
|
|
// RestClientBuilder restClientBuilder = RestClient.builder(httpHosts).setMaxRetryTimeoutMillis(300*1000);
|
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
|
@Override
|
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
|
// httpAsyncClientBuilder.set
|
return httpAsyncClientBuilder;
|
}
|
});
|
restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
|
|
@Override
|
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
|
// 设置连接超时大一点,测试上传大文件
|
return requestConfigBuilder.setSocketTimeout(600 * 1000) //
|
.setConnectionRequestTimeout(600 * 1000) // 设置写入超时时间
|
.setConnectTimeout(5000);
|
}
|
});
|
|
|
restHighClient = new RestHighLevelClient(restClientBuilder);
|
logger.info("创建 RestHighLevelClient 完成");
|
}
|
|
@Override
|
public void setTokenType(String tokenType) {
|
if(StringUtils.isNotEmpty(tokenType)){
|
if(!tokenType.equals(Constants.TOKEN_TYPE_MAX) && !tokenType.equals(Constants.TOKEN_TYPE_SMART)){
|
throw new IllegalArgumentException("tokenType设置错误,只能是:ik_max_word | ik_smart");
|
}
|
this.tokenType = tokenType;
|
}
|
}
|
}
|