
How to insert 4 million records from Oracle to Elasticsearch table faster using C#?

I have the following C# code, but at its current speed it would take 4-5 days to migrate the data from an Oracle database to Elasticsearch. I am inserting the records in batches of 100. Is there a way to migrate the 4 million records faster (ideally in less than a day)?

   public static void Selection()
        {
            for(int i = 1; i < 4000000; i += 1000)
            {
                for(int j = i; j < (i+1000); j += 100)
                {
                    OracleCommand cmd = new OracleCommand(BuildQuery(j), 
                                                     oracle_connection);
                    OracleDataReader reader = cmd.ExecuteReader();
                    List<Record> list=CreateRecordList(reader);
                    insert(list);
                }
            }
        }

   private static List<Record> CreateRecordList(OracleDataReader reader)
        {
            List<Record> l = new List<Record>();
            string[] str = new string[7];
            try
            {
                while (reader.Read())
                {
                    for (int i = 0; i < 7; i++)
                    {
                        str[i] = reader[i].ToString();
                    }

                    Record r = new Record(str[0], str[1], str[2], str[3],                              
                                str[4], str[5], str[6]);
                    l.Add(r);
                }
            }
            catch (Exception er)
            {
                string msg = er.Message;
            }
            return l;
        }

   private static string BuildQuery(int from)
        {
            int to = from + change - 1;
            StringBuilder builder = new StringBuilder();
            builder.AppendLine(@"select * from");
            builder.AppendLine("(");
            builder.AppendLine("select FIELD_1, FIELD_2, FIELD_3, FIELD_4, FIELD_5, FIELD_6, FIELD_7, ");
            builder.Append(" row_number() over(order by FIELD_1) rn");
            builder.AppendLine("   from tablename");
            builder.AppendLine(")");
            builder.AppendLine(string.Format("where rn between {0} and {1}", 
            from, to));
            builder.AppendLine("order by rn");
            return builder.ToString();
        }

   public static void insert(List<Record> l)
        {
            try
            {
                foreach(Record r in l)
                    client.Index<Record>(r, "index", "type");
            }
            catch (Exception er)
            {
                string msg = er.Message;
            }
        }
Aakriti Mittal asked Jun 24 '15 13:06



2 Answers

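The ROW_NUMBER() function is going to hurt performance, and you're running it tens of thousands of times: at 100 rows per query, 4 million rows means 40,000 queries, and each one makes Oracle sort and number the rows again. You're already using an OracleDataReader. It will not pull all four million rows to your machine at once; it basically streams them one or a few at a time, so a single query over the whole table is enough.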

This has to be doable in minutes or hours, not days. We have several processes that move millions of records between Sybase and SQL Server in a similar manner, and it takes less than five minutes.

Maybe give this a shot:

OracleCommand cmd = new OracleCommand("SELECT ... FROM TableName", oracle_connection);
int batchSize = 500;    
using (OracleDataReader reader = cmd.ExecuteReader())
{
    List<Record> l = new List<Record>(batchSize);
    string[] str = new string[7];
    int currentRow = 0;

    while (reader.Read())
    {
        for (int i = 0; i < 7; i++)
        {
            str[i] = reader[i].ToString();
        }

        l.Add(new Record(str[0], str[1], str[2], str[3], str[4], str[5], str[6]));

        // Commit every time batchSize records have been read
        if (++currentRow == batchSize)
        {
            Commit(l);
            l.Clear();
            currentRow = 0;
        }
    }

    // Commit any remaining records (nothing is left if the last batch was exactly full)
    if (l.Count > 0)
    {
        Commit(l);
    }
}

Here's what Commit might look like:

public void Commit(IEnumerable<Record> records)
{
    // TODO: Use ES's BULK features, I don't know the exact syntax

    client.IndexMany<Record>(records, "index", "type");
    // client.Bulk(b => b.IndexMany(records))... something like this
}
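
The "commit every batchSize records" loop above can also be factored into a small reusable helper. The following is only a sketch: the class name BatchExtensions and the method name InBatches are made up for illustration and are not part of NEST or the Oracle client. It lazily splits any sequence into lists of at most batchSize items and never yields an empty batch.

```csharp
using System;
using System.Collections.Generic;

public static class BatchExtensions
{
    // Lazily splits a sequence into lists of at most batchSize items.
    // (Hypothetical helper, not a library API.)
    public static IEnumerable<List<T>> InBatches<T>(this IEnumerable<T> source, int batchSize)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        return Iterate(source, batchSize);
    }

    private static IEnumerable<List<T>> Iterate<T>(IEnumerable<T> source, int batchSize)
    {
        var batch = new List<T>(batchSize);
        foreach (T item in source)
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                yield return batch;
                // Start a fresh list so the caller can keep the one just yielded.
                batch = new List<T>(batchSize);
            }
        }

        // Final partial batch; skipped when the source divides evenly.
        if (batch.Count > 0)
        {
            yield return batch;
        }
    }
}
```

With an iterator that yields one Record per reader.Read() call (not shown here), each list produced by InBatches(500) could be passed straight to Commit.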
Cᴏʀʏ answered Oct 07 '22 11:10



But you are not inserting in batches of 100.
In the end you are inserting one at a time
(and that may not even be the correct code to insert one):

foreach(Record r in l)
  client.Index<Record>(r, "index", "type");

All those gyrations on the read side do nothing if the insert is one row at a time.
You are just introducing lag while you fetch the next batch.
Read is (almost) always faster than write.

// BuildQuery(all) stands for a single query that selects every row
OracleCommand cmd = new OracleCommand(BuildQuery(all), oracle_connection);
OracleDataReader reader = cmd.ExecuteReader();
while (reader.Read())
{
   client.Index<Record>(new Record(reader.GetString(0),
                        reader.GetString(1), reader.GetString(2), reader.GetString(3),
                        reader.GetString(4), reader.GetString(5), reader.GetString(6)),
                        "index", "type");
}
reader.Close();

You could use a BlockingCollection if you want to read and write in parallel.
But give it a maximum size so that the read does not get too far ahead of the write.
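
A rough sketch of that idea, with the Oracle and Elasticsearch parts left out: one task reads batches into a bounded BlockingCollection while the calling thread writes them. The names BoundedPipeline, Run, readBatches and writeBatch are invented for this example; in the migration, readBatches would come from the data reader and writeBatch would be the bulk insert.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedPipeline
{
    // Reads batches on a background task and writes them on the calling thread.
    // boundedCapacity caps how many batches may wait in memory at once.
    // Returns the total number of items written. (Hypothetical helper.)
    public static int Run<T>(
        IEnumerable<List<T>> readBatches,
        Action<List<T>> writeBatch,
        int boundedCapacity)
    {
        int written = 0;
        using (var cts = new CancellationTokenSource())
        using (var queue = new BlockingCollection<List<T>>(boundedCapacity))
        {
            Task producer = Task.Run(() =>
            {
                try
                {
                    foreach (List<T> batch in readBatches)
                    {
                        // Blocks while the queue is full, so reads cannot run far ahead.
                        queue.Add(batch, cts.Token);
                    }
                }
                finally
                {
                    // Always end the consumer loop, even if reading fails.
                    queue.CompleteAdding();
                }
            });

            try
            {
                foreach (List<T> batch in queue.GetConsumingEnumerable())
                {
                    writeBatch(batch);
                    written += batch.Count;
                }
            }
            catch
            {
                // A write failed: unblock the reader, then let the error propagate.
                cts.Cancel();
                throw;
            }
            finally
            {
                // Surfaces read-side errors; ignores the cancellation we caused ourselves.
                try { producer.Wait(); }
                catch (AggregateException) when (cts.IsCancellationRequested) { }
            }
        }
        return written;
    }
}
```

With a single reader and a single writer the batches arrive in the order they were read. A small boundedCapacity (a handful of batches) is usually enough, since the goal is only to keep the writer busy while the next batch is being fetched.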

paparazzo answered Oct 07 '22 10:10
