 

"The $changeStream stage is only supported on replica sets" error while using mongodb-source-connect

Happy new year

I am here because I hit an error while running the Kafka MongoDB source connector. I was trying to run connect-standalone with connect-avro-standalone.properties and MongoSourceConnector.properties so that Connect writes data from MongoDB to a Kafka topic.

While doing that, I ran into the error below and couldn't find an answer, so I am writing here.

This is the command I ran:

bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties share/confluent-hub-components/mongodb-kafka-connect-mongodb/etc/MongoSourceConnector.properties

And this is connect-avro-standalone.properties:

# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets

# Confluent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
# that will report audit data that can be displayed and analyzed in Confluent Control Center
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#rest.port=8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
# directory other than the home directory of Confluent Platform.
plugin.path=share/java,/Users/anton/Downloads/confluent-5.3.2/share/confluent-hub-components

This is MongoSourceConnector.properties:

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration
connection.uri=mongodb://localhost:27017
database=test
collection=test

This is the main error I got:

[2020-01-02 18:55:11,546] ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
com.mongodb.MongoCommandException: Command failed with error 40573 (Location40573): 'The $changeStream stage is only supported on replica sets' on server localhost:27017. The full response is {"ok": 0.0, "errmsg": "The $changeStream stage is only supported on replica sets", "code": 40573, "codeName": "Location40573"}
asked Jan 03 '20 by Jaeho Lee


People also ask

What is a replica set in MongoDB?

A replica set in MongoDB is a group of mongod processes that maintain the same data set. Replica sets provide redundancy and high availability, and are the basis for all production deployments. This section introduces replication in MongoDB as well as the components and architecture of replica sets.

Which command can be used to start a MongoDB process as part of a replica set?

Once the first server is added to the replica set, the next step is to initiate the replica set by issuing the command rs.initiate(). Then verify the replica set by issuing the command rs.conf() to ensure it is set up properly.

Does MongoDB support replication?

Replication stores multiple data copies across various data servers, allowing users to recover data whenever required. MongoDB supports replication with the help of Replica Sets. These Replica Sets are a combination of various MongoDB instances, each having a single primary node and multiple secondary nodes.


2 Answers

MongoDB change streams are available only on replica sets. However, you can convert your standalone installation into a single-node replica set by following the steps below.

  1. Locate the mongod.conf file and add the replica set details

Add the following replica set details to the mongod.conf file:

replication:
  replSetName: "<replica-set name>"

Example:

replication:
  replSetName: "rs0"

Note: in a Homebrew-installed MongoDB, the file is located at /usr/local/etc/mongod.conf. Restart mongod after editing it (sketched below).
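
The answer does not show the restart step explicitly, but mongod must be restarted before the new replication settings take effect. A minimal sketch, assuming a Homebrew install of MongoDB Community (adjust the service name and config path for your setup):

# Restart mongod so it picks up the new replication block
# (service name assumes a Homebrew install of MongoDB Community)
brew services restart mongodb-community

# Or, if you run mongod by hand, point it at the edited config file
mongod --config /usr/local/etc/mongod.conf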

  2. Initiate the replica set using rs.initiate()

Log in to the MongoDB shell and run the command rs.initiate(); this will start your replica set. On a successful start the output looks like the following:

> rs.initiate()
{
    "info2" : "no configuration specified. Using a default configuration for the set",
    "me" : "127.0.0.1:27017",
    "ok" : 1,
    "$clusterTime" : {
        "clusterTime" : Timestamp(1577545731, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    },
    "operationTime" : Timestamp(1577545731, 1)
}

That's all. With these two simple steps, you are running a MongoDB replica set with only one node.
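
Before restarting Kafka Connect, you can confirm the conversion worked from the mongo shell. A quick check (the expected values below assume the single-node setup described above):

> rs.status().myState
1
// 1 means this node is now the PRIMARY of the new replica set
> rs.conf().members.length
1
// a single-member configuration, as expected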

Reference: https://onecompiler.com/posts/3vchuyxuh/enabling-replica-set-in-mongodb-with-just-one-node

answered Sep 25 '22 by Uche Azinge


"The $changeStream stage is only supported on replica sets"

You need to make your Mongo database a replica set in order to read the oplog.
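
The connector tails the collection through a MongoDB change stream, so the same failure is reproducible outside of Connect. A minimal sketch in the mongo shell, assuming the test database and collection from the question:

> use test
> db.test.watch()
// on a standalone mongod this throws error 40573 (Location40573);
// on a replica set it returns a change stream cursor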

https://dba.stackexchange.com/questions/243780/converting-mongodb-instance-from-standalone-to-replica-set-and-backing-up
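
Once the instance is a replica set, you can also name the set explicitly in the connector's connection string so the driver performs replica-set discovery. A hypothetical example, assuming the set was initiated as rs0 as in the other answer:

# MongoSourceConnector.properties (replica set name rs0 is an assumption)
connection.uri=mongodb://localhost:27017/?replicaSet=rs0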

answered Sep 21 '22 by OneCricketeer