Docs Menu
Docs Home
/
MongoDB Kafka Connector
/

Custom Authentication Provider

On this page

  • Overview
  • AWS IAM Authentication Example

You can add a custom authentication provider by implementing the com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider interface. You must place your custom class JAR file in the lib folder in your Kafka Connect deployment.

Set following authentication properties to configure the authentication provider:

  • mongo.custom.auth.mechanism.enable: set to true

  • mongo.custom.auth.mechanism.providerClass: set to the qualified class name of the implementation class

  • (Optional) mongodbaws.auth.mechanism.roleArn: set to an Amazon Resource Name (ARN)

This example provides a custom authentication provider that supports AWS IAM. The following code shows the custom authentication provider JAR file:

package com.mongodb;
import java.util.Map;
import java.util.function.Supplier;
import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceAsyncClientBuilder;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
import com.amazonaws.services.securitytoken.model.Credentials;
import com.amazonaws.util.StringUtils;
public class SampleAssumeRoleCredential implements CustomCredentialProvider {
public SampleAssumeRoleCredential() {}
@Override
public MongoCredential getCustomCredential(Map<?, ?> map) {
AWSCredentialsProvider provider = new DefaultAWSCredentialsProviderChain();
Supplier<AwsCredential> awsFreshCredentialSupplier = () -> {
AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard()
.withCredentials(provider)
.withRegion("us-east-1")
.build();
AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withDurationSeconds(3600)
.withRoleArn((String)map.get("mongodbaws.auth.mechanism.roleArn"))
.withRoleSessionName("Test_Session");
AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest);
Credentials creds = assumeRoleResult.getCredentials();
// Add your code to fetch new credentials
return new AwsCredential(creds.getAccessKeyId(), creds.getSecretAccessKey(), creds.getSessionToken());
};
return MongoCredential.createAwsCredential(null, null)
.withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier);
}
// Validates presence of an ARN
@Override
public void validate(Map<?, ?> map) {
String roleArn = (String) map.get("mongodbaws.auth.mechanism.roleArn");
if (StringUtils.isNullOrEmpty(roleArn)) {
throw new RuntimeException("Invalid value set for customProperty");
}
}
// Initializes the custom provider
@Override
public void init(Map<?, ?> map) {
}
}

Compile the JAR file and place it in the lib folder in your deployment.

Note

To view an example of a pom.xml file that can build the complete JAR containing the implementation class, see the Kafka Connector GitHub repository README file.

Next, configure your source or sink connector to include the custom authentication method. The following configuration properties define a sink connector that connects the Kafka Connector to MongoDB Atlas by using AWS IAM authentication:

{
"name": "mongo-tutorial-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "<topic>",
"connection.uri": "<connection string>?authSource=%24external&authMechanism=MONGODB-AWS&retryWrites=true&w=majority",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"database": "<db>",
"collection": "<collection>",
"mongo.custom.auth.mechanism.enable": "true",
"mongo.custom.auth.mechanism.providerClass": "com.mongodb.SampleAssumeRoleCredential",
"mongodbaws.auth.mechanism.roleArn": "<AWS IAM ARN>"
}
}

In this example, the roleArn value is the IAM Role of the user group that has access to MongoDB Atlas. In the AWS IAM console, the IAM account that is running Kafka Connect has AssumeRole permissions to the Atlas User Group.

Back

MongoDB AWS-based Authentication