Created
May 23, 2020 07:50
-
-
Save devacto/2c2b4268bd0d1052a11f0e2e7af9cc6e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## Bank Mandiri KSQL POT | |
### Download the MySQL Driver | |
`curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.17.tar.gz" | tar -xzf - -C /Users/victor/desktop/mandiri-ksql-pot/mysql-driver --strip-components=1 mysql-connector-java-8.0.17/mysql-connector-java-8.0.17-bin.jar` | |
### Deploy Confluent Platform using Docker | |
`docker-compose up --build` | |
### Deploy Customer Source Kafka Connector | |
`curl --silent --request POST http://localhost:8083/connectors/ --header 'Content-Type: application/json' --data @source-connector-configs/mysql_customer_source_connector_config.json` | |
### Deploy Transaction Source Kafka Connector | |
`curl --silent --request POST http://localhost:8083/connectors/ --header 'Content-Type: application/json' --data @source-connector-configs/mysql_transaction_source_connector_config.json` | |
### Deploy Response Source Kafka Connector | |
`curl --silent --request POST http://localhost:8083/connectors/ --header 'Content-Type: application/json' --data @source-connector-configs/mysql_response_source_connector_config.json` | |
### Deploy Customer Sink Kafka Connector | |
`curl --silent --request POST http://localhost:8083/connectors/ --header 'Content-Type: application/json' --data @sink-connector-configs/mysql_customer_sink_connector.json` | |
### KSQL: Register the Customer table | |
`CREATE TABLE customer WITH (KAFKA_TOPIC='mysql_customer', VALUE_FORMAT ='AVRO', KEY='account_number');` | |
`CREATE TABLE customer_table_rekey WITH (KAFKA_TOPIC='mysql_customer', VALUE_FORMAT ='AVRO', KEY='phone_number');` | |
`SELECT ROWKEY, account_number FROM customer;` | |
### KSQL: Create Response Stream | |
`CREATE STREAM response WITH (KAFKA_TOPIC='mysql_response', VALUE_FORMAT ='AVRO');` | |
The credit-limit query joins on `response_rekey`, so re-partition `response` by the join column (`phone_number`) first: | |
`CREATE STREAM response_rekey AS SELECT * FROM response PARTITION BY phone_number;` | |
### KSQL: Query Increase Credit Limit Responses | |
`SELECT c.account_number, c.name, c.attended_last_trade_show, c.phone_number, c.credit_card_limit, c.updated_at, r.text FROM response_rekey r LEFT JOIN customer_table_rekey c ON r.phone_number = c.phone_number WHERE r.text = 'INCREASEMYLIMIT';` | |
### KSQL: Create Transaction Stream | |
`CREATE STREAM trans WITH (KAFKA_TOPIC='mysql_transaction',VALUE_FORMAT='AVRO');` | |
### KSQL: Create Enriched Transaction Stream | |
Each transaction is enriched with the customer's name and credit card limit; the `LEFT JOIN` keeps transactions that have no matching customer row. | |
`CREATE STREAM trans_enriched WITH (PARTITIONS = 1) AS SELECT t.id, t.account_number, t.merchant, t.amount, c.name, c.credit_card_limit FROM trans AS t LEFT JOIN customer AS c ON t.account_number = c.account_number;` | |
### KSQL: Create Table of Total Transactions per Account for One Month (August 2019) | |
The time filter is half-open (`>=` first day, `<` first day of the next month) so that transactions made during the last day of the month are counted; `STRINGTOTIMESTAMP('2019-08-31', ...)` is midnight at the start of that day. | |
`CREATE TABLE month_trans_by_account AS SELECT t.account_number, COUNT(t.id) AS transaction_count, SUM(t.amount) AS sum_amount FROM trans t WHERE t.time >= STRINGTOTIMESTAMP('2019-08-01','yyyy-MM-dd') AND t.time < STRINGTOTIMESTAMP('2019-09-01','yyyy-MM-dd') GROUP BY t.account_number;` | |
### KSQL: SPBU (12 Months) and Hypermart (3 Months) Transactions by Account | |
SPBU transactions per account for calendar year 2019. The time filter is half-open (`<` 2020-01-01) so that transactions made on 31 December are included. | |
`CREATE TABLE spbu_trans_by_account AS SELECT t.account_number, COUNT(t.id) AS transaction_count, SUM(t.amount) AS sum_amount FROM trans t WHERE t.time >= STRINGTOTIMESTAMP('2019-01-01','yyyy-MM-dd') AND t.time < STRINGTOTIMESTAMP('2020-01-01','yyyy-MM-dd') AND t.merchant = 'SPBU' GROUP BY t.account_number;` | |
Hypermart transactions per account for June to August 2019, keeping only accounts whose total exceeds 2000. The total is an aggregate, so it is filtered with `HAVING` after `GROUP BY` (an aggregate alias cannot be used in `WHERE`); the time filter is half-open so that 31 August is included. | |
`CREATE TABLE hypermart_trans_by_account AS SELECT t.account_number, COUNT(t.id) AS transaction_count, SUM(t.amount) AS sum_amount FROM trans t WHERE t.time >= STRINGTOTIMESTAMP('2019-06-01','yyyy-MM-dd') AND t.time < STRINGTOTIMESTAMP('2019-09-01','yyyy-MM-dd') AND t.merchant = 'Hypermart' GROUP BY t.account_number HAVING SUM(t.amount) > 2000;` | |
create table voucher_customers as select | |
create table hypermart_eligible_customers as select * from spbu_trans_by_account s left join large_limit_customer l on s.account_number = l.account_number; | |
create table large_credit_usage_keyed as select *, cast(account_number as key) from large_credit_usage partition by key; | |
create table hypermart_promo_customers as select * from hypermart_eligible_customers c left join LARGE_CREDIT_USAGE_BY_ACCOUNT_NUMBER h on c.rowkey = h.rowkey; | |
CREATE STREAM LARGE_CREDIT_USAGE_STREAM WITH (KAFKA_TOPIC='LARGE_CREDIT_USAGE', VALUE_FORMAT='AVRO'); | |
CREATE STREAM LARGE_CREDIT_USAGE_STREAM_BY_ACCOUNT_NUMBER AS SELECT * FROM LARGE_CREDIT_USAGE_STREAM PARTITION BY ACCOUNT_NUMBER; | |
CREATE TABLE LARGE_CREDIT_USAGE_BY_ACCOUNT_NUMBER WITH (KAFKA_TOPIC='LARGE_CREDIT_USAGE_STREAM_BY_ACCOUNT_NUMBER', VALUE_FORMAT='AVRO'); | |
CREATE TABLE HYPERMART_VOUCHER_MESSAGE WITH (REPLICAS = 1, PARTITIONS = 1, KAFKA_TOPIC = 'HYPERMART_VOUCHER_MESSAGE') AS SELECT ('Anda mendapatkan voucher hypermart sebesar 1 juta.')"MESSAGE", hypermart_promo_customers.PHONE_NUMBER "PHONE_NUMBER" FROM hypermart_promo_customers hypermart_promo_customers; | |
CREATE TABLE HYPERMART_VOUCHER_MESSAGE WITH (REPLICAS = 1, PARTITIONS = 1, KAFKA_TOPIC = 'HYPERMART_VOUCHER_MESSAGE') AS SELECT ('Anda mendapatkan voucher hypermart sebesar 1 juta.')"MESSAGE", hypermart_promo_customers.H_PHONE_NUMBER "PHONE_NUMBER" FROM hypermart_promo_customers hypermart_promo_customers; | |
### KSQL: Create Table for Total Credit Usage per Account in One Month | |
`credit_usage` is the month's total spend as a percentage of the customer's credit card limit (both operands are cast to `double` before dividing). The time filter is half-open (`<` 2019-09-01) so that transactions made on 31 August are included. | |
`CREATE TABLE credit_usage AS SELECT t.account_number, COUNT(t.id) AS transaction_count, SUM(t.amount) AS sum_amount, c.credit_card_limit AS cc_limit, c.phone_number, (SUM(CAST(t.amount as double)) / (CAST(c.credit_card_limit as double)) * 100.00) AS credit_usage FROM trans t LEFT JOIN customer c ON t.account_number = c.account_number WHERE t.time >= STRINGTOTIMESTAMP('2019-08-01','yyyy-MM-dd') AND t.time < STRINGTOTIMESTAMP('2019-09-01','yyyy-MM-dd') GROUP BY t.account_number, c.credit_card_limit, c.phone_number;` | |
### KSQL: Create Table for Accounts Above 80% Credit Line Usage | |
Keeps only the rows of `credit_usage` whose `credit_usage` percentage is above 80. | |
`CREATE TABLE large_credit_usage AS SELECT * FROM credit_usage WHERE credit_usage > 80;` | |
### KSQL: Send Message for Large Credit Usage | |
Builds one message per row of `large_credit_usage`, offering a limit of `cc_limit + 100000`; the reply keyword in the text (`INCREASEMYLIMIT`) is the value the increase-credit-limit query filters on. | |
`CREATE TABLE increase_credit_message AS SELECT 'Pemakaian kartu kredit untuk account ' + CAST(T_ACCOUNT_NUMBER AS STRING) + ' telah mencapai ' + CAST(credit_usage AS STRING) + ' persen. Untuk menaikkan limit kartu kredit anda ke ' + CAST(cc_limit + 100000 AS STRING) + ', reply dengan INCREASEMYLIMIT.' AS message, phone_number FROM large_credit_usage;` | |
### To use KSQL CLI from current Docker setup | |
`docker exec -it ksql-cli /bin/bash` | |
`ksql http://ksql-server:8088` | |
To detach from container: Ctrl-P, Ctrl-Q | |
### KSQL: To process data from beginning of each Kafka topic | |
`SET 'auto.offset.reset'='earliest';` | |
### Prune Docker resources after they have been used | |
`docker container prune && docker network prune && docker volume prune` | |
### Kafka: Delete a Topic and Consume a Topic from the Beginning | |
`/Users/victor/Downloads/from_desktop/drc-demo/confluent-5.3.0/bin/kafka-topics --delete --zookeeper localhost:2181 --topic mysql_transaction` | |
`/Users/victor/Downloads/from_desktop/drc-demo/confluent-5.3.0/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic mysql_customer --from-beginning` |
Author
devacto
commented
May 23, 2020
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment