下载logstash包 解压
bin/logstash-plugin install logstash-input-jdbc
cd bin
mkdir mysql(新建目录)
把mysql-connector-java-5.1.34.jar拷贝到mysql目录下
新建两个文件 jdbc.conf jdbc.sql
input {
jdbc {
# mysql 数据库链接,dianpingdb为数据库名
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/dianpingdb"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "123456"
# 驱动
jdbc_driver_library => "/opt/logstash-7.3.0/bin/mysql/mysql-connector-java-5.1.34.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 执行的sql 文件路径+名称
statement_filepath => "/opt/logstash-7.3.0/bin/mysql/jdbc.sql"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["localhost:9200"]
# 索引名称
index => "shop"
document_type => "_doc"
# 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{id}"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
select a.id,a.name,a.tags,
concat(a.latitude,',',a.longitude) as location,
a.remark_score,a.price_per_man,a.category_id,
b.name as category_name,
a.seller_id,
c.remark_score as seller_remark_score,
c.disabled_flag as seller_disabled_flag
from shop a
inner join category b
on a.category_id = b.id
inner join seller c on c.id = a.seller_id
门店索引构建
PUT /shop
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"id":{"type":"integer"},
"name":{"type":"text","analyzer": "ik_max_word","search_analyzer": "ik_smart"},
"tags":{"type":"text","analyzer": "whitespace","fielddata":"true"},
"location":{"type":"geo_point"},
"remark_score":{"type":"double"},
"price_per_man":{"type":"integer"},
"category_id":{"type":"integer"},
"category_name":{"type":"keyword"},
"seller_id":{"type":"integer"},
"seller_remark_score":{"type":"double"},
"seller_disabled_flsg":{"type":"integer"}
}
}
}
cd bin
./logstash -f mysql/jdbc.conf
|