ElasticSearch搜索引擎存储

想查数据,就免不了搜索,而搜索离不开搜索引擎。百度、谷歌都是非常庞大、复杂的搜索引擎,它们几乎能够索引互联网上开放的所有网页和数据。然而对于我们自己的业务数据来说,没必要使用这么复杂的技术。如果为了便于存储和检索,想要实现自己的搜索引擎,那么 Elasticsearch 就是不二之选。这是—个全文搜索引擎,可以快速存储、搜索和分析海量数据。

所以,如果我们将爬取到的数据存储到 Elasticsearch 里面,检索时会非常方便。

ElasticSearch介绍

Elasticsearch 是一个开源的搜索引擎,建立在一个全文搜索引擎库 ApacheLuceneTM 的基础之上。

那 Lucene 文是什么呢?Lucene 可能是自前存在的(不论开源还是私有)拥有最先进、高性能和全功能搜索引擎功能的库,但也仅仅只是一个库。要想用 Lucene,我们需要编写 Java 并引用 Lucene 包才可以,而且需要我们对信息检索有一定程度的理解。

为了解决这个问题,Elasticsearch 诞生了。Elasticsearch 也是使用 Java 编写的,其内部使用 Lucene 实现索引与搜索,但是它的目标是使全文检索变得简单,相当于 Lucene 的一层封装,它提供了一套简单一致的 RESTful API 来帮助我们实现存储和检索。

所以 Elasticsearch 仅仅就是一个简易版的 Lucene 封装吗?如果这么认为,那就大错特错了,Elasticsearch 不仅是 Lucene,并且也不只是一个全文搜索引擎。它可以这样准确形容:

  • 一个分布式的实时文档存储库,每个字段都可以被索引与搜索;

  • 一个分布式的实时分析搜索引擎:

  • 能胜任上百个服务节点的扩展,并支持 PB 级别的结构化或者非结构化数据。

总之,Elasticsearch 是一个非常强大的搜索引擎,维基百科、StackOverflow、GitHub 都纷纷采用它来实现搜索。Elasticsearch 不仅提供强大的检索能力,也提供强大的存储能力。

ElasticSearch相关概念

Elasticsearch 中有几个基本概念,如节点、索引、文档等,下面分别说明一下。理解这些概念,对熟悉 Elasticsearch 是非常有帮助的。

节点和集群

Elasticsearch 本质上是一个分布式数据库,允许多台服务器协同工作,每台服务器均可以运行多个 Elasticsearch 实例。

单个 Elasticsearch 实例称为一个节点(Node),一组节点构成一个集群(Cluster)。

索引

索引即 index,Elasticsearch 会索引所有字段,经过处理后写人一个反向索引(inverted index)。查找数据的时候,直接查找该索引。所以,Elasticsearch 数据管理的顶层单位就叫作索引,其实相当于 MySQL、MongoDB 等中数据库的概念。另外,值得注意的是,每个索引(即数据库)的名字必须小写。

文档

索引里的单条记录称为文档(document),许多条文档构成一个索引。

对同一个索引里面的文档,不要求有相同的结构(scheme),但是结构最好保持一致,因为这样有利于提高搜索效率。

类型

文档可以分组,例如 weather 这个索引里的文档,既可以按城市分组(北京和上海),也可以按气候分组(晴天和雨天)。这种分组就叫作类型(Type),它是虚拟的逻辑分组,用来过滤文档,类似 MySQL 中的数据表、MongoDB 中的集合。

不同类型的文档应该具有相似的结构。举例来说,id 字段不能在这个组中是字符串,在另一个组中却变成了数值。这点与关系型数据库的表是不同的。应该把性质完全不同的数据(例如 productslogs)存成两个索引,而不是把两个类型的数据存在一个索引里面(虽然可以做到)。

根据规划,Elastic 6.x 版只允许每个索引包含一个类型,Elastic 7.x 版将会开始移除类型。

字段

每个文档都类似一个 JSON 结构,包含许多字段,每个字段都有其对应的值,多个字段组成了一个文档,其实可以类比为 MySQL 数据表中的字段。

在 Elasticsearch 中,文档归属于一种类型(Type),而这些类型存在于索引中。我们可以画一个简单的对比图来类比 Elasticsearch 与传统的关系型数据库:

RelationalDB→Databases→Tables-→Rows-→Columns
Elasticsearch→Indices→Types-→Documents→Fields

以上就是 Elasticsearch 里面的一些基本概念,和关系型数据库进行对比更加有助于我们理解。

准备工作

在开始本节实际操作之前,请确保已经正确安装好了 Elasticsearch,安装方式可以参考: https://setup.scrape.center/elasticsearch ,安装完成之后确保它可以在本地 9200 端口上正常运行即可。

