

Using AWS IAM Authentication With the MongoDB Connector for Apache Kafka

Robert Walters, Jagadish Nallapaneni • 4 min read • Published Jul 01, 2024 • Updated Jul 01, 2024
AWS Identity and Access Management (IAM) is a widely used service that enables developers to control secure access to AWS services and resources. A key benefit of AWS IAM is its ability to make secure authenticated connections without the client having to pass sensitive passwords.
As of version 1.13, the MongoDB Connector for Apache Kafka (Kafka Connector) now supports authentication via AWS IAM. AWS IAM support has been one of the most commonly requested features for the Kafka Connector, and in this tutorial, we will walk through how to use it.
To achieve support for AWS IAM, we added the ability to override how the connector builds its MongoClient, allowing a developer to provide a CustomCredentialProvider. While this provider can be used for any authentication method, this tutorial will focus on building an AWS IAM credential provider.
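For reference, the provider contract is a small interface; the sketch below is reconstructed from the methods the sample later in this tutorial implements:
package com.mongodb.kafka.connect.util.custom.credentials;

import java.util.Map;
import com.mongodb.MongoCredential;

// Sketch of the provider contract, based on the methods implemented by the
// sample in this tutorial: validate the connector configuration, perform any
// one-time setup, and build the MongoCredential the driver authenticates with.
public interface CustomCredentialProvider {
    MongoCredential getCustomCredential(Map<?, ?> configs);
    void validate(Map<?, ?> configs);
    void init(Map<?, ?> configs);
}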
To illustrate this, we will walk through a simple configuration of an AWS EC2 instance installed with Apache Kafka. This instance will use the MongoDB Connector for Apache Kafka to sink data from a Kafka topic to an Atlas cluster, and authenticate using AWS IAM.

Prerequisites

If you wish to follow along in this article, make sure you have the following:
  • An AWS IAM role, “KafkaBlogAtlasRole”. This role will later be mapped as an Atlas database user.
  • A MongoDB Atlas cluster. If you do not have one, learn how to get started with Atlas.
  • AWS EC2 instance
    • Install Apache Kafka.
    • Configure the EC2 instance with an AWS IAM role, “KafkaConnectBlogRole.”
    • Add a policy to KafkaConnectBlogRole that allows “AssumeRole” on the KafkaBlogAtlasRole.
Example policy:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "KafkaAtlasBlogPolicy",
      "Effect": "Allow",
      "Action": "sts:AssumeRole",
      "Resource": "arn:aws:iam::979559056307:role/KafkaBlogAtlasRole"
    }
  ]
}
  • Java Development Kit version 17 or later (the sample pom.xml below targets Java 17)
  • A database user in the desired MongoDB Atlas cluster
    • Configure the database user with AWS IAM and use the AWS Role ARN for KafkaBlogAtlasRole. The Atlas database user should have access to read/write to the desired collection.
[Image: MongoDB Atlas edit user dialog showing the AWS IAM authentication method]
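Before wiring up Kafka, you can optionally sanity-check the IAM-to-Atlas mapping from a shell. This is a minimal sketch, assuming the AWS CLI and mongosh are installed, your current principal is allowed to assume KafkaBlogAtlasRole, and the cluster host is a placeholder:
# Obtain temporary credentials for the Atlas-mapped role
aws sts assume-role \
    --role-arn arn:aws:iam::979559056307:role/KafkaBlogAtlasRole \
    --role-session-name AtlasTest

# Export AccessKeyId, SecretAccessKey, and SessionToken from the output above
export AWS_ACCESS_KEY_ID=<AccessKeyId>
export AWS_SECRET_ACCESS_KEY=<SecretAccessKey>
export AWS_SESSION_TOKEN=<SessionToken>

# Connect using the MONGODB-AWS authentication mechanism
mongosh "mongodb+srv://<your-cluster>.mongodb.net/?authSource=%24external&authMechanism=MONGODB-AWS"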
At this point, we are ready to start building the custom authentication provider.

Get started

Starting with version 1.13 of the MongoDB Connector for Apache Kafka, we now have the ability to plug in a custom authentication provider. In this article, we will create a custom authentication provider in Java, compile it, and add it to the lib directory of your Kafka deployment. The provider we will create will support AWS IAM authentication.

Building the custom authentication component

To start, create a project in your favorite Java editor and build a JAR containing the following source:
SampleAssumeRoleCredential.java
package com.mongodb;

import java.util.Map;
import java.util.function.Supplier;

import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceAsyncClientBuilder;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
import com.amazonaws.services.securitytoken.model.Credentials;
import com.amazonaws.util.StringUtils;

public class SampleAssumeRoleCredential implements CustomCredentialProvider {

    public SampleAssumeRoleCredential() {}

    @Override
    public MongoCredential getCustomCredential(Map<?, ?> map) {
        // Resolve the caller's own AWS credentials (EC2 instance profile,
        // environment variables, etc.) via the default provider chain
        AWSCredentialsProvider provider = new DefaultAWSCredentialsProviderChain();
        // The driver invokes this supplier whenever it needs fresh credentials
        Supplier<AwsCredential> awsFreshCredentialSupplier = () -> {
            AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard()
                .withCredentials(provider)
                .withRegion("us-east-1")
                .build();
            // Assume the role that is mapped to the Atlas database user
            AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withDurationSeconds(3600)
                .withRoleArn((String) map.get("mongodbaws.auth.mechanism.roleArn"))
                .withRoleSessionName("Test_Session");
            AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest);
            Credentials creds = assumeRoleResult.getCredentials();
            // Hand the temporary STS credentials to the MongoDB driver
            return new AwsCredential(creds.getAccessKeyId(), creds.getSecretAccessKey(),
                creds.getSessionToken());
        };
        return MongoCredential.createAwsCredential(null, null)
            .withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier);
    }

    @Override
    public void validate(Map<?, ?> map) {
        String roleArn = (String) map.get("mongodbaws.auth.mechanism.roleArn");
        if (StringUtils.isNullOrEmpty(roleArn)) {
            throw new RuntimeException("Invalid value set for mongodbaws.auth.mechanism.roleArn");
        }
    }

    @Override
    public void init(Map<?, ?> map) {}
}
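If you want to exercise the provider outside of Kafka Connect first, a minimal, hypothetical smoke test might look like the following. It must run where the default AWS credential chain can resolve a principal allowed to assume the role:
package com.mongodb;

import java.util.Map;

public class ProviderSmokeTest {
    public static void main(String[] args) {
        SampleAssumeRoleCredential provider = new SampleAssumeRoleCredential();
        Map<String, String> config = Map.of(
            "mongodbaws.auth.mechanism.roleArn",
            "arn:aws:iam::979559056307:role/KafkaBlogAtlasRole");
        provider.validate(config);  // throws if the roleArn property is missing
        // Prints "MONGODB-AWS" if the credential was constructed correctly;
        // the STS call itself only happens when the driver invokes the supplier
        System.out.println(provider.getCustomCredential(config).getMechanism());
    }
}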
Example pom.xml file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mongodb</groupId>
    <artifactId>SampleAssumeRoleCredential</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <!-- The Shade plugin bundles the provider and its dependencies into one JAR -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.3</version>
                <configuration>
                    <!-- put your configurations here -->
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- Java MongoDB Driver dependency -->
        <!-- https://mvnrepository.com/artifact/org.mongodb/mongodb-driver-sync -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver-sync</artifactId>
            <version>5.1.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>1.12.723</version>
        </dependency>

        <!-- slf4j logging dependency, required for logging output from the MongoDB Java Driver -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-jdk14</artifactId>
            <version>1.7.28</version>
        </dependency>

        <!-- Kafka Connector JAR installed into the local Maven repository (see the
             install-file command below); provided scope because Kafka Connect
             supplies it on the classpath at runtime -->
        <dependency>
            <groupId>kafka-connect</groupId>
            <artifactId>kafka-connect</artifactId>
            <version>1.13.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
</project>
Note: You need to reference the MongoDB Connector for Apache Kafka version 1.13 or later. In this example, the connector JAR was downloaded from the Kafka Connector GitHub repository and installed into the local Maven repository:
mvn install:install-file -Dfile=/Users/robert.walters/Downloads/mongodb-kafka-connect-mongodb-1.13.0/lib/mongo-kafka-connect-1.13.0-confluent.jar -DgroupId=kafka-connect -DartifactId=kafka-connect -Dversion=1.13.0 -Dpackaging=jar
Compile the project and place the resulting shaded JAR (for example, SampleAssumeRoleCredential-1.0-SNAPSHOT.jar) in the libs directory of your Kafka Connect deployment.
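A minimal build-and-copy sketch, assuming Kafka is installed under /opt/kafka (adjust the paths for your environment):
# Build the shaded JAR with the Maven Shade plugin configured above
mvn clean package

# Copy it onto the Kafka Connect classpath
cp target/SampleAssumeRoleCredential-1.0-SNAPSHOT.jar /opt/kafka/libs/
You are now ready to configure your MongoDB Connector for Apache Kafka to use this custom authentication provider.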

Configure the MongoDB Connector for Apache Kafka

To support the custom provider in version 1.13, the following configuration options were added to the connector:
Configuration Parameter | Description
mongo.custom.auth.mechanism.enable | Determines whether the connector should use the custom provider class; the default is false.
mongo.custom.auth.mechanism.providerClass | Fully qualified Java class name of the custom provider.
mongodbaws.auth.mechanism.roleArn | ARN of the AWS IAM role that is mapped to the Atlas database user.
In this example, the custom authentication component will use AWS IAM AssumeRole to authenticate with MongoDB Atlas. Here is an example of this configuration connecting to a MongoDB Atlas cluster.
{
  "name": "mongo-tutorial-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "Stocks2",
    "connection.uri": "<MongoDB Connection String>",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "database": "Stocks",
    "collection": "StockData",
    "mongo.custom.auth.mechanism.enable": "true",
    "mongo.custom.auth.mechanism.providerClass": "com.mongodb.SampleAssumeRoleCredential",
    "mongodbaws.auth.mechanism.roleArn": "<AWS IAM ARN>"
  }
}
Notice that the roleArn is the ARN (Amazon Resource Name) of the AWS IAM role that is mapped to the Atlas database user, KafkaBlogAtlasRole. At runtime, Kafka Connect runs under the EC2 instance's AWS IAM role, KafkaConnectBlogRole; because that role has a policy allowing AssumeRole on KafkaBlogAtlasRole, the service can connect to Atlas without storing any credentials!
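To deploy the sink, post this configuration to the Kafka Connect REST interface. A sketch, assuming the Connect worker's REST endpoint is at localhost:8083 and the JSON above is saved as mongo-tutorial-sink.json (a hypothetical file name):
curl -s -X POST -H "Content-Type: application/json" \
    --data @mongo-tutorial-sink.json \
    http://localhost:8083/connectors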
You are now ready to send data to Kafka and watch it arrive in a MongoDB collection using the MongoDB Connector for Apache Kafka configured with AWS IAM authentication to MongoDB Atlas.
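For example, you could produce a test record with the console producer that ships with Apache Kafka (assuming a broker on localhost:9092; the record shape is illustrative) and then confirm it lands in the Stocks.StockData collection:
# Produce a JSON record to the topic the sink connector is watching
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic Stocks2
> {"symbol": "MDB", "price": 375.25}

# Then, in mongosh connected to the Atlas cluster, confirm the document arrived:
#   use Stocks
#   db.StockData.find()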

Summary

In this article, we discussed the requirements needed to support a custom authentication provider such as AWS IAM authentication. Version 1.13 of the Kafka Connector introduced new parameters that define which class to use within the custom provider JAR file. This JAR file includes the logic for the custom authentication.
Note that while adding AWS IAM authentication natively to the Kafka Connector is possible (it is just software, after all), doing so would not only add a version dependency on the AWS IAM SDK, but it would also significantly increase the size of the connector for the many users who do not need this authentication method. For these reasons, writing a custom credential provider is the way to integrate the Kafka Connector with authentication methods such as AWS IAM.
Read the MongoDB Connector for Apache Kafka documentation to learn more. Questions? Join us in the MongoDB Developer Community.
