Exporting data from MySQL to Elasticsearch with Logstash

In my day-to-day work I occasionally need to sync MySQL tables into Elasticsearch. Writing code for this works, but it is tedious, and while GitHub hosts many "elasticsearch mysql river" projects, their implementations all differ and none are guaranteed to be bug-free. Since ELK is widely adopted anyway, my solution is to do the sync with Logstash's jdbc input plugin.
The MySQL table used in this post:

CREATE TABLE `test` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(32) NOT NULL,
  `password` varchar(32) NOT NULL,
  `update_time` datetime DEFAULT '2017-05-16 00:00:00',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8;

The Logstash configuration, test-logstash.conf:

input {
  jdbc {
    jdbc_driver_library => "/Users/rollenholt/Downloads/mysql-connector-java-5.1.42/mysql-connector-java-5.1.42-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
    jdbc_user => "root"
    jdbc_password => ""
    statement => "select id, name, password, update_time from test where update_time > convert_tz(:sql_last_value, '+00:00','+08:00')"
  }
}
output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        "hosts" => "localhost:9200"
        "index" => "myindex"
        "document_type" => "test"
        "document_id" => "%{id}"
        "user" => "elastic"
        "password" => "changeme"
    }
}

Then run:

./logstash-5.4.0/bin/logstash -f test-logstash.conf
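Note that with the config above, Logstash runs the query once and then exits. The jdbc input also accepts a `schedule` option (cron-like syntax) to poll the table periodically instead; a sketch extending the input block above:

```
input {
  jdbc {
    # ... same jdbc_* settings as above ...
    schedule => "* * * * *"   # run the statement every minute
  }
}
```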

Specifying the ES document id

When syncing a MySQL table into ES, you should normally use the MySQL primary key as the ES document id; otherwise, re-running the sync produces duplicate documents.
Mapping the MySQL primary key to the ES document id is done by this line:

 "document_id" => "%{id}"

Here %{id} sets the MySQL primary key id as the id of the ES document.
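Why this makes re-syncs idempotent can be sketched in a few lines (plain Python, not Logstash code; the dict stands in for an ES index mapping document id to document body):

```python
# Stand-in for an Elasticsearch index: document id -> document body.
index = {}

def sync(rows):
    for row in rows:
        # With "document_id" => "%{id}", the same MySQL row always maps to
        # the same ES document, so a re-sync overwrites instead of duplicating.
        index[row["id"]] = row

rows = [{"id": 12, "name": "name12", "password": "password12"}]
sync(rows)
sync(rows)  # running the sync twice does not create a duplicate
print(len(index))  # -> 1
```

Without a stable document id, ES would auto-generate a new id on every run, and each re-sync would add another copy of every row.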

Incremental sync from MySQL to ES

As of this writing, Logstash's jdbc plugin only supports incremental sync of inserts and updates; it cannot incrementally sync deletes. This is a consequence of how the plugin's sync works.
The plugin performs incremental sync by comparing a column of the MySQL table against sql_last_value. Typically this is a last-updated-time column, such as update_time here, so whenever you insert or update a row, never forget to also update that last-updated-time column.
Each time Logstash runs a sync, it records the time of the last run, in UTC, in ~/.logstash_jdbc_last_run:

~/workspace/es> cat ~/.logstash_jdbc_last_run
--- 2017-05-17 07:18:02.157000000 Z
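The mechanism can be sketched as follows (plain Python, not the plugin's actual code; the timestamps are taken from the log output below):

```python
from datetime import datetime

last_run = datetime(1970, 1, 1)  # initial sql_last_value

def poll(table, now):
    """Return rows updated since the previous poll; advance sql_last_value."""
    global last_run
    new_rows = [r for r in table if r["update_time"] > last_run]
    last_run = now  # Logstash persists this to ~/.logstash_jdbc_last_run
    return new_rows

table = [{"id": 12, "update_time": datetime(2017, 5, 17, 7, 17, 19)}]
first = poll(table, now=datetime(2017, 5, 17, 7, 17, 32))
second = poll(table, now=datetime(2017, 5, 17, 7, 18, 2))
print(len(first), len(second))  # -> 1 0
```

The second poll returns nothing because no row's update_time is newer than the recorded last run, which is exactly what the logs below show.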

So if you look closely at Logstash's log output:

~/workspace/es> ./logstash-5.4.0/bin/logstash -f test-logstash.conf
Sending Logstash's logs to /Users/rollenholt/workspace/es/logstash-5.4.0/logs which is now configured via log4j2.properties
[2017-05-17T15:17:31,446][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elastic:xxxxxx@localhost:9200/]}}
[2017-05-17T15:17:31,451][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://elastic:xxxxxx@localhost:9200/, :path=>"/"}
[2017-05-17T15:17:31,586][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<URI::HTTP:0x193960d2 URL:http://elastic:xxxxxx@localhost:9200/>}
[2017-05-17T15:17:31,588][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-05-17T15:17:31,632][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword"}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-05-17T15:17:31,642][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#<URI::Generic:0x69465baa URL://localhost:9200>]}
[2017-05-17T15:17:31,648][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
Wed May 17 15:17:32 CST 2017 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
[2017-05-17T15:17:32,444][INFO ][logstash.pipeline        ] Pipeline main started
[2017-05-17T15:17:32,464][INFO ][logstash.inputs.jdbc     ] (0.015000s) SELECT version() AS `v` LIMIT 1
[2017-05-17T15:17:32,510][INFO ][logstash.inputs.jdbc     ] (0.007000s) select id, name, password, update_time from test where update_time > convert_tz('2017-05-17 07:15:25', '+00:00','+08:00')
[2017-05-17T15:17:32,529][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
{
       "password" => "password12",
    "update_time" => 2017-05-17T07:17:19.000Z,
     "@timestamp" => 2017-05-17T07:17:32.538Z,
           "name" => "name12",
       "@version" => "1",
             "id" => 12
}
[2017-05-17T15:17:35,461][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}

you will see this line:

select id, name, password, update_time from test where update_time > convert_tz('2017-05-17 07:15:25', '+00:00','+08:00')

Syncing deletes

As the previous section shows, if a row was deleted from the table after the last run, re-running the Logstash sync will not propagate the delete. The reason is simple: the sync compares last-updated times, and a physical delete just removes the row, so there is nothing left for the incremental query to match. Look at the log:
First delete a record:

mysql> delete from test where id = 12;
Query OK, 1 row affected (0.00 sec)

Then run the Logstash sync:

~/workspace/es> ./logstash-5.4.0/bin/logstash -f test-logstash.conf
Sending Logstash's logs to /Users/rollenholt/workspace/es/logstash-5.4.0/logs which is now configured via log4j2.properties
[2017-05-17T15:18:01,171][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elastic:xxxxxx@localhost:9200/]}}
[2017-05-17T15:18:01,176][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://elastic:xxxxxx@localhost:9200/, :path=>"/"}
[2017-05-17T15:18:01,344][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<URI::HTTP:0x4b6db194 URL:http://elastic:xxxxxx@localhost:9200/>}
[2017-05-17T15:18:01,345][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-05-17T15:18:01,391][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword"}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-05-17T15:18:01,403][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#<URI::Generic:0x3dcb341d URL://localhost:9200>]}
[2017-05-17T15:18:01,408][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
Wed May 17 15:18:01 CST 2017 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
[2017-05-17T15:18:02,155][INFO ][logstash.pipeline        ] Pipeline main started
[2017-05-17T15:18:02,173][INFO ][logstash.inputs.jdbc     ] (0.015000s) SELECT version() AS `v` LIMIT 1
[2017-05-17T15:18:02,188][INFO ][logstash.inputs.jdbc     ] (0.003000s) select id, name, password, update_time from test where update_time > convert_tz('2017-05-17 07:17:32', '+00:00','+08:00')
[2017-05-17T15:18:02,206][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-05-17T15:18:05,173][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}

and you will see that no records are matched.
The fix is simple: use logical (soft) deletes instead of physical deletes. For example, add an is_deleted column to the MySQL table to mark rows as deleted; when you set that flag, do not forget to also update the row's last-updated-time column.
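How a soft-delete flag lets the incremental sync propagate deletes can be sketched as follows (plain Python rather than a Logstash filter, and the is_deleted column is the assumed schema addition described above; the dict stands in for the ES index):

```python
# Stand-in for an Elasticsearch index: document id -> document body.
index = {12: {"name": "name12"}}

def apply(rows):
    for row in rows:
        if row["is_deleted"]:
            index.pop(row["id"], None)   # remove the document from ES
        else:
            index[row["id"]] = row       # normal index/update

# Setting is_deleted (and bumping update_time) makes the row show up in the
# next incremental poll, which can then delete the corresponding ES document.
apply([{"id": 12, "name": "name12", "is_deleted": True}])
print(12 in index)  # -> False
```

In an actual pipeline you would implement this branching in Logstash itself (for example, driving the elasticsearch output's action from the flag) or filter out deleted rows at query time with `where is_deleted = 0`.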

The timezone issue (8 hours)

Logstash uses UTC by default, which is 8 hours behind Beijing time. Kibana, in turn, defaults to the browser's timezone when displaying data, so everything looks fine in Kibana, but querying the MySQL table is problematic: the table's timezone differs from the timezone Logstash uses in .logstash_jdbc_last_run, and the Logstash maintainers discourage patching Logstash to change its timezone.
That is why the SQL above is:

select id, name, password, update_time from test where update_time > convert_tz(:sql_last_value, '+00:00','+08:00')

which adds 8 hours to sql_last_value:

~/workspace/es> cat ~/.logstash_jdbc_last_run
--- 2017-05-17 07:18:02.157000000 Z

For example, running convert_tz on the UTC timestamp above:

mysql> select convert_tz('2017-05-17 07:18:02.157000000', '+00:00','+08:00');
+----------------------------------------------------------------+
| convert_tz('2017-05-17 07:18:02.157000000', '+00:00','+08:00') |
+----------------------------------------------------------------+
| 2017-05-17 15:18:02.157000                                     |
+----------------------------------------------------------------+
1 row in set (0.00 sec)
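The same conversion convert_tz performs, sketched in Python to make the 8-hour shift explicit (the timestamp is the one stored in ~/.logstash_jdbc_last_run above):

```python
from datetime import datetime, timedelta

# UTC timestamp recorded by Logstash, shifted to Beijing time (+08:00).
utc = datetime.strptime("2017-05-17 07:18:02", "%Y-%m-%d %H:%M:%S")
beijing = utc + timedelta(hours=8)
print(beijing.strftime("%Y-%m-%d %H:%M:%S"))  # -> 2017-05-17 15:18:02
```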

Turning the incremental sync into a full sync

If for some reason you want to do a one-off full sync, simply delete the ~/.logstash_jdbc_last_run file and re-run ./logstash-5.4.0/bin/logstash -f test-logstash.conf.
sql_last_value then defaults to 1970-01-01 00:00:00.
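With the epoch as sql_last_value, the WHERE clause matches every row, which is what makes the run a full sync; a minimal check:

```python
from datetime import datetime

# With ~/.logstash_jdbc_last_run deleted, the plugin falls back to the epoch.
sql_last_value = datetime(1970, 1, 1)

table = [
    {"id": 11, "update_time": datetime(2017, 5, 16)},
    {"id": 12, "update_time": datetime(2017, 5, 17, 7, 17, 19)},
]
# Every update_time is after the epoch, so all rows are re-synced.
matched = [r for r in table if r["update_time"] > sql_last_value]
print(len(matched))  # -> 2
```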