Elasticsearch 实际上提供了一系列 Restful API 来进行存取和查询操作,我们可以使用 curl 等命令或者直接调用 API 来进行数据存储和修改操作,但总归来说不是很方便。所以这里我们直接介绍一个专门用来对接 Elasticsearch 操作的 Python 库,名称也叫作 Elasticsearch,使用 pip3 安装即可:

pip3 install elasticsearch

更详细的安装方式可以参考: https://setup.scrape.center/elasticsearch-py

安装好了之后我们就可以开始本节的学习了。

创建索引

我们先来看一下怎样创建一个索引,这里我们创建一个名为 news 的索引:

frome lasticsearch import Elasticsearch

es = Elasticsearch()
result = es.indices.create(index='news', ignore=400)
print(result)

这里我们首先创建了一个 Elasticsearch 对象,并且没有设置任何参数,默认情况下它会连接本地 9200 端口运行的 Elasticsearch 服务,我们也可以设置特定的连接信息,如:

es = Elasticsearch(
    ['https://[username:password@]hostname:port'],
    verify_certs=True, # 是否验证 SSL 证书
)

第一个参数我们可以构造特定格式的链接字符串并传入,hostnameport 即 Elasticsearch 运行的地址和端口,usernamepassword 是可选的,代表连接 Elasticsearch 需要的用户名和密码,另外而且还有其他的参数设置,比如 verify_certs 代表是否验证证书有效性。更多参数的设置可以参考 https://elasticsearch-py.readthedocs.io/en/latest/api.html#elasticsearch

声明 Elasticsearch 对象之后,我们调用了 esindices 对象的 create 方法传入了 index 的名称,如果创建成功,会返回如下结果:

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'news'}

可以看到,返回结果是 JSON 格式,其中 acknowledged 字段表示创建操作执行成功。

但这时如果我们再把代码执行一次,则会返回如下结果:

{'error': {'root_cause': [{'type': 'resource_already_exists_exception', 'reason': 'index
[news/hHEYoyozTk_qRVV4J4a3w] already exists', 'index_uuid': 'hHEYoyozTk_qRVV4J4a3w', 'index': 'news'}],
'type': 'resource_already_exists_exception', 'reason': 'index [news/hHEYoyozTk_qRVV4J4a3w] already exists',
'index_uuid': 'hHEYoyozTk_qRVV4J4a3w', 'index': 'news'}, 'status': 400 }

它提示创建失败,其中 status 状态码是 400,表示错误原因是索引已经存在。

注意在这里的代码中,我们使用的 ignore 参数为 400,说明如果返回结果是 400 的话,就忽略这个错误,不会报错,程序不会抛出异常。

假如我们不加 ignore 这个参数:

es = Elasticsearch()
result = es.indices.create(index='news')
print(result)

再次执行就会报错了:

raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
elasticsearch.exceptions.RequestError: TransportError(400, 'resource_already_exists_exception', 'index
[news/QM6yZ2W8QE-bF1khcSoThw] already exists')

这样程序的执行会出现问题。因此,我们需要擅用 ignore 参数,把一些意外情况排除,这样才可以保证程序正常执行而不会中断。

创建完之后,我们还可以设置一下索引的字段映射定义,具体可以参考: https://elasticsearch-py.readthedocs.io/en/latest/api.html?#elasticsearch.client.IndicesClient.put_mapping

删除索引

删除索引也类似,代码如下:

from elasticsearch import Elasticsearch

es = Elasticsearch()
result = es.indices.delete(index='news', ignore=[400, 404])
print(result)

这里也使用 ignore 参数,来忽略索引不存在而删除失败,导致程序中断的问题。

如果删除成功,会输出如下结果:

{'acknowledged': True}

如果索引已经被删除,那么再执行删除,就会输出如下结果:

{'error': {'root_cause': [{'type': 'index_not_found_exception', 'reason': 'no such index [news]',
'resource.type': 'index_or_alias', 'resource.id': 'news', 'index_uuid': '_na_'}, {'type': 'index_not_found_exception', 'reason': 'no such index [news]', 'resource.type': 'index_or_alias',
'resource.id': 'news', 'index_uuid': '_na_', 'index': 'news'}], 'status': 404}

这个结果表明当前索引不存在,删除失败。返回的结果同样是 JSON 格式,状态码是 404,但是由于我们添加了 ignore 参数,忽略了 404 状态码,因此程序正常执行,输出 JSON 结果,而不是抛出异常。

插入数据

Elasticsearch 就像 MongoDB 一样,在插人数据的时候可以直接插人结构化字典数据,插人数据可以调用 create 方法。例如,这里我们插入一条新闻数据:

from elasticsearch import Elasticsearch

es = Elasticsearch()
es.indices.create(index='news', ignore=400)

data = {
    'title': '乘风破浪不负韶华 青春逐梦高考',
    'url': 'http://view.inews.qq.com/a/EDU2021041600732200'
}
result = es.create(index='news', id=1, body=data)
print(result)

这里我们首先声明了一条新闻数据,包括标题和链接,然后通过调用 create 方法插入了这条数据。在调用 create 方法时,我们传入了 4 个参数,其中 index 代表索引名称、id 是数据的唯一标识、body 则代表文档的具体内容。

运行结果如下:

{'_index': 'news', '_type': '_doc', '_id': '1', '_version': 1, 'result': 'created', '_shards': {'total': 2,'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}

结果中的 result 字段为 created,代表数据插入成功。

另外,其实我们也可以使用 index 方法来插入数据。与 create 不同的是,create 方法需要我们指定 id 字段来唯一标识一条数据,index 方法则不需要,如果不指定 id,那么它会自动生成一个。调用 index 方法的写法如下:

es.index(index='news', body=data)

create 方法内部其实是调用了 index 方法,是对 index 方法的封装。

更新数据

更新数据也非常简单,我们同样需要指定数据的 id 和内容,调用 update 方法即可,代码如下:

from elasticsearch import Elasticsearch

es = Elasticsearch()
data = {
    'title': '乘风破浪不负韶华 青春逐梦高考',
    'url': 'http://view.inews.qq.com/a/EDU2021041600732200',
    'date': '2021-07-05'
}
result = es.update(index='news', body=data, id=1)
print(result)

这里我们为数据增加了一个日期字段,然后调用了 update 方法,结果如下:

{'_index': 'news', '_type': '_doc', '_id': '1', '_version': 2, 'result': 'updated', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}

可以看到,返回结果中的 result 字段为 updated,表示更新成功。另外,我们还注意到一个字段 _version,这代表更新后的版本号,其后数字 2 代表这是第二个版本。因为之前已经插入过一次数据,所以第一次插入的数据是版本 1,可以参见上例的运行结果,这次更新之后版本号就变成了 2,以后每更新一次,版本号都会加 1。

另外,利用 index 方法同样可以完成更新操作,其写法如下:

es.index(index='news', doc_type='politics', body=data, id=1)

可以看到,index 方法能够代替我们完成插入数据和更新数据两个操作。如果数据不存在,就执行插入操作,如果已经存在,则执行更新操作,非常方便。

删除数据

如果想删除一条数据,那么调用 delete 方法并指定需要删除的数据 id 即可。其写法如下:

from elasticsearch import Elasticsearch

es = Elasticsearch()
result = es.delete(index='news', id=1)
print(result)

运行结果如下:

{'_index': 'news', '_type': '_doc', '_id': '1', '_version': 2, 'result': 'deleted', '_shards': {'total': 2,'successful': 1, 'failed': 0}, '_seq_no': 3, '_primary_term': 1}

可以看到,运行结果中的 result 字段为 deleted,代表删除成功; _version 变成了 3,又增加了 1。

查询数据

上面的几个操作是非常简单的,普通的数据库如 MongoDB 就可以完成,看起来并没有什么了 不起。Elasticsearch 更特殊的地方在于其异常强大的检索功能。

对于中文来说,我们需要安装一个分词插件,这里使用的是 elasticsearch-analysis-ik。我们用 Elasticsearch 的另一个命令行工具 elasticsearch-plugin 来安装这个插件,这里安装的版本是 7.13.2,请确保和 Elasticsearch 的版本对应起来,命令行如下:

elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.13.2/elasticsearch-analysis-ik-7.13.2.zip

请把这里的版本号替换成你的 Elasticsearch 版本号。

安装之后,重新启动 Elasticsearch 就可以了,它会自动加载安装好的插件。

首先,我们重新新建一个索引并指定需要分词的字段,相应代码如下:

from elasticsearch import Elasticsearch

es = Elasticsearch()
mapping = {
    'properties': {
        'title': {
            'type': 'text',
            'analyzer': 'ik_max_word',
            'search_analyzer': 'ik_max_word'
        }
    }
}
es.indices.delete(index='news', ignore=[400, 404])
es.indices.create(index='news', ignore=400)
result = es.indices.put_mapping(index='news', body=mapping)
print(result)

这里我们先将之前的索引删除,然后新建了一个索引,接着更新了它的 mapping 信息。mapping 信息中指定了分词的字段,包括字段的类型 type、分词器 analyzer 和搜索分词器 search_analyzer。指定搜索分词器 search_analyzerik_max_word 表示使用我们刚才安装的中文分词插件,如果不指定,则会使用默认的英文分词器。

接下来,我们插入几条新数据:

from elasticsearch import Elasticsearch

es = Elasticsearch()
datas = [
    {
        'title': '高考结局大不同',
'url': 'https://k.sina.com.cn/article_7571064628_1c34547340010111z9.html',
 },
 {
 'title': '进入职业大洗牌时代,“吃香”职业还吃香吗?',
 'url': 'https://new.qq.com/omn/20210828/20210828A025LK00.html',
 },
 {
 'title': '乘风破浪不负韶华,奋斗青春圆梦高考',
 'url': 'http://view.inews.qq.com/a/EDU2021041600732200',
 },
 {
 'title': '他,活出了我们理想的样子',
 'url': 'https://new.qq.com/omn/20210821/20210821A020ID00.html',
 }
]

for data in datas:
 es.index(index='news', body=data)

这里我们指定了 4 条数据,它们都带有 titleurl 字段,然后通过 index 方法将它们插人 Elasticsearch 中,索引名称为 news

接下来,我们根据关键词查询一下相关内容:

result = es.search(index='news')
print(result)

运行结果如下:

{'took': 11, 'timed_out': False, '_shards': {'total': 2, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 4, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'news', '_type': '_doc', '_id' : 'jebpkHsBm-BANY-7h0Yp', '_score': 1.0, '_source': {'title': '高考结局大不同', 'url': 'https://k.sina.com.cn/article_7571064628_1c34547340010111z9.html'}}, {'_index': 'news', '_type': '_doc', '_id': 'jubpkHsBm-BANY-7h0bz', '_score': 1.0, '_source': {'title': '进入职业大洗牌时代', '吃香”职业还吃香吗?', 'url': 'https://new.qq.com/omn/20210828/20210828A025LK00.html'}}, {'_index': 'news', '_type': '_doc', '_id': 'j-bpkHsBm-BANY-7heZN', '_score': 1.0, '_source': {'title': '乘风破浪不负韶华,奋斗青春圆梦高考', 'url': 'http://view.inews.qq.com/a/EDU2021041600732200'}}, {'_index': 'news', '_type': '_doc', '_id': 'kObpkHsBm-BANY-7hean', '_score':1.0, '_source': {'title': '他,活出了我们理想的样子', 'url': 'https://new.qq.com/omn/20210821/20210821A020ID00.html'}}]}}

可以看到,这里查询出了插入的 4 条数据。它们出现在 hits 字段里面,其中 total 字段标明了查询的结果条目数, max_score 代表了最大匹配分数。

另外,我们还可以进行全文检索,这才是体现 Elasticsearch 搜索引擎特性地方:

from elasticsearch import Elasticsearch
import json

dsl = {
    'query': {
        'match': {
            'title': '高考 圆梦'
        }
    }
}

es = Elasticsearch()
result = es.search(index='news', body=dsl)
print(result)

这里我们使用 Elasticsearch 支持的 DSL 语句进行查询,使用 match 指定全文检索,检索的字段是 title,内容是 “高考圆梦”,搜索结果如下:

{'took': 6, 'timed_out': False, '_shards': {'total': 2, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits':
{'total': {'value': 1, 'relation': 'eq'}, 'max_score': 1.7796917, 'hits': [{'_index': 'news', '_type': '_doc', '_id'
: 'j-bpkHsBm-BANY-7heZN', '_score': 1.7796917, '_source': {'title': '乘风破浪不负韶华,奋斗青春圆梦高考', 'url': 'http://view.inews.qq.com/a/EDU2021041600732200'}}, {'_index': 'news', '_type': '_doc', '_id': 'jebpkHsBm-BANY-7h0Yp', '_score': 0.81085134, '_source': {'title': '高考结局大不同', 'url':
'https://k.sina.com.cn/article_7571064628_1c34547340010111z9.html'}}]}}

从结果可以看到,匹配的结果有两条,第一条的分数为 1.7796917,第二条的分数为 0.81085134,这是因为第一条匹配的数据中含有 “高考” 和 “圆梦” 两个词,第二条匹配的数据中不包含 “圆梦”,但是包含 “高考” 这个词,所以也被检索出来了,只是分数比较低。

因此可以看出,检索时会对对应的字段进行全文检索,结果还会按照检索关键词的相关性进行排序,这就是一个基本的搜索引擎雍形。

另外,Elasticsearch 还支持非常多的查询方式。这里就不再一一展开描述了,总之其功能非常强大,详情可以参考官方文档: https://www.elastic.co/guide/en/elasticsearch/reference/master/auery-dsl.html

总结

以上便是对 Elasticsearch 的基本介绍以及使用 Python 操作 Elasticsearch 的基本用法,但这些仅仅是 Elasticsearch 的基本功能,它还有更多强大的功能等待着我们去探索。