ES在大数据中的应用
一、相关软件介绍
1。Elasticsearch
Elasticsearch,简称为ES,是一个分布式、高扩展、高实时的搜索与数据分析引擎。它能很方便的使大量数据具有搜索、分析和探索的能力。无论是结构化或非结构化文本、数字数据还是地理空间数据,Elasticsearch都能以支持快速搜索的方式高效地存储和索引它。
本文在linux环境使用Elasticsearch7。14。0版本进行配置及操作。
2。Kibana(辅助工具)
Kibana是一个开源的分析与可视化平台,可实现以交互方式探索、可视化和分享对数据的见解,并管理和监控堆栈。它和Elasticsearch一起使用,用于搜索、查看存放在Elasticsearch中的数据。
3。elasticsearchhead(辅助工具)
elasticsearchhead是用于监控Elasticsearch状态的客户端插件,包括数据可视化、执行增删改查操作等。
4。elasticsearchpy
elasticsearchpy是官方提供的Elasticsearchpython客户端库,它只是对Elasticsearch的restAPI接口做了一层简单的封装。二、环境搭建
1。Elasticsearch部署
(1)下载
elasticsearch7140
(2)解压
tarzxvfelasticsearch7。14。0linuxx8664。tar。gz
(3)安装ES不用使用root用户
创建普通用户work。
(4)修改配置文件configelasticsearch。yml
配置项如下:
cluster。name:escluster
node。name:esnode
node。master:true
node。data:true
node。maxlocalstoragenodes:1
path。data:dataelasticsearchdata
path。logs:dataelasticsearchlogs
network。host:0。0。0。0
http。port:9200
transport。tcp。port:9300
transport。tcp。compress:true
discovery。seedhosts:〔192。168。1。90:9300〕
cluster。initialmasternodes:〔esnode〕
http。cors。enabled:true
http。cors。alloworigin:
(5)启动ES
。binelasticsearch
elasticsearch启动成功样例图
后台方式启动ES:
。binelasticsearchd
(6)可能出现的错误及解决方法
错误1:
maxfiledescriptors〔4096〕forelasticsearchprocessistoolow,increasetoatleast〔65535〕
解决方法:
sudovimetcsecuritylimits。conf
追加以下内容:
softnofile65536
hardnofile65536
softnproc4096
hardnproc4096
错误2:
maxnumberofthreads〔3802〕foruser〔work〕istoolow,increasetoatleast〔4096〕
解决方法:
sudovimetcsecuritylimits。d20nproc。conf
修改为:
softnproc4096
错误3:
maxvirtualmemoryareasvm。maxmapcount〔65530〕istoolow,increasetoatleast〔262144〕
解决方法:
sudovimetcsysctl。conf
修改为:
vm。maxmapcount262144
执行以下命令生效:
sysctlp
2。Kibana
(1)下载
kibana7140
(2)解压
tarzxvfkibana7。14。0linuxx8664。tar。gz
(3)修改配置文件configkibana。yml
配置项如下:
server。host:192。168。1。90
server。shutdownTimeout:5s
elasticsearch。hosts:〔http:192。168。1。90:9200〕
monitoring。ui。container。elasticsearch。enabled:true
server。port:5601
kibana。index:。kibana
i18n。locale:zhCN
(4)启动kibana
。binkibana
kibana启动成功样例图
(5)浏览器打开kibana开发工具
kibana开发工具界面图
3。elasticsearchhead
(1)此处提供编译好的版本,解压后运行npmrunstart即可启动
链接:
https:pan。baidu。coms19F8zMdN94QhgqYP9vkPHHA
提取码:a99y
(2)自行下载编译
cdelasticsearchhead
npminstall
npmrunstart
elasticsearchhead启动成功样例图
(3)浏览器打开elasticsearchhead,查看ES状态
elasticsearchhead界面图
集群健康值
green:所有主要分片和复制分片都可用
yellow:所有主要分片可用,但不是所有复制分片都可用
red:不是所有的主要分片都可用
当集群状态为red,它仍然正常提供服务,它会在现有存活分片中执行请求,此时需要尽快修复故障分片,防止查询数据的丢失。
4。elasticsearchpy
pipinstallelasticsearch7。14。0三、核心概念
索引(Index)
索引就是一类文档的集合,类似于关系型数据库中的表。索引由其名称进行标识,每个索引名称必须是小写。
文档(Document)
Index中单条记录称为文档,等同于关系型数据库表中的行。
字段(Field)
json结构的字段,等同于关系型数据库表中的列。
映射(Mapping)
Mapping是处理数据的方式和规则方面做一些限制,如:某个字段的数据类型、默认值、分析器、是否被索引等等,都是映射里可以设置的。
分片(Shards)
一个索引可以存储超过单个节点硬件限制的大量数据,相当于分表的概念。ES提供了将索引划分成多份的能力,每一份称之为分片。当创建一个索引的时候,可以指定想要的分片数量。允许水平分割扩展内容容量;允许在分片之上进行分布式的、并行的操作,进而提高性能吞吐量。
副本(Replicas)
在分片节点失败的情况下,提供了高可用性。复制分片从不与原主要分片置于同一节点上是非常重要的。扩展搜索量吞吐量,因为搜索可以在所有副本上并行运行。四、python操作ES
1。连接ES
fromelasticsearchimportElasticsearch
defmain():
连接ES
esElasticsearch(〔192。168。1。90:9200〕,
sniffonstartTrue,连接前测试
sniffonconnectionfailTrue,节点无响应时刷新节点
sniffertimeout60设置超时时间)
ifnamemain:
main()
2。增
创建索引
定义mappingbody
bodyindex{
mappings:{
properties:{
name:{type:keyword},
age:{type:long},
tags:{type:text}
}},
settings:{
index:{
numberofshards:3,
numberofreplicas:0
}}}
创建index
reses。indices。create(indexindexname,bodybodyindex,ignore400)
插入单个数据
person1{
name:张三,
age:18,
tags:勤奋学习十载寒窗,凿壁借光,囊萤映雪,手不释卷,有良好的表达能力。有耐心心态好,善于维系客户关系。果断热情勇敢孤僻活力,思想成熟能够独立工作。
}
reses。index(indexindexname,bodyperson1)
批量插入数据
fromelasticsearchimporthelpers
insertinfos〔〕
person2{
index:indexname,
name:李四,
age:20,
tags:有极强的领导艺术,公正严明铁面无私,公私分明。关心他人无微不至,体贴入微。精力充沛,并有很强的事业心。气吞山河正气凛然,善于同各种人员打交道。
}
person3{
index:indexname,
name:王五,
age:19,
tags:尊敬师长团结同学,乐于助人学习勤奋,用心向上,用心参加班级学校组织的各种课内外活动。用心开展批评与自我批评。
}
insertinfos。append(person2)
insertinfos。append(person3)
helpers。bulk(clientes,actionsinsertinfos)
elasticsearchhead数据浏览界面
3。删
删除索引
删除index
reses。indices。delete(indexindexname,ignore〔400〕)
按id删除文档
按id删除
reses。delete(indexindexname,idbKTgXYUBfH4USN9RFMOh)
按条件删除文档
按条件删除
body{
query:{
match:{
name:张三
}}}
reses。deletebyquery(indexindexname,bodybody,ignore〔400,404〕)
4。改
index
body{
name:王五,
age:19,
tags:尊敬师长团结同学,乐于助人学习勤奋,用心向上,用心参加班级学校组织的各种课内外活动。用心开展批评与自我批评。
}
reses。index(indexindexname,idbaTgXYUBfH4USN9RFMOh,bodybody)
index()方法完成两个操作,如果数据不存在,那就执行插入操作,如果已经存在,那就执行更新操作。
index实现更新时,body中必须写入全部字段,否则未包含的字段会被置为空。
update
body{
doc:{
name:王五
}}
es。update(indexindexname,idbaTgXYUBfH4USN9RFMOh,bodybody)
5。查
查看es中的索引
indexinfoes。indices。get()
查看索引的名称
indexnamesindexinfo。keys()
判断索引是否存在
indexnameesindex
print(es。indices。exists(indexname))
查询文档数量
doccountes。count(indexindexname)
按id查询
body{
query:{
match:{
id:baTgXYUBfH4USN9RFMOh
}}}
reses。search(indexindexname,bodybody)
按属性查询,结果过滤返回指定字段
body{
query:{
match:{
age:20
}},
source:〔name,tags〕
}
reses。search(indexindexname,bodybody)
按年龄排序
body{
sort:{
age:{
order:descasc:升序,desc:降序
}}}
reses。search(indexindexname,bodybody)
查询年龄大于18且小于等于20的文档
body{
query:{
range:{
age:{
gt:18,
lte:20
}}}}
reses。search(indexindexname,bodybody)
按年龄降序且分页查询
body{
sort:{
age:{
order:descasc:升序,desc:降序
}},
from:0,
size:1
}
reses。search(indexindexname,bodybody)
精准查询
body{
query:{
matchphrase:{
tags:耐心
}}}
reses。search(indexindexname,bodybody)
布尔查询:姓名为张三且tags中包含耐心
body{
query:{
bool:{
must:〔{
match:{
name:张三
}},
{
matchphrase:{
tags:耐心
}}〕}}}
reses。search(indexindexname,bodybody)
布尔查询:姓名为王五且tags中不包含耐心
body{
query:{
bool:{
must:〔{
match:{
name:王五
}}〕,
mustnot:〔{
matchphrase:{
tags:耐心
}}〕}}}
reses。search(indexindexname,bodybody)五、DSL语句
QueryDSL是一个Java开源框架用于构建类型安全的SQL查询语句。在查询时,通常先在Kibana中使用DSL验证查询语句的正确性,再转到python中使用。
查询所有索引
DSL执行界面图
添加文档:id设为1
PUTesindexdoc1
{
name:赵六
}
删除文档:id1
DELETEesindexdoc1
查询
GETesindexsearch
{
query:{
match:{
age:18
}}}
先验证结果正确,将GETesindexsearch后{}的内容转到python的body中即可。
智驱力科技驱动生产力
智驱力科技驱动生产力