使用Docker和Elasticsearch搭建全文本搜索引擎应用(中)

喜欢记得转发关注

3. 接入Elasticsearch

第一步是要接入本地Elasticsearch实例。

3.0 加入ES链接模块

在server/connection.js中加入如下初始化代码:

const elasticsearch = require('elasticsearch')// Core ES variables for this projectconst index = 'library'const type = 'novel'const port = 9200const host = process.env.ES_HOST || 'localhost'const client = new elasticsearch.Client({ host: { host, port } })/** Check the ES connection status */async function checkConnection () {let isConnected = falsewhile (!isConnected) {console.log('Connecting to ES')try { const health = await client.cluster.health({}) console.log(health) isConnected = true} catch (err) { console.log('Connection Failed, Retrying...', err)}}}checkConnection()

下面用docker-compose来重建更改过的应用。之后运行docker-compose up -d重新启动后台进程。

应用启动后,命令行运行docker exec gs-api "node" "server/connection.js",在容器中运行脚本,应该可以看到如下输出:

{ cluster_name: 'docker-cluster',status: 'yellow',timed_out: false,number_of_nodes: 1,number_of_data_nodes: 1,active_primary_shards: 1,active_shards: 1,relocating_shards: 0,initializing_shards: 0,unassigned_shards: 1,delayed_unassigned_shards: 0,number_of_pending_tasks: 0,number_of_in_flight_fetch: 0,task_max_waiting_in_queue_millis: 0,active_shards_percent_as_number: 50 }

如果一切顺利,就可以把最后一行的checkConnection()调用删掉,因为最终应用会从connection模块之外调用它。

3.1 给Reset Index添加Helper功能

在server/connection.js文件checkConnection之下添加如下内容, 以便更加方便重置索引。

/** Clear the index, recreate it, and add mappings */async function resetIndex () {if (await client.indices.exists({ index })) {await client.indices.delete({ index })}await client.indices.create({ index })await putBookMapping()}

3.2 添加Book Schema

紧接resetIndex之后,添加如下功能:

/** Add book section schema mapping to ES */async function putBookMapping () {const schema = {title: { type: 'keyword' },author: { type: 'keyword' },location: { type: 'integer' },text: { type: 'text' }}return client.indices.putMapping({ index, type, body: { properties: schema } })}

此处为书目索引定义了mapping(映射)。Elasticsearch索引类似于SQL的表或者MongoDB的connection。通过mapping我们可以定义文档每个域和数据类型。Elasticsearch是schema-less,因此技术上说不需要添加mapping,但是通过mapping可以更好控制数据处理方式。

例如,有两个关键词域,分别是“titile”和“author”,文本定为“text”域。这样定义搜索引擎会有完全不同的动作:搜索中,引擎会在text域中查找可能匹配项,而在关键词域则是精确匹配。看起来差别不大,但却对搜索行为和搜索速度有很大影响。

在文件最后输出功能和属性,可以被其它模块访问。

module.exports = {client, index, type, checkConnection, resetIndex}

4. 加载源数据

本文使用从Gutenberg项目(一个在线提供免费电子书的应用)提供的数据。包括100本经典书目,例如《80天环绕地球》、《罗密欧与朱丽叶》以及《奥德赛》等。

4.1 下载书籍数据

本文的数据可以从以下网站下载:

https://cdn.patricktriest.com/data/books.zip,之后解压到项目根目录下的books/ 子目录下。

也可以用命令行实现以上操作:

wget https://cdn.patricktriest.com/data/books.zipunar books.zip

4.2 预览书籍

打开一本书,例如219-0.txt。书籍以公开访问license开始,跟着是书名、作者、发行日期、语言以及字符编码。

