logstash 与ElasticSearch:从CSV文件

logstash 与ElasticSearch:从CSV文件到搜索宝库的导入指南

使用 logstash 导入数据到 ES 时,由三个步骤组成:input、filter、output。整个导入过程可视为:unix 管道操作,而管道中的每一步操作都是由 "插件" 实现的。使用 ./bin/logstash-plugin list 查看 logstash 已安装的插件。

每个插件的选项都可以在官网查询,先明确是哪一步操作,然后去官方文档看是否有相应的插件是否支持这种操作。比如 output 配置选项:plugins-outputs-elasticsearch-options),其中的 doc_id 选项就支持 指定 docid 写入 ES。在这里,简要说明一些常用的插件,要想了解它们实现的功能可参考官方文档。

  1. mutate 插件 用于字段文本内容处理,比如 字符替换
  2. csv 插件 用于 csv 格式文件导入 ES
  3. convert 插件 用于字段类型转换
  4. date 插件 用于日期类型的字段处理

使用 logstash 导入时,默认以 "message" 标识 每一行数据,并且会生成一些额外的字段,比如 @version、host、@timestamp,如果用不着,这些字段可以去除掉 ,此外,要注意 ES 中的索引的格式 (Mapping 结构),最好是指定自定义的索引模板,保证索引最 "精简"。

另外这里记录一些常用的参数及其作用,更具体的解释可查看官方文档。

  1. sincedb_path 告诉 logstash 记录文件已经处理到哪一行了,从而当 logstash 发生故障重启时,可从故障点处开始导入,避免从头重新导入。
  2. remove_field 删除某些字段

配置文件完成后,执行以下命令./bin/logstash -f csvfile_logstash.conf 即可启动 logstash 执行导入操作。

以下是各种错误解决:
错误一:

ConfigurationError”, :message=>”Expected one of #, input, filter, output at line 1, column 1

如果 配置文件内容是正确的,用 Notepad++ 检查一下文件的编码,确保是:UTF-8 无 BOM 格式编码

解决 SOH 分隔符问题

由于 csv 插件的 separator 选项不支持转义字符,因此无法用\u0001来代表 SOH。如果 csv 文件以 SOH 分隔符 (\u0001) 分割,一种方案是使用 mutate 插件替换,将\u0001替换成逗号。如下所示:

    mutate{
		# 每一行内容默认是message, 将分隔符 \u0001 替换成 逗号
		gsub => [ "message","\u0001","," ]
		# @timestamp 字段是默认生成的, 名称修改成 created
		rename => ["@timestamp", "created"]
    }


但是实际上 logstash6.8.3 是支持按 SOH 分割的。在 Linux shell 下,先按 ctrl+v,再按 ctrl+a,输入的就是 SOH。那么在 vim 中打开配置文件,在 vim 的 insert 模式下,先按 ctrl+v,再按 ctrl+a,将 SOH 作为 csv 插件的 separator 分割符。

    csv {
			# 每行按逗号分割, 生成2个字段: topsid 和 title, (如果分割超过2列了,第三列则以 column3 命名)
            separator => ""
            columns => ["topsid", "title"]
			# 删除一些不需要索引到ES中去的字段(logstash默认生成的一些字段)
			remove_field => ["host", "@timestamp", "@version", "message","path"]
        }	


一个将 csv 文件内容导入 ES 的示例配置模板如下:(csv 文件中的每一行以 SOH 作为分割符)

  • logstash input 插件支持多种数据来源,比如 kafka、beats、http、file 等。在这里我们的数据来源是文件,因此采用了 logstash input file 插件。
  • 把数据从文件中读到 logstash 后,可能需要对文件内容 / 格式 进行处理,比如分割、类型转换、日期处理等,这由 logstash filter 插件实现。在这里我们进行了文件的切割和类型转换,因此使用的是 logstash filter csv 插件和 mutate 插件。
  • 处理成我们想要的字段后,接下来就是导入到 ES,那么就需要配置 ES 的地址、索引名称、Mapping 结构信息 (使用指定模板写入),这由 logstash output 插件实现,在这里我们把处理后的数据导入 ES,因此使用的是 logstash output elasticsearch 插件。
input {
  file {
      path => "/data/psj/test/*.csv"
      start_position => "beginning"
	  sincedb_path => "/dev/null"
    }
}

filter {
    csv {
			# 每行按逗号分割, 生成2个字段: topsid 和 title, (如果分割超过2列了,第三列则以 column3 命名)
            separator => ""
            columns => ["topsid", "title"]
			# 删除一些不需要索引到ES中去的字段(logstash默认生成的一些字段)
			remove_field => ["host", "@timestamp", "@version", "message","path"]

        }			
	mutate {
    convert => {
		# 类型转换
		"topsid" => "integer"
		"title" => "string"
    }
  }
}

output {
   elasticsearch {
        hosts => "http://http://127.0.0.1:9200"
        index => "chantitletest"
    	# 指定 文档的 类型为 "_doc"
		document_type => "_doc"
		# 指定doc id 为topsid字段的值
		document_id => "%{topsid}"
		manage_template => true
		# 使用自定义的模板写入,否则将会以logstash默认模板写入
		template => "/data/services/logstash-6.8.3/config/chantitletpe.json"
		template_overwrite => true
		template_name => "chantitletpe"
       }
    stdout{
		codec => json_lines
	}
}



(也可以采用 logstash filter 插件的 mutate 选项 将 SOH 转换成逗号):

filter {
    mutate{
		# 每一行内容默认是message, 将分隔符 \u0001 替换成 逗号
		gsub => [ "message","\u0001","," ]
		# @timestamp 字段是默认生成的, 名称修改成 created
		rename => ["@timestamp", "created"]
    }
    csv {
	    # 每行按逗号分割, 生成2个字段: topsid 和 title, (如果分割超过2列了,第三列则以 column3 命名)
            separator => ","
            columns => ["topsid", "title"]
			# 删除一些不需要索引到ES中去的字段(logstash默认生成的一些字段)
			remove_field => ["host", "@timestamp", "@version", "message","path"]
        }			
	mutate {
    convert => {
		# 类型转换
		"topsid" => "integer"
		"title" => "string"
    }
  }
}


使用的自定义模板如下:

{
  "index_patterns": [
    "chantitle_v1",
    "chantitletest"
  ],
  "settings": {
    "number_of_shards": 3,
    "analysis": {
      "analyzer": {
        "my_hanlp_analyzer": {
          "tokenizer": "my_hanlp"
        },
        "pinyin_analyzer": {
          "tokenizer": "my_pinyin"
        }
      },
      "tokenizer": {
        "my_hanlp": {
          "enable_normalization": "true",
          "type": "hanlp_standard"
        },
        "my_pinyin": {
          "keep_joined_full_pinyin": "true",
          "lowercase": "true",
          "keep_original": "true",
          "remove_duplicated_term": "true",
          "keep_first_letter": "false",
          "keep_separate_first_letter": "false",
          "type": "pinyin",
          "limit_first_letter_length": "16",
          "keep_full_pinyin": "true"
        }
      }
    }
  },
  "mappings": {
    "_doc": {
      "properties": {
        "created": {
          "type": "date",
          "doc_values": false,
          "format": "yyyy-MM-dd HH:mm:ss"
        },
        "title": {
          "type": "text",
          "fields": {
            "pinyin": {
              "type": "text",
              "boost": 10,
              "analyzer": "pinyin_analyzer"
            },
            "raw": {
              "type": "keyword",
              "doc_values": false
            }
          },
          "analyzer": "my_hanlp_analyzer"
        },
        "topsid": {
          "type": "long",
          "doc_values": false
        }
      }
    }
  }
}


上面给了一个 csv 文件导入 ES,这里再给个 txt 文件导入 ES 吧。txt 以逗号分割,每列的内容都在冒号里面,只需要前 4 列内容,一行示例数据如下:

"12345","12345","研讨区","12345","500","xxxx","2008-08-04 22:20:24","0","300","0","5","0","","0","0","","","0","0"

这里采用的是 logstash filter 的 dissect 插件。相比于 grok 插件,它的优点不是采用正规匹配的方式解析数据,速度较快,但不能解析复杂数据。只能够对较为规律的数据进行导入。logstash 配置文件如下:

input {
  file {
      path => "/data/psj/test/*.txt"
      start_position => "beginning"
      # sincedb_path => "/dev/null"
    }
}

filter {
  dissect {
      mapping => {
		# 插件输入的每一行数据默认名称是message,由于每列数据在双引号里面,因此解析前4列数据的写法如下:
        "message" => '"%{topsid}","%{subsid}","%{subtitle}","%{pid}"'
      }
	  # 删除自动生成的、用不着的一些字段
	  remove_field => ["host", "@timestamp", "@version", "message","path"]
	  convert_datatype => {
		# 类型转换
		"topsid" => "int"
		"subsid" => "int"
		"pid" => "int"
    }
    }
}

output {
   elasticsearch {
        hosts => "http://127.0.0.1:9200"
        index => "chansubtitletest"
		document_type => "_doc"
		# 指定doc id 为topsid字段的值
		document_id => "%{subsid}"
		manage_template => true
		# 使用自定义的模板写入,否则将会以logstash默认模板写入
		template => "/data/services/logstash-6.8.3/config/chansubtitle.json"
		template_overwrite => true
		template_name => "chansubtitle"
       }
    stdout{
		codec => json_lines
	}
}


更多优质内容请关注:汀丶人工智能;会提供一些相关的资源和优质文章,免费获取阅读。

#elasticsearch#
搜索推荐系统 文章被收录于专栏

搜索推荐系统

全部评论

相关推荐

04-26 14:36
已编辑
郑州信息科技职业学院 Java
由于高考成绩不是很理想,听取了张雪峰老师的建议,优先选了专业并且当时的想法就是选一个能赚钱的专业,于是最终选择了报了一个能收留我的有计算机专业的学校。当时听张雪峰老师说河南的学习氛围很好,所以就想去体验一下,事实雀食如张雪峰老师所说,大家都一股脑的铺在学习这条路上。可能是因为那边氛围导致的吧,我一开始想的也是卷学习卷绩点,所以大一的时候就一直在学习硬试教育的一些东西,学期结束了,排名出来的时候中上水平吧,据我了解保研的只有前5名可能会有机会,当时的心里就想着,我这成绩再卷也卷不到哪去了,并且保研也无望了,总结的说,一些事情只有真正做了才知道是不是自己所追求的。说了很多废话吧,剩下的关于学校的就长话短说了吧。大二很多专业课基本上要从早八上到晚上,但基本上我都是不去,不如自学现在新媒体技术这么发达,并且还可以学一下自己需要的技术栈,由于学校的课程原因对其他的技术栈不是很了解,所以,一心就投入在Java这个方向了,但是,Python也会学一下,这是因为加入实验室,实验室老师是做人工智能方向的缘故。现在回想,我大二当时还是学的太慢了,还有就是信息差太大了,出来工作之后才发现有些佬们已经大二就出来实习,并且八股就背的滚瓜烂熟了。只能说这里的学习氛围很好吧,走廊里都是背书刷题的声音,跟身边的同学和实验室的同学谈是否直接就业的事,他们要么都是说考研,要么对直接就业很含糊,可能是因为觉得自己学的还不够吧,我想说,学的不够就干中学呗,反正,我先迈出去这步再说。到了大三上还是没有找工作的打算,因为身边的人也都还没有这个意识吧,现在跟了身边的同事聊天才知道,我的信息差太大了。到了大三下刚开始,我才开始正式的踏上求职路,当时的信息差还是很大的,根本就不敢碰瓷大厂,想着有一个公司能要再说吧,并且地域也限制的很死,只想着在本地找一下,因为怕学校找事(我想这是学校一贯操作了),在本地吧,他们大多数都是接受的线下面,一开始面了一个,可能自己比较摆也很悲观,就显得我很差吧,hr面完就没后续了,最终终于有一个面,并且也展示出自己的自信和对专业的理解了,最后,我也没想着这么多背调公司呀,当个备选什么的就直接去了。也算是我的第一家正式的公司吧(之前都是线上的码农兼职),干多了就发现,这个公司压根学不到东西,并且薪资低的,因为我是第一个进来的计算机实习生,有一个同事干了两三年的吧,带着我做的时候是真能学到东西,但是,最后那个同事离职了,我就只能和学艺术的老板直接汇报项目进度,一个学艺术的来指导我这个科班出身的就很离谱的好吧。最后,我也离职了,也跟前同事聊了很久,她说我是她见过大三就能学到这程度,已经超过很多人了,并且她当时在的时候还说我是内定能转正的。并且还说我真的可以去考研。我也仔细思考了一下,我决定让自己沉淀一下再出发吧,先备考了软件设计师,然后期末考,大三暑期的时候就充实自己的简历,并且也认识了一个某东的老哥,也用了内推码,教我了怎么写好简历量化成果之类的,总之,很感谢一路走来帮助我的人吧,并且我在边充实自己的同时也在边投递简历,但当时卡的也很死,要选base地在河南附近的,不像现在全国可飞。面了很多base地在学校附近的,然后,还有一个北京的py和杭州的java,最终就这两个地方给了offer,但是都是实习转正的,不是秋招offer,因为觉得Java的太卷了,然后,面试的时候也会感觉压力很大,所以就把杭州的那个拒了,去了北京的,北京是免费住的房子(三个月这是伏笔),当时觉得环境很好,但是合租室友的作息跟自己的作息不一样就很不习惯,于是,我就想着要是三个月后我一定要找一个单间的哪怕破一点。北京这个公司吧就很像国企的感觉,早九晚五,当月发当月工资,并且干的活接触的数据量都不是很大,就是干了很多杂活,并且mentor和部门的领导都不是技术出身,所以,我能学到的东西少之又少,但是吧,学习是自己的事,而且这部门不是很忙对于实习生来说,我完全可以学自己的东西(前提是不被发现)。到最后这个部门的氛围就很微妙,我遇到不会的问他们我应该怎么做的时候,他们说让我自己想,我当时就想说,神人一个,啥都不说让我自己干,干出来又不满意,你说你让我干py的东西你不会我就不说啥了,让我干无关代码的东西,让我调研项目应该做些什么内容,现在回想都是泪呀,我就这样被欺压的过完了三个月,最后免费住的地方也到期了,伏笔来了,最后,找我谈话说你技术可以了能看出来,因为你也自己独立完成了消息通知那一块内容嘛,但是,由于我们部门干的活比较杂并且我也缺少一些电力相关的一些知识,所以,觉得不合适。(OS:其实我对每一份工作都是真心换真心的,并且这些电力知识我也知道我有一点欠缺所以我也有自己再学习,你们啥也不教我,最后把屎盆子把我头上扣)最后,回到了学校,心态也发生了变化,想着做啥都不如找一个稳定的工作重要,想着回家沉淀吧,少年终有出头日。但是,计划赶不上变化,之前那个同事,内推了我去她现在的公司,并且是做AI应用的也是我想接触的,并且还是与我上家的业务场景类似的,真的感谢那个同事,俗话说:千里马常有而伯乐不常有。并且那里的部门领导也很好,并且说我虽然不是电力相关出身的,但是能做的这样已经很不错了,所以DDDD,由于各种不可抗力因素吧,还是想找一个离家近,然后不是很像小作坊的感觉(这个公司虽然比较小,但是比之前那个大的公司的氛围和待遇一点都不差的好吧甚至更好)。最终,在学校也呆了一个月吧,也陆陆续续面了一个月有一个C厂的面答的都挺好直接就谈薪了,但是风评不好还是保命要紧,还有各种的中小厂面吧,但感觉都不是自己想要的,只是想刷刷面试经验吧(这是某东哥告诉我的,与其一直改简历不如去多面)。最后,在校期间面了一个比较合适的某鸦智能,一直推进到了HR面,但是最后被横向了,开始复盘,被横向了属实是没招了,经历了这么多大风大浪什么场面没见过。过年期间,求职路线关闭,把自己缺少的技术栈和简历中的项目业务理清楚说明白。年过完就要开始加入找工作大军中了,把节前没面完的先面了,节后一开始就是某鸟的HRG面,聊的就很憋屈的感觉,问我技术方面的,说我说的很像AI的(我心想跟你说具体的细节你又说我不想听技术的,说的比较宽泛浅显说我AI)。最后,反正体验感不是很好的结束了吧。说一个星期等通知,等了两个星期才说是通过的(我认为是排名靠前的那些人没去,顺位到我了)。那你既然这样说了,那我就接受吧。还没入职就问我要身份证信息要这要那的,最后都给过去了,说HC调整,要重新review,又又又一次被恶心到了。后面就是陆续的沉淀面试等,我当时的重心已经完全的想着私企没人要,就去试试考公和考央国企了,毕竟我的履历不看学历的话放到电网当中还是可以的。私企的话有一个外企洋里洋气的说话,问我怎么口语这么好?我说这叫智取,宝贝。虽然这个tek外企过了,但是还有一个openday要去线下,来回的衣食住行不是很方便也不是很想去所以就拒绝了没去。后来就收到了,国网网申通过的通知,说实话,我之前问了很多我们学校历年有没有考央国企之类的案例,很显然都不知道,也可以说少之又少吧,于是我就奔赴京城进京赶考,唉,时间不太合适就想着算了吧,再等等,好事多磨,宁缺毋滥吧。金三银四终于等来了面试的机会,这个岗位我只能说我不是很熟悉,但是语言这东西吧都是相通的,重要的是我要把其中的内核搞懂,梳理清楚业务逻辑。最终,来到了这家公司,目前来说是我遇到过最好的了,能有hc且不是要通过实习评估的那种,并且合同期限是三年的,并且是12%的公积金。我认为这就是我所遇到的最好的了。希望能真心换真心吧,不再把我当创口贴/路边一条了,并且也遇到了很多优秀的同事。总的来说,就是要是能重来我要选李白。我肯定会打破这些信息差,后悔知道的太晚,并且跟优秀的人聊天说话真的可以学到很多东西,之前上文提到的贵人就不说了,说说最近的,他是跟我一届,学校后缀甚至不如我的后缀,但是真正了解的才会知道真是佬👍,他跟我找工作的时间线差不多,但是他在中大厂甚至大厂都呆过,因为跟他聊了才知道我当时的信息差有多大,并且毅力也是我甚至…都没有的。并且也听说了他们学校找工作的氛围很好,不像我阿巴阿巴阿巴,只有考研等相关的一些。并且说的一些观点都是很认同的。总之,希望能在这好好的吧,我真的不想经历大起大落了。经历了,打招呼挂,简历挂,一面挂,HR面挂,offer挂的,现在的心态已经放宽了很多了,但是难过还是有的,希望这家公司诚不欺我吧。也祝大家遇到自己的梦中情厂
选择和努力,哪个更重要?
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务