Apache Kafka Connections
On this page
- Add a Kafka Connection
- Add a Kafka Private Link Connection
- Amazon Web Services Confluent and MSK Private Link Connections
- Microsoft Azure EventHub Private Link Connections
- Microsoft Azure Confluent Private Link Connections
- Add a Kafka Transit Gateway Connection
- Provision a Transit Gateway network in Confluent Cloud.
- Create an AWS Resource Share.
- Configure a connection between your Confluent Cloud and your Transit Gateway.
- Configure a connection between your Transit Gateway and Atlas
- Create a Kafka Transit Gateway connection.
- Configuration
Atlas Stream Processing supports both source and sink connections to Apache Kafka.
Add a Kafka Connection
To add a Kafka connection to your stream processing instance's connection registry:
To create one connection for the stream processing instance you specify using the Atlas CLI, run the following command:
atlas streams connections create [connectionName] [options]
To learn more about the command syntax and parameters, see the Atlas CLI documentation for atlas streams connections create.
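Whichever interface you use, the result is a connection document in the registry. As a hedged illustration, that document can be sketched in Python; the field names follow the Administration API examples on this page, and every value below is a placeholder:

```python
import json

def kafka_connection(name, bootstrap_servers, username, password,
                     protocol="SASL_SSL", mechanism="PLAIN"):
    """Assemble a Kafka connection document for the connection registry.

    Field names mirror the Atlas Administration API examples on this
    page; all argument values here are placeholders.
    """
    return {
        "name": name,
        "type": "Kafka",
        "bootstrapServers": bootstrap_servers,
        "security": {"protocol": protocol},
        "authentication": {
            "mechanism": mechanism,
            "username": username,
            "password": password,
        },
    }

doc = kafka_connection("demo", "broker.example.com:9092",
                       "userDemo", "secretDemo")
print(json.dumps(doc, indent=2))
```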
In Atlas, go to the Stream Processing page for your project.
Warning
Navigation Improvements In Progress
We're currently rolling out a new and improved navigation experience. If the following steps don't match your view in the Atlas UI, see the preview documentation.
If it's not already displayed, select the organization that contains your project from the Organizations menu in the navigation bar.
If it's not already displayed, select your project from the Projects menu in the navigation bar.
In the sidebar, click Stream Processing under the Services heading.
The Stream Processing page displays.
Add a new connection.
Select a Kafka connection.
Provide a Connection Name. Each connection name must be unique within a stream processing instance. This is the name used to reference the connection in Atlas Stream Processing aggregations.
Select a Network Access type. Atlas Stream Processing supports Public IP or VPC Peering connections.
For Public IP, click the Public IP button. No further configuration is needed for this network access type.
For VPC Peering, click the VPC Peering button and toggle Enable VPC Peering on. Atlas Stream Processing automatically selects the appropriate VPC peering connection from your configured connections.
If you do not have a VPC peering connection, see Configure an Atlas Network Peering Connection.
Specify an IP address for one or more bootstrap servers for your Apache Kafka system.
From the dropdown menu, select a Security Protocol Method.
Atlas Stream Processing supports SASL_PLAINTEXT or SASL_SSL. SASL_PLAINTEXT is incompatible with VPC peering. To use VPC peering, you must select the SASL_SSL method.
From the dropdown menu, select a SASL Mechanism.
Atlas Stream Processing supports:
PLAIN
SCRAM-SHA-256
SCRAM-SHA-512
Provide a Username for authentication.
Provide a Password for authentication.
Click Add connection.
If you selected the SASL_SSL protocol:
From the dropdown menu, select a SASL Mechanism.
Atlas Stream Processing supports:
PLAIN
SCRAM-SHA-256
SCRAM-SHA-512
Click Upload to upload your Certificate Authority PEM file.
Provide a Username for authentication.
Provide a Password for authentication.
Click Add connection.
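The protocol and mechanism rules above can be summarized in a small validator. This is an illustrative sketch, not part of any Atlas tooling:

```python
SUPPORTED_PROTOCOLS = {"SASL_PLAINTEXT", "SASL_SSL"}
SUPPORTED_MECHANISMS = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}

def validate_security(protocol, mechanism, vpc_peering=False):
    """Check a security configuration against the rules stated above."""
    if protocol not in SUPPORTED_PROTOCOLS:
        raise ValueError(f"unsupported security protocol: {protocol}")
    if mechanism not in SUPPORTED_MECHANISMS:
        raise ValueError(f"unsupported SASL mechanism: {mechanism}")
    if vpc_peering and protocol != "SASL_SSL":
        # SASL_PLAINTEXT is incompatible with VPC peering.
        raise ValueError("VPC peering requires the SASL_SSL protocol")
    return True

print(validate_security("SASL_SSL", "SCRAM-SHA-512", vpc_peering=True))
```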
The Atlas Administration API provides an endpoint for adding a connection to a connection registry.
Important
After adding an external connection such as an Apache Kafka cluster to your connection registry, you must add Atlas IP addresses to an access list for that external connection. For more information, see Allow Access to or from the Atlas Control Plane.
Add a Kafka Private Link Connection
Atlas Stream Processing currently supports creating Private Link connections to the following:
AWS Confluent clusters
AWS MSK clusters
Microsoft Azure EventHub
Microsoft Azure Confluent clusters
Amazon Web Services Confluent and MSK Private Link Connections
To create an AWS Private Link connection to use in your Atlas Stream Processing project:
Important
You can't have more than one Private Link connection to a given Confluent cluster per Atlas project. Before you begin this procedure, call the Return All Private Link Connections endpoint. If you have an existing Private Link connection to your Confluent cluster within Atlas but not within your Confluent account, only perform those steps that configure your Confluent-side networking.
Configure your Confluent cluster.
You must configure your Confluent cluster to accept incoming connections from your Atlas project.
Important
Confluent accepts incoming connections only from AWS. To use a Confluent Private Link connection, you must host your stream processing instances on AWS.
Call the Return Account ID and VPC ID for group and region Atlas Administration API endpoint. Note the value of awsAccountId; you will need this in a later step.
In your Confluent account, navigate to the cluster you want to connect to. In your cluster networking interface, navigate to your cluster networking details.
For a Confluent dedicated cluster, provide a name of your choice. For the AWS account number, provide the value of the awsAccountId field you noted earlier.
Note
This step is not required for Confluent serverless clusters.
Request a connection to your cloud provider.
The Atlas Administration API provides an endpoint for requesting a Private Link connection configured for Atlas Stream Processing.
For an AWS Confluent Private Link connection, you must set the following key-value pairs:
Key | Value |
---|---|
serviceEndpointId | Your Confluent cluster's VPC Endpoint service name. |
dnsDomain | Fully qualified domain name of the bootstrap server on your Confluent cluster. |
dnsSubDomain | If your cluster doesn't use subdomains, you must set this to the empty array []. |
You can find these values in your Confluent cluster's networking details.
The following example command requests a connection to your Confluent cluster and illustrates a typical response:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/privateLinkConnections' \
  --digest \
  --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/vnd.atlas.2023-02-01+json' \
  --data '{
    "vendor": "Confluent",
    "provider": "AWS",
    "region": "us_east_1",
    "serviceEndpointId": "com.amazonaws.vpce.us-east-1.vpce-svc-93da685022ee702a9",
    "dnsDomain": "sample.us-east-1.aws.confluent.cloud",
    "dnsSubDomain": [
      "use1-az1.sample.us-east-1.aws.confluent.cloud",
      "use1-az2.sample.us-east-1.aws.confluent.cloud",
      "use1-az4.sample.us-east-1.aws.confluent.cloud"
    ]
  }'
{"_id":"6aa12e7ccd660d4b2380b1c1","dnsDomain":"sample.us-east-1.aws.confluent.cloud.","vendor":"Confluent","provider":"AWS","region":"us_east_1","serviceEndpointId":"com.amazonaws.vpce.us-east-1.vpce-svc-93da685022ee702a9"}
After you send the request, note the value of the _id field in the response body. You will need this in a later step.
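The request body shown above can also be assembled programmatically before you send it with curl or an HTTP client. A Python sketch with placeholder values (the helper name is my own):

```python
import json

def confluent_private_link_request(service_endpoint_id, dns_domain,
                                   dns_subdomains):
    """Build the body for the privateLinkConnections endpoint.

    Keys mirror the example request above. dnsSubDomain must be the
    empty list when your cluster does not use subdomains.
    """
    return {
        "vendor": "Confluent",
        "provider": "AWS",
        "region": "us_east_1",
        "serviceEndpointId": service_endpoint_id,
        "dnsDomain": dns_domain,
        "dnsSubDomain": dns_subdomains,
    }

body = confluent_private_link_request(
    "com.amazonaws.vpce.us-east-1.vpce-svc-93da685022ee702a9",
    "sample.us-east-1.aws.confluent.cloud",
    [],  # no subdomains in this example
)
print(json.dumps(body))
```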
Provide the interface endpoint ID to Confluent.
Note
This step applies only to Confluent serverless clusters.
Call the Return All Private Link Connections endpoint. Note the value of interfaceEndpointId.
In your Confluent account, navigate to the cluster you want to connect to. In your cluster networking interface, navigate to your cluster networking details. Navigate to the access points interface, and add a new access point. When Confluent prompts you for an interface endpoint, provide the value of interfaceEndpointId that you noted previously.
Create the Atlas-side connection.
Add a connection with the following key-value pairs:
Key | Value |
---|---|
bootstrapServers | IP address of your cloud provider's Kafka bootstrap server. |
security.protocol | Must be set to SASL_SSL. |
authentication.mechanism | Must be set to PLAIN. |
authentication.password | The password associated with your Confluent API key. |
authentication.username | The username associated with your Confluent API key. |
type | Must be set to Kafka. |
networking.access.type | Must be set to PRIVATE_LINK. |
networking.access.connectionId | The _id value you noted when you requested the Private Link connection. |
Set all other values as necessary.
The following example command creates an Apache Kafka connection in Atlas:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/spinstance/connections' \
  --digest \
  --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/vnd.atlas.2023-02-01+json' \
  --data '{
    "name": "confluent_demo",
    "bootstrapServers": "slr-ntgrbn.sample.us-east-1.aws.confluent.cloud:9092",
    "security": { "protocol": "SASL_SSL" },
    "authentication": {
      "mechanism": "PLAIN",
      "password": "apiSecretDemo",
      "username": "apiUserDemo"
    },
    "type": "Kafka",
    "networking": {
      "access": {
        "type": "PRIVATE_LINK",
        "connectionId": "38972b0cbe9c2aa40a30a246"
      }
    }
  }'
Configure your AWS MSK cluster.
You must configure your AWS MSK cluster to accept incoming connections from your Atlas project.
Important
AWS MSK accepts incoming connections from AWS only. To use an AWS MSK Private Link connection, you must host your stream processing instances on AWS.
Use the Get Account Details endpoint to retrieve the AWS Principal identity. You will need this value for your AWS MSK cluster policy.
Sign in to the AWS Management Console and navigate to the AWS MSK console. Ensure that multi-VPC connectivity is enabled on the cluster to which you want to connect.
Click Properties, Security Settings, and Edit cluster policy.
Provide the full ARN form of the Principal identity you retrieved earlier as the value of Statement.Principal.AWS and ensure the policy takes the following form:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": [
          "arn:aws:iam::123456789012:root"
        ]
      },
      "Action": [
        "kafka:CreateVpcConnection",
        "kafka:GetBootstrapBrokers",
        "kafka:DescribeCluster",
        "kafka:DescribeClusterV2"
      ],
      "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/testing/de8982fa-8222-4e87-8b20-9bf3cdfa1521-2"
    }
  ]
}
Request a connection to your cloud provider.
The Atlas Administration API provides an endpoint for requesting a Private Link connection configured for Atlas Stream Processing.
For an AWS MSK Private Link connection, you must set the following key-value pairs:
Key | Value |
---|---|
vendor | Must be set to msk. |
provider | Must be set to AWS. |
arn | String representing the Amazon Resource Name (ARN) of your AWS MSK cluster. |
You can find the ARN in your AWS MSK cluster's networking details.
The following example command requests a connection to your AWS MSK cluster and illustrates a typical response:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/privateLinkConnections' \
  --digest \
  --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/vnd.atlas.2023-02-01+json' \
  --data '{ "vendor": "msk", "provider": "AWS", "arn": "1235711" }'
{"_id":"6aa12e7ccd660d4b2380b1c1","dnsDomain":"scram.sample.us-east-1.amazonaws.com","vendor":"msk","provider":"AWS","region":"us_east_1","serviceEndpointId":"com.amazonaws.vpce.us-east-1.vpce-svc-93da685022ee702a9"}
After you send the request, note the value of the _id field in the response body. You will need this in a later step.
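Before sending the request, you can sanity-check that the value you pass as arn has the general shape of an MSK cluster ARN. This loose regex is an illustrative helper, not an official AWS validation:

```python
import re

# Loose sanity check for the shape of an MSK cluster ARN:
# arn:aws:kafka:<region>:<12-digit account>:cluster/<name>/<uuid>
MSK_ARN = re.compile(
    r"^arn:aws:kafka:[a-z0-9-]+:\d{12}:cluster/[^/]+/[0-9a-f-]+$"
)

def looks_like_msk_cluster_arn(arn):
    """Return True if the string matches the expected MSK cluster ARN shape."""
    return bool(MSK_ARN.match(arn))

print(looks_like_msk_cluster_arn(
    "arn:aws:kafka:us-east-1:123456789012:cluster/testing/"
    "de8982fa-8222-4e87-8b20-9bf3cdfa1521-2"
))
```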
Create the Atlas-side connection.
Add a connection with the following key-value pairs:
Key | Value |
---|---|
bootstrapServers | IP address of your cloud provider's Kafka bootstrap server. |
security.protocol | Must be set to SASL_SSL. |
authentication.mechanism | SASL mechanism to use (for example, PLAIN). |
authentication.password | The SCRAM password associated with your cluster. You must define a paired SCRAM user and password and associate it with your AWS MSK cluster using AWS Secrets Manager. |
authentication.username | The SCRAM user associated with your cluster. You must define a paired SCRAM user and password and associate it with your AWS MSK cluster using AWS Secrets Manager. |
type | Must be set to Kafka. |
networking.access.type | Must be set to PRIVATE_LINK. |
networking.access.connectionId | The _id value you noted when you requested the Private Link connection. |
Set all other values as necessary.
The following example command creates an Apache Kafka connection in Atlas:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/spinstance/connections' \
  --digest \
  --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/vnd.atlas.2023-02-01+json' \
  --data '{
    "name": "msk_demo",
    "bootstrapServers": "slr-ntgrbn.sample.us-east-1.amazonaws.com:9092",
    "security": { "protocol": "SASL_SSL" },
    "authentication": {
      "mechanism": "PLAIN",
      "password": "scramSecretDemo",
      "username": "scramUserDemo"
    },
    "type": "Kafka",
    "networking": {
      "access": {
        "type": "PRIVATE_LINK",
        "connectionId": "38972b0cbe9c2aa40a30a246"
      }
    }
  }'
Microsoft Azure EventHub Private Link Connections
To create a Microsoft Azure EventHub Private Link connection to use in your Atlas Stream Processing project:
In Atlas, go to the Network Access page for your project.
If it's not already displayed, select the organization that contains your project from the Organizations menu in the navigation bar.
If it's not already displayed, select your project from the Projects menu in the navigation bar.
In the sidebar, click Network Access under the Security heading.
The Network Access page displays.
Provide your Azure EventHub endpoint details.
Provide your Azure service endpoint ID.
Select your Endpoint region.
Select your Host name.
Click Next, generate endpoint ID.
You may now view your Azure EventHub private endpoint in the Network Access interface under the Atlas Stream Processing tab by clicking the View button in its row.
Request a connection to your cloud provider.
The Atlas Administration API provides an endpoint for requesting a Private Link connection configured for Atlas Stream Processing.
For an Azure Private Link connection, you must set the following key-value pairs:
Key | Value |
---|---|
serviceEndpointId | Your EventHub namespace endpoint. Note that this value must be the Azure Resource Manager (ARM) ID of the Event Hub namespace, not the ARM ID of an individual Event Hub. |
dnsDomain | Fully qualified domain name, with port number, of the bootstrap server in your Azure Event Hub namespace. This domain name conforms to the format described here. |
The following example command requests a connection to your Azure Event Hub and illustrates a typical response:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/privateLinkConnections' \
  --digest \
  --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/vnd.atlas.2023-02-01+json' \
  --data '{
    "provider": "AZURE",
    "region": "US_EAST_2",
    "serviceEndpointId": "/subscriptions/b82d6aa0-0b0a-ffa3-7c22-e167dc44f5b0/resourceGroups/asp/providers/Microsoft.EventHub/namespaces/sample",
    "dnsDomain": "sample.servicebus.windows.net"
  }'
{"_id":"6aa12e7ccd660d4b2380b1c1","dnsDomain":"sample.servicebus.windows.net","provider":"AZURE","region":"US_EAST_2","serviceEndpointId":"/subscriptions/b82d6aa0-0b0a-ffa3-7c22-e167dc44f5b0/resourceGroups/asp/providers/Microsoft.EventHub/namespaces/sample"}
After you send the request, note the value of the _id field in the response body. You will need this in a later step.
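Azure Event Hubs exposes its Kafka-compatible endpoint at <namespace>.servicebus.windows.net on port 9093, which is where the dnsDomain and bootstrap server values in these examples come from. A small illustrative helper to derive both from a namespace name:

```python
def eventhub_kafka_endpoints(namespace):
    """Derive the dnsDomain and bootstrap server for an Event Hubs
    namespace. The Kafka endpoint listens on port 9093 at
    <namespace>.servicebus.windows.net.
    """
    dns_domain = f"{namespace}.servicebus.windows.net"
    bootstrap = f"{dns_domain}:9093"
    return dns_domain, bootstrap

print(eventhub_kafka_endpoints("sample"))
```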
Accept the requested connection within your cloud provider account.
For Private Link connections to Azure, navigate to your Event Hub networking page and select the Private endpoint connections tab. In the table of connections, identify your newly requested connection and approve it.
Create the Atlas-side connection.
Add a connection with the following key-value pairs:
Key | Value |
---|---|
bootstrapServers | IP address of your cloud provider's Kafka bootstrap server. |
security.protocol | Must be set to SASL_SSL. |
authentication.mechanism | Must be set to PLAIN. |
authentication.password | Your Event Hub connection string. |
authentication.username | Must be set to $ConnectionString. |
type | Must be set to Kafka. |
networking.access.type | Must be set to PRIVATE_LINK. |
networking.access.connectionId | The _id value you noted when you requested the Private Link connection. |
Set all other values as necessary.
The following example command creates an Apache Kafka connection in Atlas:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/spinstance/connections' \
  --digest \
  --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/vnd.atlas.2023-02-01+json' \
  --data '{
    "name": "eventhubpl33333",
    "bootstrapServers": "sample.servicebus.windows.net:9093",
    "security": { "protocol": "SASL_SSL" },
    "authentication": {
      "mechanism": "PLAIN",
      "password": "Endpoint=sb://sample.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=Irlo3OoRkc27T3ZoGOlbhEOqXQRXzb12+Q2hNXm0lc=",
      "username": "$ConnectionString"
    },
    "type": "Kafka",
    "networking": {
      "access": {
        "type": "PRIVATE_LINK",
        "connectionId": "38972b0cbe9c2aa40a30a246"
      }
    }
  }'
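As the example shows, authentication against the Event Hubs Kafka endpoint uses SASL PLAIN with the literal username $ConnectionString and the namespace connection string as the password. A helper sketch (the connection string here is truncated and illustrative):

```python
def eventhub_sasl_auth(connection_string):
    """Build the authentication block for an Event Hubs Kafka connection.

    The username is the literal string "$ConnectionString"; the password
    is the Event Hub namespace connection string itself.
    """
    return {
        "mechanism": "PLAIN",
        "username": "$ConnectionString",
        "password": connection_string,
    }

auth = eventhub_sasl_auth("Endpoint=sb://sample.servicebus.windows.net/;...")
print(auth["username"])  # $ConnectionString
```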
Microsoft Azure Confluent Private Link Connections
To create a Microsoft Azure Confluent Private Link connection to use in your Atlas Stream Processing project:
Configure your Confluent cluster.
Call the streams/accountDetails endpoint to get your Atlas project's Azure subscription ID:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/<project_id>/streams/accountDetails?cloudProvider=azure&regionName=<region>' \
  --header 'Accept: application/vnd.atlas.2024-11-13+json'

{
  "azureSubscriptionId": "f1a2b3c4-d5e6-87a8-a9b0-c1d2e3f4a5b6",
  "cidrBlock": "192.168.123.0/21",
  "virtualNetworkName": "vnet_a1b2c3d4e5f6a7b8c9d0e1f2_xyz987ab",
  "cloudProvider": "azure"
}
Add PrivateLink access.
Follow the procedure provided in the Confluent documentation to add PrivateLink access.
Note
You need to provide your azureSubscriptionId.
Request a connection to your cloud provider.
Key | Value |
---|---|
region | Region of the Confluent cluster. |
dnsDomain | The DNS domain of your cluster's network, for example abcxyz12345.eastus2.azure.confluent.cloud. |
azureResourceIds | The resource ID for the Confluent Cloud Private Link service endpoint in each Availability Zone (AZ) used by your cluster's network. |
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/privateLinkConnections' \
  --digest \
  --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/vnd.atlas.2024-11-13+json' \
  --data '{
    "vendor": "Confluent",
    "provider": "Azure",
    "region": "US_EAST_2",
    "dnsDomain": "abcxyz12345.eastus2.azure.confluent.cloud",
    "azureResourceIds": [
      "/subscriptions/a1b2c3d4-e5f6-7890-abcd-ef1234567890/resourceGroups/d-xyz98/providers/Microsoft.Network/privateLinkServices/d-xyz98-privatelink-1",
      "/subscriptions/a1b2c3d4-e5f6-7890-abcd-ef1234567890/resourceGroups/d-xyz98/providers/Microsoft.Network/privateLinkServices/d-xyz98-privatelink-2",
      "/subscriptions/a1b2c3d4-e5f6-7890-abcd-ef1234567890/resourceGroups/d-xyz98/providers/Microsoft.Network/privateLinkServices/d-xyz98-privatelink-3"
    ]
  }'
{ "_id": "65f8a3b4c5d6e7f8a9b0c1d2", "azureResourceIds": [ "/subscriptions/a1b2c3d4-e5f6-7890-abcd-ef1234567890/resourceGroups/d-xyz98/providers/Microsoft.Network/privateLinkServices/d-xyz98-privatelink-1", "/subscriptions/a1b2c3d4-e5f6-7890-abcd-ef1234567890/resourceGroups/d-xyz98/providers/Microsoft.Network/privateLinkServices/d-xyz98-privatelink-2", "/subscriptions/a1b2c3d4-e5f6-7890-abcd-ef1234567890/resourceGroups/d-xyz98/providers/Microsoft.Network/privateLinkServices/d-xyz98-privatelink-3" ], "dnsDomain": "abcxyz12345.eastus2.azure.confluent.cloud", "provider": "Azure", "region": "US_EAST_2", "vendor": "Confluent" }
Create the Atlas-side connection.
Add a connection with the following key-value pairs:
Key | Value |
---|---|
bootstrapServers | IP address of your cloud provider's Kafka bootstrap server. |
security.protocol | Must be set to SASL_SSL. |
authentication.mechanism | Must be set to PLAIN. |
authentication.password | The password associated with your Confluent API key. |
authentication.username | The username associated with your Confluent API key. |
type | Must be set to Kafka. |
networking.access.type | Must be set to PRIVATE_LINK. |
networking.access.connectionId | The _id value you noted when you requested the Private Link connection. |
Set all other values as necessary.
The following example command creates an Apache Kafka connection in Atlas:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/spinstance/connections' \
  --digest \
  --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/vnd.atlas.2023-02-01+json' \
  --data '{
    "name": "confluent_demo",
    "bootstrapServers": "slr-ntgrbn.sample.us-east-1.aws.confluent.cloud:9092",
    "security": { "protocol": "SASL_SSL" },
    "authentication": {
      "mechanism": "PLAIN",
      "password": "apiSecretDemo",
      "username": "apiUserDemo"
    },
    "type": "Kafka",
    "networking": {
      "access": {
        "type": "PRIVATE_LINK",
        "connectionId": "38972b0cbe9c2aa40a30a246"
      }
    }
  }'
Add a Kafka Transit Gateway Connection
To create an AWS Transit Gateway connection to use in your Atlas Stream Processing project:
Provision a Transit Gateway network in Confluent Cloud.
Follow the instructions described in the Confluent Cloud documentation for either the /27 Peering & Transit Gateway or the /16 Peering & Transit Gateway connectivity type.
Important
Ensure that your Confluent Cloud CIDR does not overlap with your Atlas VPC CIDR. Retrieve your Atlas VPC CIDR with the Get Account Details endpoint.
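You can check the two ranges for overlap with Python's standard ipaddress module; the CIDR values below are illustrative:

```python
from ipaddress import ip_network

def cidrs_overlap(cidr_a, cidr_b):
    """Return True if two CIDR blocks share any addresses."""
    a = ip_network(cidr_a, strict=False)
    b = ip_network(cidr_b, strict=False)
    return a.overlaps(b)

# Candidate Confluent Cloud CIDR vs. an Atlas VPC cidrBlock value.
print(cidrs_overlap("10.10.0.0/16", "192.168.123.0/21"))  # False: safe to use
```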
Create an AWS Resource Share.
Follow the instructions described in the AWS documentation with the following specific parameters:
Set Resources - optional to Transit Gateways.
Set Select principal type to AWS account.
Set Enter an AWS account ID to your Confluent Cloud AWS account ID from your Confluent Management Console.
Configure a connection between your Confluent Cloud and your Transit Gateway.
In your Confluent Console, navigate to the Ingress connections pane.
Click + Transit Gateway.
Populate the fields of the Transit Gateway configuration.
Ensure you set AWS VPC CIDR to the value of your Atlas VPC CIDR.
In your AWS console, accept the incoming Transit Gateway attachment request from your Confluent Cloud VPC.
Configure a connection between your Transit Gateway and Atlas
Retrieve your Atlas account details with the Get Account Details endpoint. Note your awsAccountId, cidrBlock, and vpcId for later.
Create an AWS Resource Share.
Follow the instructions described in the AWS documentation with the following specific parameters:
Set Resources - optional to Transit Gateways.
Set Select principal type to AWS account.
Set Enter an AWS account ID to the Atlas awsAccountId value that you retrieved earlier.
Accept the Transit Gateway resource share invitation using the Accept Transit Gateway Resource Share Invitation endpoint.
Create a Transit Gateway attachment from your Atlas VPC using the Create Transit Gateway Attachment endpoint. Set atlasVpcId to the vpcId that you retrieved earlier. Note the tgwAttachmentId for later.
Accept the Transit Gateway attachment.
Follow the instructions described in the AWS documentation.
Create a Kafka Transit Gateway connection.
Call the Create One Connection endpoint with the following parameters:
Set networking.access.tgwId to the AWS ID of your Transit Gateway.
Set networking.access.vpcCIDR to the Atlas CIDR that you retrieved earlier.
Set networking.access.type to TRANSIT_GATEWAY.
Set security.protocol to SASL_SSL.
Set type to Kafka.
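Taken together, these parameters produce a connection document like the following sketch; the gateway ID, CIDR, and server names are placeholders, and the authentication block is omitted for brevity:

```python
import json

def transit_gateway_connection(name, bootstrap_servers, tgw_id, vpc_cidr):
    """Assemble a Kafka Transit Gateway connection document.

    Field names follow the parameters listed above; values here are
    illustrative placeholders.
    """
    return {
        "name": name,
        "type": "Kafka",
        "bootstrapServers": bootstrap_servers,
        "security": {"protocol": "SASL_SSL"},
        "networking": {
            "access": {
                "type": "TRANSIT_GATEWAY",
                "tgwId": tgw_id,
                "vpcCIDR": vpc_cidr,
            }
        },
    }

conn = transit_gateway_connection(
    "tgw_demo", "broker.example.confluent.cloud:9092",
    "tgw-0123456789abcdef0", "192.168.120.0/21",
)
print(json.dumps(conn, indent=2))
```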
Configuration
Each interface for creating a Kafka connection allows you to provide configuration parameters for your Kafka cluster. These configurations take the form of key-value pairs.
Atlas Stream Processing passes only these parameters to your Kafka cluster. If you declare any parameters not explicitly allowed, Atlas Stream Processing ignores them.
Interface | Configuration Mechanism |
---|---|
Atlas CLI | Provide configurations as a |
Atlas Administration API | Provide configurations as a |
Atlas UI | Provide configurations in the Configuration File field of the Add Connection page. |
Atlas Stream Processing supports the following configurations: