Getting scala.MatchError: com.mongodb.spark.sql.connector.schema.InferSchema when reading a large collection of 90M documents

Hi everyone,

I'm trying to read a MongoDB collection of around 90M documents into a Spark DataFrame in Databricks. When reading the collection I get the error below. Can someone help me resolve it, or give me an idea of how to proceed?

Code Snippet:

spark.conf.set("com.mongodb.spark.sql.connector.read.partitioner.Partitioner", "com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner")
#Have used all the given partitioner from the official docs [link](https://www.mongodb.com/docs/spark-connector/master/configuration/read/#std-label-conf-paginateintopartitionspartitioner)
spark.conf.set("spark.mongodb.read.partitioner.options.partition.field", "t_uid")
spark.conf.set("spark.mongodb.read.partitioner.options.partition.size", "128")
spark.conf.set("spark.mongodb.read.partitioner.options.samples.per.partition", "1")
df = spark.read.format("mongodb")\
    .option("spark.mongodb.read.database", "db_name")\
    .option("spark.mongodb.read.collection", "collection_name")\
    .option("sql.inferSchema.mapTypes.enabled", "true")\
    .option("sql.inferSchema.mapTypes.minimum.key.size", "512")\
    .option("outputExtendedJson", "true")\
    .option("spark.mongodb.read.connection.uri", "mongodb+srv://***:***@***.mongodb.net/").load()
display(df)

Configuration Details:

  • MongoDB Spark Connector - org.mongodb.spark:mongo-spark-connector_2.12:10.1.1

  • Bson - org.mongodb:bson:4.10.0

  • Databricks Runtime Version - 13.3 LTS (includes Apache Spark 3.4.1, Scala 2.12)

  • Collection Size - 340 GB

  • Avg Document Size - 10KB

  • Total No. of Docs - 90 Million

Py4JJavaError: An error occurred while calling o610.load.
: scala.MatchError: com.mongodb.spark.sql.connector.schema.InferSchema$1@74b6c909 (of class com.mongodb.spark.sql.connector.schema.InferSchema$1)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderForDataType(RowEncoder.scala:80)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderForDataType(RowEncoder.scala:116)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$encoderForDataType$2(RowEncoder.scala:129)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderForDataType(RowEncoder.scala:126)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$encoderForDataType$2(RowEncoder.scala:129)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderForDataType(RowEncoder.scala:126)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$encoderForDataType$2(RowEncoder.scala:129)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderForDataType(RowEncoder.scala:126)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderFor(RowEncoder.scala:75)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(RowEncoder.scala:63)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(RowEncoder.scala:67)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:101)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1113)
	at org.apache.spark.sql.SparkSession.$anonfun$withActiveAndFrameProfiler$1(SparkSession.scala:1120)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.SparkSession.withActiveAndFrameProfiler(SparkSession.scala:1120)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:98)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.loadV2Source(DataSourceV2Utils.scala:146)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:333)
	at scala.Option.flatMap(Option.scala:271)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:331)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:226)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
	at py4j.Gateway.invoke(Gateway.java:306)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
	at java.lang.Thread.run(Thread.java:750)

Hi @Mani_Perumal,

The error seems to be related to the schema inference process. Before digging into that, can you try with Apache Spark version 3.2.4? (I see you are using Spark 3.4.1.)

If the problem still persists, you can try setting an explicit schema instead of relying on automatic schema inference. If you're not sure about the schema, you can inspect a few documents from your MongoDB collection to determine it.
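
For reference, a minimal sketch of what an explicit schema might look like; apart from t_uid (taken from your partitioner options), the field names below are placeholders, not your real collection layout:

from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType

# Placeholder schema: replace these fields with the collection's actual 250+ fields.
explicit_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("t_uid", StringType(), True),
    StructField("created_at", LongType(), True),
    StructField("tags", ArrayType(StringType()), True),
])

df = spark.read.format("mongodb")\
    .schema(explicit_schema)\
    .option("spark.mongodb.read.database", "db_name")\
    .option("spark.mongodb.read.collection", "collection_name")\
    .option("spark.mongodb.read.connection.uri", "mongodb+srv://***:***@***.mongodb.net/")\
    .load()
# .schema(...) skips the connector's automatic schema inference entirely.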

Hi @Prakul_Agarwal,

I have tried with Spark version 3.2.1, but the issue still persists. Agreed, I could provide the schema manually and read the documents, but in my case I have hundreds of collections of the same volume, and each document contains 250+ fields. It's very hard to feed the schema manually for every pipeline, and this is just one database; I have many others on my plate.

Can you please suggest some other workaround for this issue?
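
Not a fix for the inference error itself, but one way to avoid hand-writing schemas for hundreds of collections is to derive each schema once (for example from a small, known-good sample read) and persist it as JSON for reuse by every pipeline. A rough sketch, assuming a shared location such as DBFS for the schema files:

import json
from pyspark.sql.types import StructType

def save_schema(df, path):
    # Persist the DataFrame's schema as JSON so other pipelines can reuse it.
    with open(path, "w") as f:
        f.write(df.schema.json())

def load_schema(path):
    # Rebuild a StructType from a previously saved JSON schema file.
    with open(path) as f:
        return StructType.fromJson(json.load(f))

# Hypothetical usage in each pipeline:
# schema = load_schema("/dbfs/schemas/collection_name.json")
# df = spark.read.format("mongodb").schema(schema).option(...).load()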

Hi @Prakul_Agarwal,

I have now given the schema explicitly (derived by reading some documents) and I am still getting the same schema inference error. I don't know how to proceed further. Can you shed some light on how to resolve this?

Possibly due to empty arrays nested somewhere in your collection structure, with the 10.1+ connector. (MongoDB would need to confirm.)

https://jira.mongodb.org/browse/SPARK-412
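
If you want to verify that hypothesis, a quick check (assuming pymongo is available on the driver, reusing the db/collection names from the original snippet) could sample some documents and report any empty arrays or empty sub-documents:

from pymongo import MongoClient

def find_empty_containers(value, path=""):
    # Recursively collect paths to empty arrays / empty sub-documents.
    hits = []
    if isinstance(value, dict):
        if not value and path:
            hits.append(path)
        for k, v in value.items():
            hits += find_empty_containers(v, f"{path}.{k}" if path else k)
    elif isinstance(value, list):
        if not value:
            hits.append(path)
        for i, v in enumerate(value):
            hits += find_empty_containers(v, f"{path}[{i}]")
    return hits

client = MongoClient("mongodb+srv://***:***@***.mongodb.net/")
for doc in client["db_name"]["collection_name"].aggregate([{"$sample": {"size": 100}}]):
    paths = find_empty_containers(doc)
    if paths:
        print(doc["_id"], paths)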

Thanks @mikev for reporting this as a bug, with more detail, on the MongoDB Jira board:
https://jira.mongodb.org/browse/SPARK-412

@Prakul_Agarwal, can you please check this one?

@Mani_Perumal,
I'm also facing the same issue. Did you find any approach to read data from MongoDB without feeding the schema manually?