@Nempickaxe
Created September 9, 2021 07:57
Read from Mongo
from pyspark.sql import SparkSession


def read_mongo_collection(uri, pipeline=None, given_schema=None, spark=None):
    """
    :param uri: MongoDB connection URI (may name the database.collection to read)
    :param pipeline: aggregation pipeline (JSON string) pushed down to MongoDB
    :param given_schema: optional StructType; if set, it is used instead of an inferred schema
    :param spark: SparkSession to use; defaults to the active (or a new) session
    :return: DataFrame read from MongoDB
    """
    if spark is None:
        # Previously spark=None crashed with AttributeError; fall back to a session
        spark = SparkSession.builder.getOrCreate()

    # Data source of the MongoDB Spark Connector 2.x/3.x
    # (connector 10.x uses format "mongodb" with different option names)
    reader = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", uri)
    if given_schema:
        # Supplying a schema skips the connector's sampling-based inference
        reader = reader.schema(given_schema)
    if pipeline:
        # Run this aggregation pipeline inside MongoDB before data reaches Spark
        reader = reader.option("pipeline", pipeline)
        if not given_schema:
            # Kept from the original: this partitioner is only set when a
            # pipeline is given without a schema
            reader = reader.option("partitioner", "MongoSplitVectorPartitioner")
    return reader.load()
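The function takes its `uri` and `pipeline` arguments as plain strings, so callers have to assemble them. A minimal sketch of two hypothetical helpers (`build_mongo_uri` and `build_pipeline` are illustrative names, not part of the connector), assuming a 2.x/3.x connector where the URI path can name `database.collection` and the `pipeline` option accepts a JSON array of aggregation stages. Credentials are escaped with `urllib.parse.quote_plus`, as PyMongo's documentation recommends.

```python
import json
from urllib.parse import quote_plus


def build_mongo_uri(host, database, collection, user=None, password=None, port=27017):
    """Hypothetical helper: build a URI of the form mongodb://[user:pass@]host:port/db.collection.

    Username and password are percent-encoded so characters such as '@' or ':'
    do not break URI parsing.
    """
    auth = ""
    if user is not None:
        auth = quote_plus(user)
        if password is not None:
            auth += ":" + quote_plus(password)
        auth += "@"
    return f"mongodb://{auth}{host}:{port}/{database}.{collection}"


def build_pipeline(*stages):
    """Hypothetical helper: serialize aggregation stages (Python dicts) into the
    JSON array string passed as the "pipeline" option."""
    return json.dumps(list(stages))


if __name__ == "__main__":
    uri = build_mongo_uri("localhost", "shop", "orders")
    pipeline = build_pipeline(
        {"$match": {"status": "shipped"}},  # filter inside MongoDB
        {"$project": {"_id": 0, "total": 1}},  # send only the needed field
    )
    print(uri)
    print(pipeline)
```

With a SparkSession available, these would be passed as `read_mongo_collection(uri, pipeline=pipeline, spark=spark)`. If the user is defined in a different database than the one in the URI path (commonly `admin`), an `authSource` query parameter would also be needed; the sketch does not add one.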