Using AWS IAM Authentication with the MongoDB Connector for Apache Kafka

Robert Walters, Jagadish Nallapaneni • 4 min read • Published Jul 01, 2024 • Updated Jul 01, 2024
AWS Identity and Access Management (IAM) is a widely used service that enables developers to control secure access to AWS services and resources. A key benefit of AWS IAM is its ability to make secure authenticated connections without the client having to pass sensitive passwords.
As of version 1.13, the MongoDB Connector for Apache Kafka (Kafka Connector) supports authentication via AWS IAM. AWS IAM support has been one of the most commonly requested features for the Kafka Connector, and in this tutorial, we will walk through how to use it.
To achieve support for AWS IAM, we added support for overriding the MongoClient interface, which allows a developer to provide a CustomCredentialProvider. While this provider can be used for any authentication method, this tutorial will focus on building an AWS IAM credential provider.
To illustrate this, we will walk through a simple configuration of an AWS EC2 instance with Apache Kafka installed. This instance will use the MongoDB Connector for Apache Kafka to sink data from a Kafka topic to an Atlas cluster, and authenticate using AWS IAM.

Prerequisites

If you wish to follow along in this article, make sure you have the following:
  • An AWS IAM role, “KafkaBlogAtlasRole”. This role will later be mapped as an Atlas database user.
  • A MongoDB Atlas cluster. If you do not have an Atlas cluster, learn how to get started with Atlas.
  • An AWS EC2 instance
    • Install Apache Kafka.
    • Configure the EC2 instance with an AWS IAM role, “KafkaConnectBlogRole.”
    • Add a policy to KafkaConnectBlogRole that allows “AssumeRole” on the KafkaBlogAtlasRole.
Example policy:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "KafkaAtlasBlogPolicy",
      "Effect": "Allow",
      "Action": "sts:AssumeRole",
      "Resource": "arn:aws:iam::979559056307:role/KafkaBlogAtlasRole"
    }
  ]
}
  • Java Development Kit version 8 or later
  • A database user in the desired MongoDB Atlas cluster
    • Configure the database user with AWS IAM and use the AWS Role ARN for KafkaBlogAtlasRole. The Atlas database user should have access to read/write to the desired collection.
[Figure: MongoDB Atlas edit user dialog showing the AWS IAM authentication method]
At this point, we are ready to start building the custom authentication provider.
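Because the same role ARN appears in the IAM policy, in the Atlas database user, and later in the connector configuration, a typo in it is an easy mistake to make. The following is a minimal sketch of a format check, not part of the connector or of any AWS library. The class name RoleArnCheck and its methods are hypothetical, and the pattern assumes the three common partitions (aws, aws-cn, aws-us-gov) and a 12-digit account ID.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of the connector): checks the shape of an IAM role ARN. */
final class RoleArnCheck {

    // arn:<partition>:iam::<12-digit account id>:role/<optional path/><role name>
    // Assumption: only the aws, aws-cn, and aws-us-gov partitions are accepted.
    private static final Pattern IAM_ROLE_ARN =
            Pattern.compile("^arn:(aws|aws-cn|aws-us-gov):iam::(\\d{12}):role/([\\w+=,.@/-]+)$");

    private RoleArnCheck() {}

    /** Returns true when the value is shaped like an IAM role ARN. */
    static boolean isIamRoleArn(String value) {
        return value != null && IAM_ROLE_ARN.matcher(value).matches();
    }

    /** Extracts the account ID, or throws if the value is not shaped like an IAM role ARN. */
    static String accountId(String value) {
        Matcher m = IAM_ROLE_ARN.matcher(value == null ? "" : value);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not an IAM role ARN: " + value);
        }
        return m.group(2);
    }

    public static void main(String[] args) {
        String arn = "arn:aws:iam::979559056307:role/KafkaBlogAtlasRole";
        System.out.println(isIamRoleArn(arn)); // true
        System.out.println(accountId(arn));    // 979559056307
    }
}
```

A check like this only confirms the shape of the string. It cannot tell whether the role exists or whether AssumeRole is actually permitted.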

Getting started

Starting with version 1.13 of the MongoDB Connector for Apache Kafka, you can plug in a custom authentication provider. In this article, we will create a custom authentication provider in Java, compile it, and add it to the libs directory of your Kafka deployment. The provider we will create will support AWS IAM authentication.

Building the custom authentication component

To start, in your favorite Java editor, create a project that builds a JAR containing the following source:
SampleAssumeRoleCredential.java
package com.mongodb;

import java.util.Map;
import java.util.function.Supplier;

import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceAsyncClientBuilder;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
import com.amazonaws.services.securitytoken.model.Credentials;
import com.amazonaws.util.StringUtils;

public class SampleAssumeRoleCredential implements CustomCredentialProvider {

  public SampleAssumeRoleCredential() {}

  @Override
  public MongoCredential getCustomCredential(Map<?, ?> map) {
    // Resolves the credentials of the EC2 instance role through the default AWS provider chain
    AWSCredentialsProvider provider = new DefaultAWSCredentialsProviderChain();
    // Supplier the driver uses to obtain AWS credentials; each call assumes the role through STS
    Supplier<AwsCredential> awsFreshCredentialSupplier = () -> {
      AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard()
          .withCredentials(provider)
          .withRegion("us-east-1")
          .build();
      // The role ARN comes from the connector configuration
      AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withDurationSeconds(3600)
          .withRoleArn((String) map.get("mongodbaws.auth.mechanism.roleArn"))
          .withRoleSessionName("Test_Session");
      AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest);
      Credentials creds = assumeRoleResult.getCredentials();
      // Add your code to fetch new credentials
      return new AwsCredential(creds.getAccessKeyId(), creds.getSecretAccessKey(), creds.getSessionToken());
    };
    // No static user name or password: credentials are obtained from the supplier
    return MongoCredential.createAwsCredential(null, null)
        .withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier);
  }

  @Override
  public void validate(Map<?, ?> map) {
    // Reject a configuration that does not set the role ARN
    String roleArn = (String) map.get("mongodbaws.auth.mechanism.roleArn");
    if (StringUtils.isNullOrEmpty(roleArn)) {
      throw new RuntimeException("Invalid value set for customProperty");
    }
  }

  @Override
  public void init(Map<?, ?> map) {
    // No initialization is needed for this sample
  }
}
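The supplier above calls AWS STS every time it is invoked, and the comment "Add your code to fetch new credentials" marks where refresh logic would go. One possible approach, sketched here with the standard library only, is to wrap a supplier so that a value is reused until it reaches a maximum age. The class ExpiringSupplier is hypothetical and is not part of the MongoDB driver or the AWS SDK. Whether caching is worthwhile depends on how often the driver invokes the supplier, which this article does not state.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

/** Hypothetical wrapper: reuses the delegate's value until it is older than maxAge. */
final class ExpiringSupplier<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    private final Duration maxAge;
    private final Supplier<Instant> clock; // injectable so the behavior can be tested
    private T cached;
    private Instant fetchedAt;

    ExpiringSupplier(Supplier<T> delegate, Duration maxAge) {
        this(delegate, maxAge, Instant::now);
    }

    ExpiringSupplier(Supplier<T> delegate, Duration maxAge, Supplier<Instant> clock) {
        this.delegate = delegate;
        this.maxAge = maxAge;
        this.clock = clock;
    }

    @Override
    public synchronized T get() {
        Instant now = clock.get();
        // Fetch on first use, or once the cached value has reached maxAge
        if (fetchedAt == null || !now.isBefore(fetchedAt.plus(maxAge))) {
            cached = delegate.get();
            fetchedAt = now;
        }
        return cached;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in for an expensive call such as an STS AssumeRole request
        Supplier<String> slow = () -> "credential-" + (++calls[0]);
        Supplier<String> cached = new ExpiringSupplier<>(slow, Duration.ofMinutes(50));
        System.out.println(cached.get()); // credential-1
        System.out.println(cached.get()); // credential-1 (served from the cache)
    }
}
```

With the sample's 3600-second session duration, a maximum age somewhat below one hour (50 minutes in the sketch) would leave a margin before the assumed-role credentials expire.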
Example pom.xml file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mongodb</groupId>
  <artifactId>SampleAssumeRoleCredential</artifactId>
  <version>1.0-SNAPSHOT</version>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.5.3</version>
        <configuration>
          <!-- put your configurations here -->
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <!-- Java MongoDB Driver dependency -->
    <!-- https://mvnrepository.com/artifact/org.mongodb/mongodb-driver-sync -->
    <dependency>
      <groupId>org.mongodb</groupId>
      <artifactId>mongodb-driver-sync</artifactId>
      <version>5.1.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk -->
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk</artifactId>
      <version>1.12.723</version>
    </dependency>

    <!-- slf4j logging dependency, required for logging output from the MongoDB Java Driver -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>1.7.28</version>
    </dependency>

    <!-- MongoDB Connector for Apache Kafka, installed into the local Maven repository.
         The coordinates must match the mvn install:install-file command in the note below.
         "provided" scope: Kafka Connect already has the connector on its classpath. -->
    <dependency>
      <groupId>kafka-connect</groupId>
      <artifactId>kafka-connector</artifactId>
      <version>1.13.0</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <properties>
    <!-- Compiles for Java 17; lower these values to build with an older JDK -->
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

</project>
Note: We need to reference MongoDB Connector for Apache Kafka version 1.13 or later. In this example, the connector was downloaded from the Kafka Connector GitHub repository and stored in the local Maven repository with the following command. The groupId, artifactId, and version passed here must match the connector dependency declared in pom.xml.
mvn install:install-file \
  -Dfile=/Users/robert.walters/Downloads/mongodb-kafka-connect-mongodb-1.13.0/lib/mongo-kafka-connect-1.13.0-confluent.jar \
  -DgroupId=kafka-connect \
  -DartifactId=kafka-connector \
  -Dversion=1.13.0 \
  -Dpackaging=jar
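Compile the project and place the resulting JAR file in the Kafka libs directory of your Kafka Connect deployment. With the pom.xml above, Maven names the file SampleAssumeRoleCredential-1.0-SNAPSHOT.jar and writes it to the target directory. You are now ready to configure your MongoDB Connector for Apache Kafka to use this custom authentication provider.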

Configure the MongoDB Connector for Apache Kafka

To support the custom provider in version 1.13, the following configuration options were added to the connector:
Configuration Parameter                      Description
mongo.custom.auth.mechanism.enable           Determines if the connector should use the provider class; default is false
mongo.custom.auth.mechanism.providerClass    Java class entry point for the custom provider
mongodbaws.auth.mechanism.roleArn            Role that has IAM access to the database
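To make the role of these parameters concrete, the sketch below shows one way a host application could use an enable flag and a class name to load a provider by reflection. It is an illustration of the general plug-in pattern, not the connector's actual implementation. ProviderSketch, EchoRoleProvider, and ProviderLoaderSketch are hypothetical stand-ins, and the call order (init, then validate, then getCustomCredential) is an assumption. Only the three property names are taken from this article.

```java
import java.util.Map;

/** Stand-in for the provider contract; illustrative only, not the connector's interface. */
interface ProviderSketch {
    void init(Map<?, ?> config);
    void validate(Map<?, ?> config);
    Object getCustomCredential(Map<?, ?> config);
}

/** Toy provider that simply returns the configured role ARN as its "credential". */
final class EchoRoleProvider implements ProviderSketch {
    public EchoRoleProvider() {} // public no-argument constructor, needed for reflection

    @Override public void init(Map<?, ?> config) {}

    @Override public void validate(Map<?, ?> config) {
        if (config.get("mongodbaws.auth.mechanism.roleArn") == null) {
            throw new IllegalArgumentException("mongodbaws.auth.mechanism.roleArn is not set");
        }
    }

    @Override public Object getCustomCredential(Map<?, ?> config) {
        return config.get("mongodbaws.auth.mechanism.roleArn");
    }
}

final class ProviderLoaderSketch {
    private ProviderLoaderSketch() {}

    /** Returns the provider's credential, or null when the feature is disabled (the default). */
    static Object credentialFor(Map<String, String> config) {
        // Boolean.parseBoolean(null) is false, which mirrors "default is false"
        if (!Boolean.parseBoolean(config.get("mongo.custom.auth.mechanism.enable"))) {
            return null;
        }
        String className = config.get("mongo.custom.auth.mechanism.providerClass");
        try {
            ProviderSketch provider = (ProviderSketch)
                    Class.forName(className).getDeclaredConstructor().newInstance();
            provider.init(config);     // assumed order: init, validate, then fetch
            provider.validate(config);
            return provider.getCustomCredential(config);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot load provider class " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of(
                "mongo.custom.auth.mechanism.enable", "true",
                "mongo.custom.auth.mechanism.providerClass", EchoRoleProvider.class.getName(),
                "mongodbaws.auth.mechanism.roleArn", "arn:aws:iam::979559056307:role/KafkaBlogAtlasRole");
        System.out.println(credentialFor(config));
    }
}
```

The pattern explains why the provider JAR has to be on the Kafka Connect classpath: a class named only by a configuration string can be instantiated only if the class loader can find it.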
In this example, the custom authentication component will use AWS IAM AssumeRole to authenticate with MongoDB Atlas. Here is an example of this configuration connecting to a MongoDB Atlas cluster.
{
  "name": "mongo-tutorial-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "Stocks2",
    "connection.uri": "<MongoDB Connection String>",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "database": "Stocks",
    "collection": "StockData",
    "mongo.custom.auth.mechanism.enable": "true",
    "mongo.custom.auth.mechanism.providerClass": "com.mongodb.SampleAssumeRoleCredential",
    "mongodbaws.auth.mechanism.roleArn": "<AWS IAM ARN>"
  }
}
Notice that the roleArn is the ARN (Amazon Resource Name) of the AWS IAM role that is mapped to the Atlas database user, KafkaBlogAtlasRole. At runtime, Kafka Connect runs under the AWS IAM role of the EC2 instance, KafkaConnectBlogRole. Because this role has a policy that allows AssumeRole on KafkaBlogAtlasRole, the service can connect to Atlas without storing any credentials!
You are now ready to send data to Kafka and watch it arrive in a MongoDB collection using the MongoDB Connector for Apache Kafka configured with AWS IAM authentication to MongoDB Atlas.
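A connector configuration in the name/config JSON shape shown above is typically registered by sending it to the Kafka Connect REST API with a POST to /connectors. The following is a minimal sketch using the Java 11 HTTP client. The class name RegisterConnectorSketch is hypothetical, and the base URL and JSON file path are supplied as arguments because they depend on your deployment (Kafka Connect listens on port 8083 by default).

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper: posts a connector definition to the Kafka Connect REST API. */
final class RegisterConnectorSketch {
    private RegisterConnectorSketch() {}

    /** Builds a POST <base>/connectors request with the connector JSON as the body. */
    static HttpRequest buildRequest(String connectBaseUrl, String connectorJson) {
        return HttpRequest.newBuilder(URI.create(connectBaseUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: RegisterConnectorSketch <connect-base-url> <connector.json>");
            return;
        }
        // Example arguments (assumptions): http://localhost:8083 mongo-tutorial-sink.json
        String json = Files.readString(Path.of(args[1]));
        HttpRequest request = buildRequest(args[0], json);
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Building the request in a separate method keeps the network call out of the way when you only want to inspect what would be sent.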

Summary

In this article, we discussed the requirements needed to support a custom authentication provider such as AWS IAM authentication. Version 1.13 of the Kafka Connector introduced new parameters that define which class to use within the custom provider JAR file. This JAR file includes the logic for the custom authentication.
Note that while AWS IAM authentication could be added natively to the Kafka Connector (it is just software, after all), doing so would add a version dependency on the AWS SDK and significantly increase the size of the connector, which is impractical for users who do not need this authentication method. For these reasons, writing a custom credential provider is the solution for integrating the Kafka Connector with authentication methods such as AWS IAM.
Read the MongoDB Connector for Apache Kafka documentation to learn more. Questions? Join us in the MongoDB Developer Community.