Hi everyone,
I'm trying to read a MongoDB collection with around 90M documents into a Spark DataFrame in Databricks. When I read the collection I get the error below. Can someone help me resolve this error or point me in the right direction?
Code Snippet:
```python
# Set the partitioner. Note: the config key is "spark.mongodb.read.partitioner"
# (I had originally used the full class path as the key by mistake).
# I have tried every partitioner listed in the official docs:
# https://www.mongodb.com/docs/spark-connector/master/configuration/read/#std-label-conf-paginateintopartitionspartitioner
spark.conf.set("spark.mongodb.read.partitioner",
               "com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner")
spark.conf.set("spark.mongodb.read.partitioner.options.partition.field", "t_uid")
spark.conf.set("spark.mongodb.read.partitioner.options.partition.size", "128")
spark.conf.set("spark.mongodb.read.partitioner.options.samples.per.partition", "1")

df = (spark.read.format("mongodb")
      .option("spark.mongodb.read.database", "db_name")
      .option("spark.mongodb.read.collection", "collection_name")
      .option("sql.inferSchema.mapTypes.enabled", "true")
      .option("sql.inferSchema.mapTypes.minimum.key.size", "512")
      .option("outputExtendedJson", "true")
      .option("spark.mongodb.read.connection.uri", "mongodb+srv://***:***@***.mongodb.net/")
      .load())

display(df)
```
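From the stack trace, the `scala.MatchError` is thrown while Spark builds a `RowEncoder` from the schema produced by the connector's inference (`InferSchema`). One workaround I am considering is bypassing inference entirely by passing an explicit schema to the reader. Here is a minimal sketch of what I mean; the field names other than `t_uid` are hypothetical placeholders for my actual document fields:

```python
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hypothetical explicit schema -- the fields below are placeholders for the
# real document fields. Supplying .schema() should skip the schema-inference
# step (InferSchema) that appears in the stack trace.
explicit_schema = StructType([
    StructField("_id", StringType(), nullable=True),
    StructField("t_uid", StringType(), nullable=True),
    StructField("created_at", LongType(), nullable=True),
])

df = (spark.read.format("mongodb")
      .schema(explicit_schema)
      .option("spark.mongodb.read.database", "db_name")
      .option("spark.mongodb.read.collection", "collection_name")
      .option("spark.mongodb.read.connection.uri", "mongodb+srv://***:***@***.mongodb.net/")
      .load())
```

Is an explicit schema the recommended way around this, or is there a connector setting I am missing?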
Configuration Details:
- MongoDB Spark Connector - org.mongodb.spark:mongo-spark-connector_2.12:10.1.1
- Bson - org.mongodb:bson:4.10.0
- Databricks Runtime Version - 13.3 LTS (includes Apache Spark 3.4.1, Scala 2.12)
- Collection Size - 340 GB
- Avg Document Size - 10 KB
- Total No. of Docs - 90 Million
Error Stack Trace:
```
Py4JJavaError: An error occurred while calling o610.load.
: scala.MatchError: com.mongodb.spark.sql.connector.schema.InferSchema$1@74b6c909 (of class com.mongodb.spark.sql.connector.schema.InferSchema$1)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderForDataType(RowEncoder.scala:80)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderForDataType(RowEncoder.scala:116)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$encoderForDataType$2(RowEncoder.scala:129)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderForDataType(RowEncoder.scala:126)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$encoderForDataType$2(RowEncoder.scala:129)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderForDataType(RowEncoder.scala:126)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$encoderForDataType$2(RowEncoder.scala:129)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderForDataType(RowEncoder.scala:126)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderFor(RowEncoder.scala:75)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(RowEncoder.scala:63)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(RowEncoder.scala:67)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:101)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1113)
at org.apache.spark.sql.SparkSession.$anonfun$withActiveAndFrameProfiler$1(SparkSession.scala:1120)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.SparkSession.withActiveAndFrameProfiler(SparkSession.scala:1120)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:98)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.loadV2Source(DataSourceV2Utils.scala:146)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:333)
at scala.Option.flatMap(Option.scala:271)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:331)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:226)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
at py4j.Gateway.invoke(Gateway.java:306)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
at java.lang.Thread.run(Thread.java:750)
```
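In case it helps narrow things down, my next debugging step is to run inference against a much smaller slice of the collection to find which field produces a type the encoder cannot match. A rough sketch of that step, where the `$match` filter is a placeholder I would adjust:

```python
# Infer the schema from a small slice of the collection. "sampleSize" controls
# how many documents the connector samples for inference; "aggregation.pipeline"
# narrows which documents are read. If load() succeeds on the slice, comparing
# the inferred schemas between slices should point at the offending field.
probe = (spark.read.format("mongodb")
         .option("spark.mongodb.read.database", "db_name")
         .option("spark.mongodb.read.collection", "collection_name")
         .option("spark.mongodb.read.connection.uri", "mongodb+srv://***:***@***.mongodb.net/")
         .option("sampleSize", "100")
         .option("aggregation.pipeline", '[{"$match": {"t_uid": {"$exists": true}}}]')
         .load())

probe.printSchema()
```

Thanks in advance for any pointers.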