Skip to content

Instantly share code, notes, and snippets.

@d0choa
Last active March 16, 2022 16:01
Show Gist options
  • Save d0choa/b86fcf639b2fc469138a063a9667aa69 to your computer and use it in GitHub Desktop.
Save d0choa/b86fcf639b2fc469138a063a9667aa69 to your computer and use it in GitHub Desktop.
List of potential new variants in variant index (derived from other datasets)
from os import sep
import pyspark.sql.functions as F
from pyspark import SparkConf
from pyspark.sql import SparkSession
# Spark configuration: the two Hadoop GCS options below enable requester-pays
# access and name the project id used for it.
sparkConf = (
    SparkConf()
    .set('spark.hadoop.fs.gs.requester.pays.mode', 'AUTO')
    .set('spark.hadoop.fs.gs.requester.pays.project.id', 'open-targets-eu-dev')
)

# Create (or reuse) a local Spark session that uses every available core.
spark = SparkSession.builder.config(conf=sparkConf).master('local[*]').getOrCreate()
# --- Input locations -------------------------------------------------------
# Variant-to-disease (v2d) tables.
toplociPath = "gs://genetics-portal-dev-staging/v2d/220210/toploci_betas_fixed.parquet"
ldPath = "gs://genetics-portal-dev-staging/v2d/220210/ld.parquet"
fmPath = "gs://genetics-portal-dev-staging/v2d/220210/finemapping.parquet"
# Fine-mapping credible sets.
crediblePath = "gs://genetics-portal-dev-staging/finemapping/220113_merged/credset"
# Summary statistics pre-filtered at p-value 0.005 (per the bucket path).
gwasSumstatsPath = "gs://genetics-portal-dev-sumstats/filtered/pvalue_0.005/gwas/220208"
mtraitsSumstatsPath = "gs://genetics-portal-dev-sumstats/filtered/pvalue_0.005/molecular_trait/220105"
# Existing variant index and variant-to-gene (v2g) outputs.
variantPath = "gs://genetics-portal-dev-data/22.02.4/outputs/lut/variant-index"
v2gPath = "gs://genetics-portal-dev-data/22.02.4/outputs/v2g/"

# --- Load datasets ---------------------------------------------------------
# JSON-formatted inputs.
variant = spark.read.json(variantPath)
v2g = spark.read.json(v2gPath)
# Parquet-formatted v2d inputs.
toploci = spark.read.parquet(toplociPath)
fmLoci = spark.read.parquet(fmPath)
ldLoci = spark.read.parquet(ldPath)
# Credible sets are stored as JSON.
credibleLoci = spark.read.json(crediblePath)
# Summary statistics are stored as parquet.
gwasSumstats = spark.read.parquet(gwasSumstatsPath)
mtraitsSumstats = spark.read.parquet(mtraitsSumstatsPath)
def _variant_columns(df, prefix=""):
    """Project ``df`` onto (chrom, pos, ref, alt).

    The source columns are read as ``<prefix>chrom``, ``<prefix>pos``,
    ``<prefix>ref`` and ``<prefix>alt`` and renamed to the unprefixed names,
    so every projection can be unioned positionally.
    """
    return df.select(
        *[
            F.col(prefix + field).alias(field)
            for field in ("chrom", "pos", "ref", "alt")
        ]
    )


# Every dataset that contributes variants, in the order they are unioned.
_variantSources = [
    _variant_columns(toploci),
    _variant_columns(ldLoci, "lead_"),
    _variant_columns(ldLoci, "tag_"),
    _variant_columns(fmLoci, "lead_"),
    _variant_columns(fmLoci, "tag_"),
    _variant_columns(credibleLoci, "lead_"),
    _variant_columns(credibleLoci, "tag_"),
    # Only GWAS summary-statistics rows with pval below 1e-5 are kept.
    _variant_columns(gwasSumstats.filter(F.col("pval") < 1e-5)),
    # NOTE: mtraitsSumstats is not part of the union; that branch was
    # disabled (commented out) in the original script.
]

# Stack all projections, then de-duplicate and cache the variant list.
out = _variantSources[0]
for _source in _variantSources[1:]:
    out = out.union(_source)
out = out.distinct().persist()
# Sessions per visited variant page, read from a tab-separated analytics
# export with a header row.
#
# Resulting columns:
#   variant  - the "landing" value with any query string and double quotes removed
#   sources  - ";"-joined sources, ordered by their session count (descending)
#   sessions - total sessions for the variant across all sources
visits = (
    spark.read
    .options(header=True, sep="\t", inferSchema=True)
    .csv("gs://ot-team/dochoa/analytics_genetics_allvariants_sinceJan2019.tsv")
    .withColumnRenamed("landing", "variant")
    # Rows whose landing value contains "locus" are dropped.
    .filter(~F.col("variant").contains("locus"))
    # Raw string avoids the invalid "\?" escape; ".*" (rather than ".+")
    # also strips a bare trailing "?" that has no query text after it.
    .withColumn("variant", F.regexp_replace("variant", r"\?.*", ""))
    .withColumn("variant", F.regexp_replace("variant", "\"", ""))
    # Sessions per (variant, source).
    .groupBy("variant", "source")
    .agg(F.sum("sessions").alias("sessions"))
    .groupBy("variant")
    .agg(
        # A sort() placed before groupBy does not guarantee the element order
        # seen by collect_list. Collect (sessions, source) pairs and sort the
        # array itself so the ranking of sources is deterministic.
        F.sort_array(
            F.collect_list(F.struct("sessions", "source")),
            asc=False,
        ).alias("rankedSources"),
        F.sum("sessions").alias("sessions"),
    )
    # Extracting a struct field from an array of structs yields an array of
    # that field; concat_ws then joins it (null sources are skipped).
    .withColumn("sources", F.concat_ws(";", F.col("rankedSources.source")))
    .select("variant", "sources", "sessions")
    .sort(F.col("sessions").desc())
)
# Single-column frame of variant ids built as chrom_pos_ref_alt from `out`.
outWithId = out.select(
    F.concat_ws("_", "chrom", "pos", "ref", "alt").alias("variant")
)
# Two ids per variant-index row, sharing the same alleles:
#   variant       - built from the *_b37 chromosome/position columns
#   grch38Variant - built from the chr_id/position columns
variantSubset = variant.select(
    F.concat_ws(
        "_", "chr_id_b37", "position_b37", "ref_allele", "alt_allele"
    ).alias("variant"),
    F.concat_ws(
        "_", "chr_id", "position", "ref_allele", "alt_allele"
    ).alias("grch38Variant"),
)
# Variants from `outWithId` whose id matches a `grch38Variant` id in the
# variant index. Each match carries the index's b37-based `variant` id and a
# constant "wrong_build?" marker in the `mistake` column.
outWithIdInOld = (
    outWithId
    # outWithId holds a single `variant` column; expose it under the join key.
    .select(F.col("variant").alias("grch38Variant"))
    .join(
        variantSubset.select("*", F.lit("wrong_build?").alias("mistake")),
        on="grch38Variant",
        how="inner",
    )
)
def _presence_flag(df, id_columns, flag):
    """Return distinct (variant, <flag>=True) rows for ``df``.

    ``variant`` is the "_"-joined concatenation of ``id_columns``; ``flag`` is
    the name of the constant-True boolean column.
    """
    return df.select(
        F.concat_ws("_", *id_columns).alias("variant"),
        F.lit(True).alias(flag),
    ).distinct()


# Visited variant ids that are absent from `outWithId`, annotated with:
#   grch38Variant / mistake - filled when the id matches a b37-based id in
#                             outWithIdInOld
#   isInPheWAS              - the id occurs in the GWAS summary statistics
#   isInV2G                 - the id occurs in the v2g dataset
# Rows are ordered by sessions (descending) and the result is cached.
unknown = (
    visits
    .join(outWithId, on="variant", how="left_anti")
    .join(outWithIdInOld, on="variant", how="left")
    .join(
        _presence_flag(gwasSumstats, ("chrom", "pos", "ref", "alt"), "isInPheWAS"),
        on="variant",
        how="left",
    )
    .join(
        _presence_flag(
            v2g, ("chr_id", "position", "ref_allele", "alt_allele"), "isInV2G"
        ),
        on="variant",
        how="left",
    )
    # Unmatched left-join rows have null flags; turn them into explicit False.
    .fillna(False, ["isInPheWAS", "isInV2G"])
    .sort(F.col("sessions").desc())
    .persist()
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment