I have around 60GB of JSON files that I am parsing using Python and then inserting into a MySQL database using the Python-MySQL Connector. Each JSON file is around 500MB
I have been using an AWS r3.xlarge EC2 instance with a secondary volume to hold the 60GB of JSON data.
I am then using an AWS RDS r3.xlarge MySQL instance. These instances are all in the same region and availability zone. The EC2 instance is using the following Python script to load the JSON, parse it and then insert it into the MySQL RDS. My python:
import json
import mysql.connector
from mysql.connector import errorcode
from pprint import pprint
import glob
import os
os.chdir("./json_data")
for file in glob.glob("*.json"):
with open(file, 'rU') as data_file:
results = json.load(data_file)
print('working on file:', file)
cnx = mysql.connector.connect(user='', password='',
host='')
cursor = cnx.cursor(buffered=True)
DB_NAME = 'DB'
def create_database(cursor):
try:
cursor.execute(
"CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(DB_NAME))
except mysql.connector.Error as err:
print("Failed creating database: {}".format(err))
exit(1)
try:
cnx.database = DB_NAME
except mysql.connector.Error as err:
if err.errno == errorcode.ER_BAD_DB_ERROR:
create_database(cursor)
cnx.database = DB_NAME
else:
print(err)
exit(1)
add_overall_data = ("INSERT INTO master"
"(_sent_time_stamp, dt, ds, dtf, O_l, O_ln, O_Ls, O_a, D_l, D_ln, d_a)"
"VALUES (%(_sent_time_stamp)s, %(dt)s, %(ds)s, %(dtf)s, %(O_l)s, %(O_ln)s, %(O_Ls)s, %(O_a)s, %(D_l)s, %(D_ln)s, %(d_a)s)")
add_polyline = ("INSERT INTO polyline"
"(Overview_polyline, request_no)"
"VALUES (%(Overview_polyline)s, %(request_no)s)")
add_summary = ("INSERT INTO summary"
"(summary, request_no)"
"VALUES (%(summary)s, %(request_no)s)")
add_warnings = ("INSERT INTO warnings"
"(warnings, request_no)"
"VALUES (%(warnings)s, %(request_no)s)")
add_waypoint_order = ("INSERT INTO waypoint_order"
"(waypoint_order, request_no)"
"VALUES (%(waypoint_order)s, %(request_no)s)")
add_leg_data = ("INSERT INTO leg_data"
"(request_no, leg_dt, leg_ds, leg_O_l, leg_O_ln, leg_D_l, leg_D_ln, leg_html_inst, leg_polyline, leg_travel_mode)"
"VALUES (%(request_no)s, %(leg_dt)s, %(leg_ds)s, %(leg_O_l)s, %(leg_O_ln)s, %(leg_D_l)s, %(leg_D_ln)s, %(leg_html_inst)s, %(leg_polyline)s, %(leg_travel_mode)s)")
error_messages = []
for result in results:
if result["status"] == "OK":
for leg in result['routes'][0]['legs']:
try:
params = {
"_sent_time_stamp": leg['_sent_time_stamp'],
"dt": leg['dt']['value'],
"ds": leg['ds']['value'],
"dtf": leg['dtf']['value'],
"O_l": leg['start_location']['lat'],
"O_ln": leg['start_location']['lng'],
"O_Ls": leg['O_Ls'],
"O_a": leg['start_address'],
"D_l": leg['end_location']['lat'],
"D_ln": leg['end_location']['lng'],
"d_a": leg['end_address']
}
cursor.execute(add_overall_data, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
except KeyError, e:
error_messages.append(e)
params = {
"_sent_time_stamp": leg['_sent_time_stamp'],
"dt": leg['dt']['value'],
"ds": leg['ds']['value'],
"dtf": "000",
"O_l": leg['start_location']['lat'],
"O_ln": leg['start_location']['lng'],
"O_Ls": leg['O_Ls'],
"O_a": 'unknown',
"D_l": leg['end_location']['lat'],
"D_ln": leg['end_location']['lng'],
"d_a": 'unknown'
}
cursor.execute(add_overall_data, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for overview_polyline in result['routes']:
params = {
"request_no": request_no,
"Overview_polyline": overview_polyline['overview_polyline']['points']
}
cursor.execute(add_polyline, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for summary in result['routes']:
params = {
"request_no": request_no,
"summary": summary['summary']
}
cursor.execute(add_summary, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for warnings in result['routes']:
params = {
"request_no": request_no,
"warnings": str(warnings['warnings'])
}
cursor.execute(add_warnings, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for waypoint_order in result['routes']:
params = {
"request_no": request_no,
"waypoint_order": str(waypoint_order['waypoint_order'])
}
cursor.execute(add_waypoint_order, params)
query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
O_l = leg['start_location']['lat']
O_ln = leg['start_location']['lng']
D_l = leg['end_location']['lat']
D_ln = leg['end_location']['lng']
_sent_time_stamp = leg['_sent_time_stamp']
cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
request_no = cursor.fetchone()[0]
for steps in result['routes'][0]['legs'][0]['steps']:
params = {
"request_no": request_no,
"leg_dt": steps['dt']['value'],
"leg_ds": steps['ds']['value'],
"leg_O_l": steps['start_location']['lat'],
"leg_O_ln": steps['start_location']['lng'],
"leg_D_l": steps['end_location']['lat'],
"leg_D_ln": steps['end_location']['lng'],
"leg_html_inst": steps['html_instructions'],
"leg_polyline": steps['polyline']['points'],
"leg_travel_mode": steps['travel_mode']
}
cursor.execute(add_leg_data, params)
cnx.commit()
print('error messages:', error_messages)
cursor.close()
cnx.close()
print('finished' + file)
Using htop on the Linux Instance I can see the following:
Regarding the MySQL database, using MySQL Workbench I can see that:
This python script has been chugging away for days but I have only inserted around 20% of the data to MySQL.
My questions - how can I identify the bottleneck? Is it the Python script? It appears to be using a low amount of memory - can I increase this? I have checked the InnoDB buffer pool size as per (How to improve the speed of InnoDB writes per second of MySQL DB) and found it to be large:
SELECT @@innodb_buffer_pool_size;
+---------------------------+
| @@innodb_buffer_pool_size |
+---------------------------+
| 11674845184 |
+---------------------------+
Since I am using an RDS and EC2 instance in the same region I don't believe there is a network bottleneck. Pointers on where I should look for the biggest savings would be very welcome!
EDIT
I think I may have stumbled upon the issue. For efficiency during parsing I am writing each level of JSON separately. However, I then have to execute a query to match a nested part of JSON with its higher level. This query has a low overhead when using small databases. Ive noticed that the speed of the inserts has decreased dramatically on this db. This is because it is having to search a larger and ever growing db to properly connect the JSON data.
I am not sure how to solve this other than waiting it out....
I can not see any table definitions in the Python script .... But when we try and do large Data Operations - we would always disable any Database Indexes when loading to MySQL - also if you have any constraints/Foreign Key enforcement - this should be disabled when you are loading also.
Autocommit is disabled by default when connecting through Connector/Python.
But I can not see any commit - options in the code you present
To Summarise
Disable/Remove (For Loading)
-- Indexes
-- Constraints
-- Foreign Keys
-- Triggers
In your Loading Program
-- Disable autocommit -- commit ever n records (N will depend upon your Buffer size available)
my englist is poor
if i do this work, i will
use python convert json to txt
use mysq imp tool , import txt to mysql
if you must do python+mysql allinone ,i suggest use
insert table values(1),value(2)...value(xxx)
why 'SELECT request_no FROM master'multiple occurrence, should be read from json
my englist is very poor.so..
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With