Skip to content

Instantly share code, notes, and snippets.

@up1
Last active September 8, 2020 17:36
Show Gist options
  • Save up1/c00f4059c58f4f3e67eb5c39c11d1aa2 to your computer and use it in GitHub Desktop.
Save up1/c00f4059c58f4f3e67eb5c39c11d1aa2 to your computer and use it in GitHub Desktop.
Elasticsearch :: sync data with RDBMS
-- Sample schema for the RDBMS -> Elasticsearch sync example.
-- Safe to re-run: the database is created only if missing and the table is rebuilt.
CREATE DATABASE IF NOT EXISTS sample_db;
USE sample_db;

DROP TABLE IF EXISTS product;
CREATE TABLE product (
    id                BIGINT UNSIGNED NOT NULL,
    name              VARCHAR(32)     NOT NULL,
    -- Refreshed automatically on every UPDATE; the sync query filters and orders on this column.
    modification_time TIMESTAMP       NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    -- Set once when the row is inserted.
    insertion_time    TIMESTAMP       NOT NULL DEFAULT CURRENT_TIMESTAMP,
    -- The primary key already enforces uniqueness on id, so no extra UNIQUE index is declared.
    PRIMARY KEY (id)
);
# Periodically pull rows of `product` that changed since the previous run.
input {
  jdbc {
    jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
    # Connector/J 8.x driver class; the legacy com.mysql.jdbc.Driver name is deprecated in 8.x.
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/sample_db"
    # Replace the placeholders below; the values must stay quoted strings.
    jdbc_user => "<your username>"
    jdbc_password => "<your password>"
    jdbc_paging_enabled => true
    # Track the numeric unix_ts_in_secs column computed by the statement below;
    # its last seen value is substituted for :sql_last_value on the next run.
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    # Six-field cron expression (leading field is seconds): run every 5 seconds.
    schedule => "*/5 * * * * *"
    # Select rows modified after the last tracked value but strictly before NOW(),
    # ordered oldest first.
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM product WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}
# Move the row id into event metadata and drop fields that should not be indexed.
filter {
  mutate {
    # Keep the id under @metadata so the output can reference it as the document id.
    copy => {
      "id" => "[@metadata][_id]"
    }
    # Strip the original id, the Logstash version tag, and the tracking helper column.
    remove_field => [
      "id",
      "@version",
      "unix_ts_in_secs"
    ]
  }
}
# Emit each event to the console and to Elasticsearch.
output {
  # Print every event to stdout using the rubydebug codec.
  stdout {
    codec => "rubydebug"
  }

  # Write into the products_sync index, using the id stored in @metadata as the document id.
  elasticsearch {
    index => "products_sync"
    document_id => "%{[@metadata][_id]}"
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment