How to join disparate data sources and map the customer journey through various touch points
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.types import ArrayType, StringType

# Read the id maps, the per-channel event data and the purchase data
user_guid_email = spark.read.json("/mnt/public-blobs/attribution-modelling/data2/id-maps/id-map-email.json")
user_guid_paid_search = spark.read.json("/mnt/public-blobs/attribution-modelling/data2/id-maps/id-map-paid-search.json")
user_guid_social = spark.read.json("/mnt/public-blobs/attribution-modelling/data2/id-maps/id-map-social.json")
guid_event_email = spark.read.parquet("/mnt/public-blobs/attribution-modelling/data2/events-email")
guid_event_paid_search = spark.read.parquet("/mnt/public-blobs/attribution-modelling/data2/events-paid-search")
guid_event_social = spark.read.parquet("/mnt/public-blobs/attribution-modelling/data2/events-social")
purchases_data = spark.read.parquet("/mnt/public-blobs/attribution-modelling/data2/purchases")
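
# Assumed schemas, inferred from the joins and selects below: each id map carries
# `uuid` and `user`, each events dataset carries `uuid` and `eventDateTime`, and the
# purchase data carries `user`, `purchaseDatetime` and `purchaseAmount`.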
# Create a new column `touchpoint` (for concatenation and identification purposes)
purchases_data = purchases_data.withColumn("touchpoint", F.lit("purchase"))
# Map each channel's events from uuid to user and tag them with the channel name
email_event = user_guid_email.join(guid_event_email, ["uuid"], "right")\
    .withColumn("touchpoint", F.lit("marketing_email")).drop("uuid")
paid_search_event = user_guid_paid_search.join(guid_event_paid_search, ["uuid"], "right")\
    .withColumn("touchpoint", F.lit("paid_search")).drop("uuid")
social_event = user_guid_social.join(guid_event_social, ["uuid"], "right")\
    .withColumn("touchpoint", F.lit("social_media")).drop("uuid")
# Marketing events carry no revenue, so give them a zero purchase amount
temp_event = email_event.union(paid_search_event).union(social_event)
temp_event = temp_event.withColumn("purchaseAmount", F.lit(0))
# Align the purchase data with the event data for concatenation
temp_purchases = purchases_data.withColumnRenamed("purchaseDatetime", "eventDateTime")
temp_purchases = temp_purchases.select("user", "eventDateTime", "touchpoint", "purchaseAmount")
# Now concatenate them (union is positional, so this column order must match temp_event)
event_data = temp_purchases.union(temp_event)
# Process to identify the journey between each purchase for every user
event_data = event_data.withColumn("purchased", F.when(F.col("touchpoint") == 'purchase', 1).otherwise(0))
win = Window.partitionBy("user").orderBy("eventDateTime").rangeBetween(Window.unboundedPreceding, 0)
event_data_journey_ = event_data\
    .withColumn("cumsum", F.sum(F.col("purchased")).over(win))\
    .withColumn("cumsum", F.when(F.col("purchased") == 1, F.col("cumsum") - 1).otherwise(F.col("cumsum")))
# UDF to strip the terminal 'purchase' marker from a completed journey
@F.udf(ArrayType(StringType()))
def remove_purchases(arr):
    return [elem for elem in arr if elem != 'purchase']

# Collect the touchpoints of each journey and its purchase amount (0 if no purchase)
event_data_journey = event_data_journey_\
    .groupBy(["user", "cumsum"])\
    .agg(F.collect_list(F.col("touchpoint")).alias("journey"),
         F.max(F.col("purchaseAmount")).alias("purchaseAmount_"))\
    .withColumn("journey_", F.when(F.array_contains(F.col("journey"), "purchase"),
                                   remove_purchases(F.col("journey"))).otherwise(F.col("journey")))\
    .drop("cumsum", "journey")