What is the most efficient way of moving data out of Hive and into MongoDB?

Is there an elegant, easy and fast way to move data out of Hive into MongoDB?

asked Sep 09 '12 by Alex N.


3 Answers

You can do the export with the Hadoop-MongoDB connector. Run the Hive query in your job's main method to write its result to an HDFS directory; the mapper then reads that output and inserts the records into MongoDB.

Example:

Here I'm loading a semicolon-separated text file (id;firstname;lastname) into a MongoDB collection using a simple Hive query:

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class HiveToMongo extends Configured implements Tool {

    private static class HiveToMongoMapper extends
            Mapper<LongWritable, Text, IntWritable, BSONWritable> {

        //See: https://issues.apache.org/jira/browse/HIVE-634
        private static final String HIVE_EXPORT_DELIMITER = '\001' + "";
        private IntWritable k = new IntWritable();
        private BSONWritable v = null;

        @Override
        public void map(LongWritable key, Text value, Context context) 
          throws IOException, InterruptedException {

            String[] split = value.toString().split(HIVE_EXPORT_DELIMITER);

            k.set(Integer.parseInt(split[0]));
            v = new BSONWritable();
            v.put("firstname", split[1]);
            v.put("lastname", split[2]);
            context.write(k, v);

        }
    }

    public static void main(String[] args) throws Exception {
        try {
            Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");
        }
        catch (ClassNotFoundException e) {
            System.out.println("Unable to load Hive Driver");
            System.exit(1);
        }

        try {
            Connection con = DriverManager.getConnection(
                "jdbc:hive://localhost:10000/default");

            Statement stmt = con.createStatement();    
            String sql = "INSERT OVERWRITE DIRECTORY " +
                    "'hdfs://localhost:8020/user/hive/tmp' select * from users";
            stmt.execute(sql);

        }
        catch (SQLException e) {
            e.printStackTrace();
            System.exit(1);
        }

        int res = ToolRunner.run(new Configuration(), new HiveToMongo(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {

        Configuration conf = getConf();
        Path inputPath = new Path("/user/hive/tmp");
        String mongoDbPath = "mongodb://127.0.0.1:6900/mongo_users.mycoll";
        MongoConfigUtil.setOutputURI(conf, mongoDbPath);

        /*
        Add dependencies to distributed cache via 
        DistributedCache.addFileToClassPath(...) :
        - mongo-hadoop-core-x.x.x.jar
        - mongo-java-driver-x.x.x.jar
        - hive-jdbc-x.x.x.jar
        HadoopUtils is an own utility class
        */
        HadoopUtils.addDependenciesToDistributedCache("/libs/mongodb", conf);
        HadoopUtils.addDependenciesToDistributedCache("/libs/hive", conf);

        Job job = new Job(conf, "HiveToMongo");

        FileInputFormat.setInputPaths(job, inputPath);
        job.setJarByClass(HiveToMongo.class);
        job.setMapperClass(HiveToMongoMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(BSONWritable.class);
        job.setNumReduceTasks(0);

        // Wait for completion so the exit code reflects whether the job succeeded
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}
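
To launch it, package the class into a jar and run it with the standard hadoop jar command (the jar name below is just an example); the Hive JDBC and MongoDB driver jars also need to be on the client classpath, e.g. via HADOOP_CLASSPATH, in addition to the copies added to the distributed cache:

hadoop jar hive-to-mongo.jar HiveToMongo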

One drawback is that a 'staging area' (/user/hive/tmp) is needed to store the intermediate Hive output. Furthermore, as far as I know, the Mongo-Hadoop connector doesn't support upserts.
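
If you want to get rid of the staging directory once the job has finished, a minimal sketch using the Hadoop FileSystem API (placed at the end of run(), after the job completes; it needs an org.apache.hadoop.fs.FileSystem import and assumes the same /user/hive/tmp path as above):

// Clean up the intermediate Hive output after the job completes
FileSystem fs = FileSystem.get(conf);
Path staging = new Path("/user/hive/tmp");
if (fs.exists(staging)) {
    fs.delete(staging, true);   // true = recursive delete
}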

I'm not quite sure, but you can also try to fetch the data from Hive without running hiveserver (which exposes a Thrift service), so you can probably save some overhead. Look at the source code of Hive's org.apache.hadoop.hive.cli.CliDriver#processLine(String line, boolean allowInterupting) method, which actually executes the query. Then you can hack together something like this:

...
LogUtils.initHiveLog4j();
CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
ss.in = System.in;
ss.out = new PrintStream(System.out, true, "UTF-8");
ss.err = new PrintStream(System.err, true, "UTF-8");
SessionState.start(ss);

Driver qp = new Driver();
processLocalCmd("SELECT * from users", qp, ss); //taken from CliDriver
...
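
As an alternative to copying processLocalCmd, here is a rough sketch of driving the query yourself and writing the rows with the plain MongoDB Java driver (2.x API). It reuses the qp and session set up above; the port and collection names are taken from the earlier example, and it assumes getResults() hands back tab-separated rows, so double-check that against your Hive version:

qp.run("SELECT id, firstname, lastname FROM users");

Mongo mongo = new Mongo("127.0.0.1", 6900);                 // com.mongodb.Mongo, 2.x driver
DBCollection coll = mongo.getDB("mongo_users").getCollection("mycoll");

ArrayList<String> batch = new ArrayList<String>();
while (qp.getResults(batch)) {                              // fetch result rows in batches
    for (String row : batch) {
        String[] f = row.split("\t");
        BasicDBObject doc = new BasicDBObject("firstname", f[1]).append("lastname", f[2]);
        // upsert on _id so re-running the export doesn't create duplicates
        coll.update(new BasicDBObject("_id", Integer.parseInt(f[0])),
                    new BasicDBObject("$set", doc), true, false);
    }
    batch.clear();
}
mongo.close();

A side benefit of writing through the Java driver is that you get upserts, which the Mongo-Hadoop output format doesn't offer.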

Side notes:

There's also a hive-mongo connector implementation you might check. It's also worth having a look at the implementation of the Hive-HBase connector to get some ideas if you want to implement a similar one for MongoDB.
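
For orientation, the storage-handler route boils down to declaring a Hive table backed by a MongoDB collection and then running a plain INSERT into it, which avoids the staging directory entirely. The handler class and property names below are placeholders (they differ between hive-mongo and other connectors), so look up the real ones in the connector's documentation; con is the Hive JDBC connection from the first example:

Statement stmt = con.createStatement();
stmt.execute(
    "CREATE TABLE mongo_users (id INT, firstname STRING, lastname STRING) " +
    "STORED BY 'com.example.hive.MongoStorageHandler' " +                            // placeholder handler class
    "TBLPROPERTIES ('mongo.uri' = 'mongodb://127.0.0.1:6900/mongo_users.mycoll')");  // placeholder property name
stmt.execute("INSERT OVERWRITE TABLE mongo_users SELECT id, firstname, lastname FROM users");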

answered Nov 11 '22 by Lorand Bendig


Have you looked into Sqoop? It's supposed to make it very simple to move data between Hadoop and SQL/NoSQL databases. This article also gives an example of using it with Hive.
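
For reference, Sqoop's export workflow reads files out of an HDFS directory (e.g. a Hive warehouse or staging directory) and pushes the rows to a target database over JDBC, roughly like the command below. MongoDB is not a standard JDBC target, though, so this route would need a MongoDB-specific Sqoop connector; the connection string and table name here are only illustrative:

sqoop export \
  --connect jdbc:mysql://dbhost/mydb \
  --table users \
  --export-dir /user/hive/warehouse/users \
  --input-fields-terminated-by '\001'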

answered Nov 11 '22 by HypnoticSheep


Take a look at the Hadoop-MongoDB connector project:

http://api.mongodb.org/hadoop/MongoDB%2BHadoop+Connector.html

"This connectivity takes the form of allowing both reading MongoDB data into Hadoop (for use in MapReduce jobs as well as other components of the Hadoop ecosystem), as well as writing the results of Hadoop jobs out to MongoDB."

Not sure if it will work for your use case, but it's worth looking at.

answered Nov 11 '22 by Jean-Philippe Bond