Aggregate multiple collections with Spark

Hello,

I’m trying to aggregate data from several collections into one DataFrame using a $unionWith stage in Spark (Scala).
I have a database named school with two collections, student and course, and I’d like to combine (concatenate) the data from these two collections into a single DataFrame.
So I build the pipeline and add it to the session configuration, but it doesn’t seem to work.

Here is my code:

import org.apache.spark.sql.SparkSession

val pipeline = """[{$unionWith: {coll: "course", pipeline: [{$project: {_id: 0, courseId: "$c_id", courseTitle: 1, teacher: 1}}]}}]"""

val spark = SparkSession.builder
  .appName("Data Aggregation")
  .master("local[*]")
  .config("spark.mongodb.input.uri", "mongodb://localhost:27017")
  .config("spark.mongodb.input.database", "school")
  .config("spark.mongodb.input.collection", "student")
  .config("partitioner", "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner")
  .config("pipeline", pipeline)
  .getOrCreate()

Data loading:

import com.mongodb.spark.MongoSpark

val data = MongoSpark.load(spark)

My configuration:

  • Spark: 3.1.1
  • Scala: 2.12.15
  • MongoDB Spark Connector: 3.0.2
  • MongoDB: 6.0.9

PS: I have tried this aggregation with mongosh and it works.

Thanks a lot.

Can you share the error that you are getting with this?

Hi Prakul,
Thank you for your reply.

The code doesn’t throw any errors.
The problem I’m raising is this: logically, with the pipeline configuration, the data variable should contain the union of my student and course collections. Instead, only the student collection (the one defined in the Spark session) is loaded, which means the pipeline hasn’t been taken into account.
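In the meantime, one fallback I’m considering is passing the stage through the RDD API rather than a session config key, since the 3.0.x connector exposes a withPipeline method on MongoRDD. A minimal sketch (untested on my side, so treat the exact usage as my assumption):

import com.mongodb.spark.MongoSpark
import org.bson.Document

// Parse the $unionWith stage once; Document.parse accepts the
// shell-style (unquoted-key) JSON used above.
val unionStage = Document.parse(
  """{$unionWith: {coll: "course", pipeline: [{$project: {_id: 0, courseId: "$c_id", courseTitle: 1, teacher: 1}}]}}""")

// Load through the RDD API so the stage is applied server-side,
// then convert back to a DataFrame (schema inferred by sampling).
val rdd = MongoSpark.load(spark.sparkContext).withPipeline(Seq(unionStage))
val data = rdd.toDF()
data.show()

One thing I would still have to check is how this behaves with multiple partitions: as far as I understand, the connector prepends each partition’s filter to the pipeline, so the $unionWith stage could run once per partition and duplicate the course documents.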

Here are the data in my two collections:

  • student collection:
[
  {
    _id: 1,
    name: 'Elena Gilbert',
    email: 'elena@gmail.com',
    teacher: 'Harry'
  },
  {
    _id: 2,
    name: 'Alaric Steven',
    email: 'alaric@gmail.com',
    teacher: 'Harry'
  }
]
  • course collection:
[
  { _id: 1, c_id: 1, courseTitle: 'Python', teacher: 'Harry' },
  { _id: 2, c_id: 2, courseTitle: 'Java', teacher: 'Harry' }
]

What I expect from the union aggregation pipeline is this:

[
  {
    _id: 1,
    name: 'Elena Gilbert',
    email: 'elena@gmail.com',
    teacher: 'Harry'
  },
  {
    _id: 2,
    name: 'Alaric Steven',
    email: 'alaric@gmail.com',
    teacher: 'Harry'
  },
  { courseTitle: 'Python', teacher: 'Harry', courseId: 1 },
  { courseTitle: 'Java', teacher: 'Harry', courseId: 2 }
]

And I am able to get this result with mongosh from the console with this command:

db.student.aggregate([ { $unionWith: { coll: "course", pipeline: [ { $project: { _id: 0, courseId: "$c_id", courseTitle: 1, teacher: 1 } }] } }] )

But not with the Spark data loader.
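Looking at my configuration again, the partitioner class I pass (com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner) seems to belong to the 10.x connector namespace rather than to 3.0.2, so the other route I may try is upgrading to the 10.x connector, whose DataFrame reader documents an aggregation pipeline option. A sketch, with the option keys taken from my reading of the 10.x docs (so treat them as assumptions, not verified on my setup):

// 10.x connector: the short source name is "mongodb" and read options
// can be set directly on the reader; aggregation.pipeline should make
// the connector run the $unionWith stage server-side.
val data = spark.read
  .format("mongodb")
  .option("connection.uri", "mongodb://localhost:27017")
  .option("database", "school")
  .option("collection", "student")
  .option("aggregation.pipeline", pipeline)
  .load()
data.show()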