MongoTimeoutException

Getting a `MongoTimeoutException` while trying to write data (sink) to MongoDB (hosted on AWS EC2) from an Amazon Managed Flink application using the Flink MongoDB connector.

java.io.IOException: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches com.mongodb.client.internal.MongoClientDelegate$1@a3d35db. Client view of cluster state is {type=REPLICA_SET, servers=[{address:1026=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}, {address:1025=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}, {address:1024=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}]
at org.apache.flink.connector.mongodb.sink.writer.MongoWriter.doBulkWrite(MongoWriter.java:224)
at org.apache.flink.connector.mongodb.sink.writer.MongoWriter.write(MongoWriter.java:161)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:161)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:934)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
at java.base/java.lang.Thread.run(Thread.java:829)
Suppressed: java.io.IOException: Writing records to MongoDB failed.
at org.apache.flink.connector.mongodb.sink.writer.MongoWriter.close(MongoWriter.java:189)
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:234)
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:223)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.close(SinkWriterOperator.java:219)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1062)
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:970)
at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
… 3 more
Caused by: java.io.IOException: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches com.mongodb.client.internal.MongoClientDelegate$1@462bda1c. Client view of cluster state is {type=REPLICA_SET, servers=[{address:1026=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}, {address:1025=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}, {address:1024=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}]
at org.apache.flink.connector.mongodb.sink.writer.MongoWriter.doBulkWrite(MongoWriter.java:224)
at org.apache.flink.connector.mongodb.sink.writer.MongoWriter.close(MongoWriter.java:186)
… 17 more
Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches com.mongodb.client.internal.MongoClientDelegate$1@462bda1c. Client view of cluster state is {type=REPLICA_SET, servers=[{address:1026=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}, {address:1025=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}, {address:1024=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}]
at com.mongodb.internal.connection.BaseCluster.createTimeoutException(BaseCluster.java:425)
at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:122)
at com.mongodb.internal.connection.AbstractMultiServerCluster.selectServer(AbstractMultiServerCluster.java:50)
at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:146)
at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:101)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:291)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:207)
at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:448)
at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:428)
at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:423)
at org.apache.flink.connector.mongodb.sink.writer.MongoWriter.doBulkWrite(MongoWriter.java:216)
… 18 more
Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches com.mongodb.client.internal.MongoClientDelegate$1@a3d35db. Client view of cluster state is {type=REPLICA_SET, servers=[{address:1026=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}, {address:1025=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}, {address:1024=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}]
at com.mongodb.internal.connection.BaseCluster.createTimeoutException(BaseCluster.java:425)
at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:122)
at com.mongodb.internal.connection.AbstractMultiServerCluster.selectServer(AbstractMultiServerCluster.java:50)
at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:146)
at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:101)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:291)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:207)
at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:448)
at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:428)
at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:423)
at org.apache.flink.connector.mongodb.sink.writer.MongoWriter.doBulkWrite(MongoWriter.java:216)
… 15 more

Ensure that your MongoDB instance is reachable from the network where your Flink application is running. This includes checking security groups, network ACLs, and firewall settings to allow traffic on the necessary ports: the default MongoDB port is 27017, but the cluster state in the error above shows the client trying to connect on ports 1024–1026, so those ports must be allowed too. Also make sure that the MongoDB service is up and running on your EC2 instance; you can verify this by connecting to the instance via SSH and using the mongo shell or other MongoDB management tools.