Hello,
I’m trying to aggregate data from several collections into a single DataFrame using a $unionWith stage in Spark-Scala.
I have a database named school with two collections, teacher and course, and I’d like to combine (concatenate) the data from these two collections into one DataFrame.
So I build the pipeline and add it to the connector configuration, but it doesn’t seem to work.
Here is my code:
val pipeline = """[{$unionWith: {coll: "course", pipeline: [{$project: { _id: 0, courseTitle: 1, teacher: 1}}]}}]"""
val spark = SparkSession.builder
  .appName("Data Aggregation")
  .master("local[*]")
  .config("spark.mongodb.input.uri", "mongodb://localhost:27017")
  .config("spark.mongodb.input.database", "school")
  .config("spark.mongodb.input.collection", "teacher")
  .config("partitioner", "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner")
  .config("pipeline", pipeline)
  .getOrCreate()
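Reading the 3.0.x (legacy) connector docs, I suspect the last two options are ignored because they lack the spark.mongodb.input. prefix, and because the partitioner class I passed belongs to the 10.x connector. Is the following the correct form? (Untested sketch; the option names are my reading of the legacy docs.)

import org.apache.spark.sql.SparkSession

// Every read option prefixed with spark.mongodb.input., and the legacy
// partitioner name instead of the 10.x class (my assumption from the docs).
val spark = SparkSession.builder
  .appName("Data Aggregation")
  .master("local[*]")
  .config("spark.mongodb.input.uri", "mongodb://localhost:27017")
  .config("spark.mongodb.input.database", "school")
  .config("spark.mongodb.input.collection", "teacher")
  .config("spark.mongodb.input.partitioner", "MongoSamplePartitioner")
  .config("spark.mongodb.input.pipeline", pipeline)
  .getOrCreate()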
Data loading:
val data = MongoSpark.load(spark)
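If the session-level pipeline option is the wrong route, an alternative I could try is attaching the pipeline through the RDD API with withPipeline (untested sketch; the variable names are mine):

import org.bson.Document
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

// Build a ReadConfig on top of the session settings, attach the $unionWith
// stage to the MongoRDD, then convert the result to a DataFrame.
val readConfig = ReadConfig(Map("collection" -> "teacher"), Some(ReadConfig(spark)))
val unionStage = Document.parse(
  """{ "$unionWith": { "coll": "course",
    |    "pipeline": [ { "$project": { "_id": 0, "courseTitle": 1, "teacher": 1 } } ] } }""".stripMargin)
val dataUnioned = MongoSpark.load(spark.sparkContext, readConfig)
  .withPipeline(Seq(unionStage))
  .toDF()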
My environment:
- Spark: 3.1.1
- Scala: 2.12.15
- MongoDB Spark connector: 3.0.2
- MongoDB: 6.0.9
PS: I have tried this aggregation in mongosh and it works.
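For reference, the mongosh command looks like this (reconstructed from the pipeline string above):

use school
db.teacher.aggregate([
  { $unionWith: { coll: "course",
      pipeline: [ { $project: { _id: 0, courseTitle: 1, teacher: 1 } } ] } }
])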
Thanks a lot.