ElasticSearch里我明明指定了long,为什么却变成了keyword
背景
实体类中定义的属性id为Long类型,但在调用spring-data-elasticsearch:3.2.10.RELEASE中的putMapping(Class)方法时,该属性在生成的mapping中却被转换成了keyword类型。
源码
查看putMapping方法,可以发现各个重载最终都会调用最下边的重载方法putMapping(String indexName, String type, Object mapping):
class ElasticsearchRestTemplate { ... @Override public boolean putMapping(Class clazz) { return putMapping(clazz, buildMapping(clazz)); } @Override public boolean putMapping(Class clazz, Object mapping) { return putMapping(getPersistentEntityFor(clazz).getIndexName(), getPersistentEntityFor(clazz).getIndexType(), mapping); } @Override public boolean putMapping(String indexName, String type, Class clazz) { return putMapping(indexName, type, buildMapping(clazz)); } @Override public boolean putMapping(String indexName, String type, Object mapping) { Assert.notNull(indexName, "No index defined for putMapping()"); Assert.notNull(type, "No type defined for putMapping()"); PutMappingRequest request = new PutMappingRequest(indexName).type(type); if (mapping instanceof String) { request.source(String.valueOf(mapping), XContentType.JSON); } else if (mapping instanceof Map) { request.source((Map) mapping); } else if (mapping instanceof XContentBuilder) { request.source((XContentBuilder) mapping); } try { return client.indices().putMapping(request, RequestOptions.DEFAULT).isAcknowledged(); } catch (IOException e) { throw new ElasticsearchException("Failed to put mapping for " + indexName, e); } } ... }
查看buildMapping方法,因为实体类上并没有通过@Mapping注解的mappingPath指定外部的mapping配置文件,所以会走最下边的mappingBuilder.buildPropertyMapping(clazz),根据属性上的注解构建出String类型的mapping JSON:
abstract class AbstractElasticsearchTemplate { ... protected String buildMapping(Class<?> clazz) { // load mapping specified in Mapping annotation if present if (clazz.isAnnotationPresent(Mapping.class)) { String mappingPath = clazz.getAnnotation(Mapping.class).mappingPath(); if (!StringUtils.isEmpty(mappingPath)) { String mappings = ResourceUtil.readFileFromClasspath(mappingPath); if (!StringUtils.isEmpty(mappings)) { return mappings; } } else { LOGGER.info("mappingPath in @Mapping has to be defined. Building mappings using @Field"); } } // build mapping from field annotations try { MappingBuilder mappingBuilder = new MappingBuilder(elasticsearchConverter); return mappingBuilder.buildPropertyMapping(clazz); } catch (Exception e) { throw new ElasticsearchException("Failed to build mapping for " + clazz.getSimpleName(), e); } } ... }
查看buildPropertyMapping方法:
class MappingBuilder { ... String buildPropertyMapping(Class<?> clazz) throws IOException { // 提前解析出一些通用属性,比如indexName,indexType等等 ElasticsearchPersistentEntity<?> entity = elasticsearchConverter.getMappingContext() .getRequiredPersistentEntity(clazz); // 构造一个json构造器,以indexType开始 XContentBuilder builder = jsonBuilder().startObject().startObject(entity.getIndexType()); // 添加dynamic template addDynamicTemplatesMapping(builder, entity); // 父子文档判断 String parentType = entity.getParentType(); if (hasText(parentType)) { builder.startObject(FIELD_PARENT).field(FIELD_TYPE, parentType).endObject(); } // 属性解析开始标志properties builder.startObject(FIELD_PROPERTIES); // 具体的properties解析,为根对象非nested对象 mapEntity(builder, entity, true, "", false, FieldType.Auto, null); builder.endObject() // FIELD_PROPERTIES .endObject() // indexType .endObject() // root object .close(); return builder.getOutputStream().toString(); } private void mapEntity(XContentBuilder builder, @Nullable ElasticsearchPersistentEntity entity, boolean isRootObject, String nestedObjectFieldName, boolean nestedOrObjectField, FieldType fieldType, @Nullable Field parentFieldAnnotation) throws IOException { boolean writeNestedProperties = !isRootObject && (isAnyPropertyAnnotatedWithField(entity) || nestedOrObjectField); if (writeNestedProperties) { String type = nestedOrObjectField ? 
fieldType.toString().toLowerCase() : FieldType.Object.toString().toLowerCase(); builder.startObject(nestedObjectFieldName).field(FIELD_TYPE, type); if (nestedOrObjectField && FieldType.Nested == fieldType && parentFieldAnnotation != null && parentFieldAnnotation.includeInParent()) { builder.field("include_in_parent", parentFieldAnnotation.includeInParent()); } builder.startObject(FIELD_PROPERTIES); } // 对象字段属性的解析 if (entity != null) { entity.doWithProperties((PropertyHandler) property -> { try { if (property.isAnnotationPresent(Transient.class) || isInIgnoreFields(property, parentFieldAnnotation)) { return; } buildPropertyMapping(builder, isRootObject, property); } catch (IOException e) { logger.warn("error mapping property with name {}", property.getName(), e); } }); } if (writeNestedProperties) { builder.endObject().endObject(); } } // 解析每个property的方法 private void buildPropertyMapping(XContentBuilder builder, boolean isRootObject, ElasticsearchPersistentProperty property) throws IOException { if (property.isAnnotationPresent(Mapping.class)) { String mappingPath = property.getRequiredAnnotation(Mapping.class).mappingPath(); if (!StringUtils.isEmpty(mappingPath)) { ClassPathResource mappings = new ClassPathResource(mappingPath); if (mappings.exists()) { builder.rawField(property.getFieldName(), mappings.getInputStream(), XContentType.JSON); return; } } } // geo标识 boolean isGeoPointProperty = isGeoPointProperty(property); // completion标识 boolean isCompletionProperty = isCompletionProperty(property); // nested object标识 boolean isNestedOrObjectProperty = isNestedOrObjectProperty(property); // 属性上的Field注解 Field fieldAnnotation = property.findAnnotation(Field.class); if (!isGeoPointProperty && !isCompletionProperty && property.isEntity() && hasRelevantAnnotation(property)) { if (fieldAnnotation == null) { return; } Iterator<? 
extends TypeInformation<?>> iterator = property.getPersistentEntityTypes().iterator(); ElasticsearchPersistentEntity<?> persistentEntity = iterator.hasNext() ? elasticsearchConverter.getMappingContext().getPersistentEntity(iterator.next()) : null; mapEntity(builder, persistentEntity, false, property.getFieldName(), isNestedOrObjectProperty, fieldAnnotation.type(), fieldAnnotation); if (isNestedOrObjectProperty) { return; } } MultiField multiField = property.findAnnotation(MultiField.class); if (isGeoPointProperty) { applyGeoPointFieldMapping(builder, property); return; } if (isCompletionProperty) { CompletionField completionField = property.findAnnotation(CompletionField.class); applyCompletionFieldMapping(builder, property, completionField); } // 判断是否为id属性 if (isRootObject && fieldAnnotation != null && property.isIdProperty()) { applyDefaultIdFieldMapping(builder, property); } else if (multiField != null) { addMultiFieldMapping(builder, property, multiField, isNestedOrObjectProperty); } else if (fieldAnnotation != null) { addSingleFieldMapping(builder, property, fieldAnnotation, isNestedOrObjectProperty); } } ... }
至此可以看到,只要super.isIdProperty()返回true(通常即属性上标注了@Id注解),或者fieldName为id或document,该属性就会被判定为id属性。对于根对象中带有@Field注解的id属性,MappingBuilder不会再走addSingleFieldMapping,而是调用applyDefaultIdFieldMapping,直接将type设置为keyword并且可被索引(index为true),因此@Field中指定的type不会生效。疑问到这里解决。(注意:applyDefaultIdFieldMapping是在MappingBuilder的buildPropertyMapping中被直接调用的私有方法,应属于MappingBuilder,这里只是为了便于对照才与SimpleElasticsearchPersistentProperty放在一起展示。)
class SimpleElasticsearchPersistentProperty { ... private static final List SUPPORTED_ID_PROPERTY_NAMES = Arrays.asList("id", "document"); public SimpleElasticsearchPersistentProperty(Property property, PersistentEntity<?, ElasticsearchPersistentProperty> owner, SimpleTypeHolder simpleTypeHolder) { ... this.isId = super.isIdProperty() || SUPPORTED_ID_PROPERTY_NAMES.contains(getFieldName()); ... } @Override public boolean isIdProperty() { return isId; } private void applyDefaultIdFieldMapping(XContentBuilder builder, ElasticsearchPersistentProperty property) throws IOException { builder.startObject(property.getFieldName()).field(FIELD_TYPE, TYPE_VALUE_KEYWORD).field(FIELD_INDEX, true) .endObject(); } ... }
话外题
项目中使用的ElasticSearch实体类都是通过@Document指定indexName来操作的,但是索引和表都涉及到分库分表,indexName不能写死,所以采取了SpEL配合ThreadLocal的方式:先把indexName set到上下文里,再在SpEL表达式中get出来。其实Spring对ElasticSearch的操作也像关系型数据库那样封装了一层Repository抽象,名为ElasticsearchRepository,我们只要定义实体类的操作接口并继承它,就可以完成对单索引的CRUD以及Page等操作。但这样有一个问题,那就是indexName无法动态调整,所以就放弃了这种方式,改用基于更底层的RestHighLevelClient封装的ElasticsearchRestTemplate模板类。这样在面对分库分表时,就可以手动为每个Document set不同的indexName;跨索引查询时可以指定多个索引,也可以直接指定索引的alias。需要注意的是,在进行更新时只指定alias是不被允许的(alias指向多个索引且没有设置写索引时,ElasticSearch无法确定要写入哪个索引),需要先手动查出符合条件的Document,再按索引分组进行批量更新,即调用ElasticSearch的bulk api。
在ElasticSearch和数据库的一致性问题上,我是通过封装不同的方法来分别保证强一致性和最终一致性。
强一致性
类似插入、更新、删除等场景下,都是放在一个事务里,先操作数据库,再操作ElasticSearch,这样可以确保操作ElasticSearch失败时,数据库可以成功回滚。这种方式一般只运用于对数据实时性要求敏感、并且数据量不大的场景。但即便这样,写入的数据在默认配置下仍会有约1s的延迟才能被搜索到,这里就涉及到ElasticSearch的refresh(刷新)策略了(默认refresh_interval为1s),这里不展开研究。
最终一致性
批量的插入、更新这些操作,如果放在一个大事务里,对数据库也是一种压力,所以一般是分批操作数据库,另起一个线程池对事务提交进行监听,在事务提交后将数据库数据同步到ElasticSearch里,并在同步成功后更新数据库中的同步状态字段(由未同步改为已同步)。为了确保万无一失,后台还会启动一个定时任务线程,定期扫描数据库中同步状态仍为未同步的数据并进行补偿同步。这种方式一般适用于大数据量的场景。当然你也可以去监听MySQL的binlog日志来进行同步。