Last active
April 12, 2019 07:08
-
-
Save hihellobolke/dd2dc0fcebba485975d1 to your computer and use it in GitHub Desktop.
Spark GraphX Pattern Matching
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.graphx._ | |
import org.apache.spark.rdd.RDD | |
import org.apache.spark.graphx.lib._ | |
/*
 * Alice, Dave and Bob are share traders.
 * Each maintains at least one trading account at a bank.
 * Trading accounts hold shares, which are bought and sold in transactions.
 *
 * E.g.
 * (Alice) --[has_account]--> (HSBC) --[has_shares]--> (APPL) --[sell]--> (Transaction) --[buy]--> (APPL) <--[has_shares]-- (GS) <--[has_account]-- (Dave)
 *
 *
 */
// Vertices of the trading graph. Each attribute is (kind, name), where kind is one of
// "human", "account", "shares" or "transaction".
// GraphX's Graph(vertices, edges) keeps an arbitrary one of any duplicate vertex entries,
// so every VertexId below must appear exactly once.
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array(
  // Alice: her trading account is at HSBC and holds APPL shares.
  // Tagged "account" (not "bank") so it matches the other account vertices (GS, BOA).
  (1000L, ("human", "Alice")),
  (1001L, ("account", "HSBC")),
  (1002L, ("shares", "APPL")),
  // Transaction vertices
  (9001L, ("transaction", "t1")),
  (9002L, ("transaction", "t2")),
  // Dave: a Goldman Sachs account holding MSFT, APPL and FB shares
  (2000L, ("human", "Dave")),
  (2001L, ("account", "GS")),
  (2002L, ("shares", "MSFT")),
  (2003L, ("shares", "APPL")),
  (2004L, ("shares", "FB")),
  // Transaction vertices (9002L / t2 is declared above; repeating it would duplicate the vertex)
  (9003L, ("transaction", "t3")),
  (9004L, ("transaction", "t4")),
  // Bob: two trading accounts (BOA and GS) holding APPL and CSCO shares
  (3000L, ("human", "Bob")),
  (3001L, ("account", "BOA")),
  (3002L, ("account", "GS")),
  (3003L, ("shares", "APPL")),
  (3004L, ("shares", "APPL")),
  (3005L, ("shares", "CSCO"))
))
// Edges of the trading graph, labelled with the relation they represent.
// Ownership: "has_account" (human -> account) and "has_shares" (account -> shares).
// A trade is a "sell" edge (seller's shares -> transaction) followed by a
// "buy" edge (transaction -> buyer's shares).
val relationships: RDD[Edge[String]] = {
  val holdings = Array(
    // Alice: one account and the shares held in it
    Edge(1000L, 1001L, "has_account"),
    Edge(1001L, 1002L, "has_shares"),
    // Dave: one account holding several different shares
    Edge(2000L, 2001L, "has_account"),
    Edge(2001L, 2002L, "has_shares"),
    Edge(2001L, 2003L, "has_shares"),
    Edge(2001L, 2004L, "has_shares"),
    // Bob: shares spread over two accounts
    Edge(3000L, 3001L, "has_account"),
    Edge(3001L, 3003L, "has_shares"),
    Edge(3001L, 3004L, "has_shares"),
    Edge(3000L, 3002L, "has_account"),
    Edge(3002L, 3005L, "has_shares"))

  val trades = Array(
    // t1: Alice sells APPL shares to Dave
    Edge(1002L, 9001L, "sell"), Edge(9001L, 2003L, "buy"),
    // t2: Alice sells APPL shares to Bob
    Edge(1002L, 9002L, "sell"), Edge(9002L, 3004L, "buy"),
    // t3: Bob sells APPL shares to Dave
    Edge(3003L, 9003L, "sell"), Edge(9003L, 2003L, "buy"),
    // t4: Dave sells APPL shares to Alice. Together with t1 this closes the
    // directed cycle 1002 -> 9001 -> 2003 -> 9004 -> 1002.
    Edge(2003L, 9004L, "sell"), Edge(9004L, 1002L, "buy"))

  // Same edges, same order as a single flat Array literal would give.
  sc.parallelize(holdings ++ trades)
}
// Property graph over the vertex and edge RDDs: vertex attribute (kind, name), edge attribute = label.
val graph: Graph[(String, String), String] = Graph(users, relationships)

// First query: presumably matches paths whose edge labels run
// has_account -> has_shares -> sell.
// NOTE(review): PatternMatching / EdgePattern are not defined in this file and do not appear to
// ship with stock org.apache.spark.graphx.lib; they presumably come from a custom build — confirm.
// The meaning of EdgePattern's Boolean argument (always false here) is not visible from this script.
val cs = {
  val sellPath = List(
    EdgePattern("has_account", false),
    EdgePattern("has_shares", false),
    EdgePattern("sell", false))
  PatternMatching.run(graph, sellPath).collect.toList
}
println(s"List cs:\n$cs")
println(s"Count cs:\n${cs.length}")
// Second query: the same path extended with a trailing "buy" edge.
// The author reports that this run never returns. One candidate cause (unconfirmed, since
// PatternMatching's source is not shown here) is the directed cycle in the edge data:
// t1 and t4 form 1002 -sell-> 9001 -buy-> 2003 -sell-> 9004 -buy-> 1002.
// The result is bound to its own name. A second `val cs` is a "cs is already defined"
// error when the script is compiled or :paste'd as one unit.
val csWithBuy = PatternMatching.run(graph, List(
  EdgePattern("has_account", false),
  EdgePattern("has_shares", false),
  EdgePattern("sell", false),
  EdgePattern("buy", false))
).collect.toList
println(s"List cs:\n$csWithBuy")
println(s"Count cs:\n${csWithBuy.length}")
/* Wish: EdgePattern could take a predicate function, so that one
 * could match paths with conditions, e.g.
 * where the transaction amount is > $1000,
 * or where the source vertex attribute satisfies some condition...
 */
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Where is PatternMatching.run() located?