Title: Heart of DarknessAuthor: Joseph ConradRelease Date: February 1995 [EBook #219]Last Updated: September 7, 2016Language: EnglishCharacter set encoding: UTF-8

随后是声明信息:* * * START OF THIS PROJECT GUTENBERG EBOOK HEART OF DARKNESS * * *,紧接着就是书的实际内容。

书的最后会发现书籍结束声明: * * * END OF THIS PROJECT GUTENBERG EBOOK HEART OF DARKNESS * * *,紧跟着是更加详细的书籍license。

下一步将用编程方法从书中提取元数据,并且从* * *之间将书籍内容抽取出来。

4.3 读取数据目录

本节写一段脚本读取书籍内容添加到Elasticsearch中,脚本存放在server/load_data.js 中。

首先,获得books目录下所有文件列表。

const fs = require('fs')const path = require('path')const esConnection = require('./connection')/** Clear ES index, parse and index all files from the books directory */async function readAndInsertBooks () {try {// Clear previous ES indexawait esConnection.resetIndex()// Read books directorylet files = fs.readdirSync('./books').filter(file => file.slice(-4) === '.txt')console.log(`Found ${files.length} Files`)// Read each book file, and index each paragraph in elasticsearchfor (let file of files) { console.log(`Reading File - ${file}`) const filePath = path.join('./books', file) const { title, author, paragraphs } = parseBookFile(filePath) await insertBookData(title, author, paragraphs)}} catch (err) {console.error(err)}}readAndInsertBooks()

运行docker-compose -d --build重建镜像更新应用。

运行docker exec gs-api "node" "server/load_data.js"调用包含load_data脚本应用,应该看到Elasticsearch输出如下。随后,脚本会因为错误退出,原因是调用了一本目前还不存在的helper函数(parseBookFile)。

4.4 读取数据文件

创建server/load_data.js文件,读取每本书元数据和内容:

/** Read an individual book text file, and extract the title, author, and paragraphs */function parseBookFile (filePath) {// Read text fileconst book = fs.readFileSync(filePath, 'utf8')// Find book title and authorconst title = book.match(/^Title:\s(.+)$/m)[1]const authorMatch = book.match(/^Author:\s(.+)$/m)const author = (!authorMatch || authorMatch[1].trim() === '') ? 'Unknown Author' : authorMatch[1]console.log(`Reading Book - ${title} By ${author}`)// Find Guttenberg metadata header and footerconst startOfBookMatch = book.match(/^\*{3}\s*START OF (THIS|THE) PROJECT GUTENBERG EBOOK.+\*{3}$/m)const startOfBookIndex = startOfBookMatch.index + startOfBookMatch[0].lengthconst endOfBookIndex = book.match(/^\*{3}\s*END OF (THIS|THE) PROJECT GUTENBERG EBOOK.+\*{3}$/m).index// Clean book text and split into array of paragraphsconst paragraphs = book.slice(startOfBookIndex, endOfBookIndex) // Remove Guttenberg header and footer.split(/\n\s+\n/g) // Split each paragraph into it's own array entry.map(line => line.replace(/\r\n/g, ' ').trim()) // Remove paragraph line breaks and whitespace.map(line => line.replace(/_/g, '')) // Guttenberg uses "_" to signify italics. We'll remove it, since it makes the raw text look messy..filter((line) => (line && line.length !== '')) // Remove empty linesconsole.log(`Parsed ${paragraphs.length} Paragraphs\n`)return { title, author, paragraphs }}

此函数执行以下功能:

  1. 从文件系统中读入文件

  2. 使用正则表达式抽取书名和作者

  3. 通过定位* * *,来抽取书中内容

  4. 解析出段落

  5. 清洗数据,移除空行

最后返回一个包含书名、作者和段落列表的对象。

运行docker-compose up -d --build和docker exec gs-api "node" "server/load_data.js" ,输出如下:

到这步,脚本顺利分理出书名和作者,脚本还会因为同样问题出错(调用还未定义的函数)。

4.5 在ES中索引数据文件

最后一步在load_data.js中添加insertBookData函数,将上一节中提取数据插入Elasticsearch索引中。

/** Bulk index the book data in Elasticsearch */async function insertBookData (title, author, paragraphs) {let bulkOps = [] // Array to store bulk operations// Add an index operation for each section in the bookfor (let i = 0; i < paragraphs.length; i++) {// Describe actionbulkOps.push({ index: { _index: esConnection.index, _type: esConnection.type } })// Add documentbulkOps.push({ author, title, location: i, text: paragraphs[i]})if (i > 0 && i % 500 === 0) { // Do bulk insert in 500 paragraph batches await esConnection.client.bulk({ body: bulkOps }) bulkOps = [] console.log(`Indexed Paragraphs ${i - 499} - ${i}`)}}// Insert remainder of bulk ops arrayawait esConnection.client.bulk({ body: bulkOps })console.log(`Indexed Paragraphs ${paragraphs.length - (bulkOps.length / 2)} - ${paragraphs.length}\n\n\n`)}

此函数索引书籍段落,包括作者、书名和段落元数据信息。使用bulk操作插入段落,比分别索引段落效率高很多。

批量bulk索引这些段落可以使本应用运行在低配电脑上(我只有1.7G内存),如果你有高配电脑(大于4G内容),也许不用考虑批量bulk操作。

运行docker-compose up -d --build 和 docker exec gs-api "node" "server/load_data.js" 输出如下:

相关资讯: