Getting ‘MongoTimedOutException’ while trying to write data (sink) to MondoDB (Hosted on AWS EC2) from Amazon Managed Flink Application using Flink MongoDB connector.
java.io.IOException: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches com.mongodb.client.internal.MongoClientDelegate$1@a3d35db. Client view of cluster state is {type=REPLICA_SET, servers=[{address:1026=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}, {address:1025=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}, {address:1024=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}]
at org.apache.flink.connector.mongodb.sink.writer.MongoWriter.doBulkWrite(MongoWriter.java:224)
at org.apache.flink.connector.mongodb.sink.writer.MongoWriter.write(MongoWriter.java:161)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:161)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:934)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
at java.base/java.lang.Thread.run(Thread.java:829)
Suppressed: java.io.IOException: Writing records to MongoDB failed.
at org.apache.flink.connector.mongodb.sink.writer.MongoWriter.close(MongoWriter.java:189)
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:234)
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:223)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.close(SinkWriterOperator.java:219)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1062)
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:970)
at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
… 3 more
Caused by: java.io.IOException: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches com.mongodb.client.internal.MongoClientDelegate$1@462bda1c. Client view of cluster state is {type=REPLICA_SET, servers=[{address:1026=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}, {address:1025=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}, {address:1024=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}]
at org.apache.flink.connector.mongodb.sink.writer.MongoWriter.doBulkWrite(MongoWriter.java:224)
at org.apache.flink.connector.mongodb.sink.writer.MongoWriter.close(MongoWriter.java:186)
… 17 more
Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches com.mongodb.client.internal.MongoClientDelegate$1@462bda1c. Client view of cluster state is {type=REPLICA_SET, servers=[{address:1026=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}, {address:1025=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}, {address:1024=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}]
at com.mongodb.internal.connection.BaseCluster.createTimeoutException(BaseCluster.java:425)
at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:122)
at com.mongodb.internal.connection.AbstractMultiServerCluster.selectServer(AbstractMultiServerCluster.java:50)
at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:146)
at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:101)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:291)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:207)
at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:448)
at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:428)
at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:423)
at org.apache.flink.connector.mongodb.sink.writer.MongoWriter.doBulkWrite(MongoWriter.java:216)
… 18 more
Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches com.mongodb.client.internal.MongoClientDelegate$1@a3d35db. Client view of cluster state is {type=REPLICA_SET, servers=[{address:1026=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}, {address:1025=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}, {address:1024=pl-0-us-east-1.q7hxs.mongodb.net, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.SocketTimeoutException: connect timed out}}]
at com.mongodb.internal.connection.BaseCluster.createTimeoutException(BaseCluster.java:425)
at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:122)
at com.mongodb.internal.connection.AbstractMultiServerCluster.selectServer(AbstractMultiServerCluster.java:50)
at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:146)
at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:101)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:291)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:207)
at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:448)
at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:428)
at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:423)
at org.apache.flink.connector.mongodb.sink.writer.MongoWriter.doBulkWrite(MongoWriter.java:216)
… 15 more