Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Hive doesn't read partitioned parquet files generated by Spark

I'm having a problem to read partitioned parquet files generated by Spark in Hive. I'm able to create the external table in hive but when I try to select a few lines, hive returns only an "OK" message with no rows.

I'm able to read the partitioned parquet files correctly in Spark, so I'm assuming that they were generated correctly. I'm also able to read these files when I create an external table in hive without partitioning.

Does anyone have a suggestion?

My Environment is:

  • Cluster EMR 4.1.0
  • Hive 1.0.0
  • Spark 1.5.0
  • Hue 3.7.1
  • Parquet files are stored in a S3 bucket (s3://staging-dev/test/ttfourfieldspart2/year=2013/month=11)

My Spark config file has the following parameters(/etc/spark/conf.dist/spark-defaults.conf):

spark.master yarn
spark.driver.extraClassPath /etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*
spark.driver.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
spark.executor.extraClassPath /etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*
spark.executor.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
spark.eventLog.enabled true
spark.eventLog.dir hdfs:///var/log/spark/apps
spark.history.fs.logDirectory hdfs:///var/log/spark/apps
spark.yarn.historyServer.address ip-10-37-161-246.ec2.internal:18080
spark.history.ui.port 18080
spark.shuffle.service.enabled true
spark.driver.extraJavaOptions    -Dlog4j.configuration=file:///etc/spark/conf/log4j.properties -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=512M -XX:OnOutOfMemoryError='kill -9 %p'
spark.executor.extraJavaOptions  -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'
spark.executor.memory 4G
spark.driver.memory 4G
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.maxExecutors 100
spark.dynamicAllocation.minExecutors 1

Hive config file has the following parameters(/etc/hive/conf/hive-site.xml):

<configuration>

<!-- Hive Configuration can either be stored in this file or in the hadoop configuration files  -->
<!-- that are implied by Hadoop setup variables.                                                -->
<!-- Aside from Hadoop setup variables - this file is provided as a convenience so that Hive    -->
<!-- users do not have to edit hadoop configuration files (that may be managed as a centralized -->
<!-- resource).                                                                                 -->

<!-- Hive Execution Parameters -->


<property>
  <name>hbase.zookeeper.quorum</name>
  <value>ip-10-xx-xxx-xxx.ec2.internal</value>
  <description>http://wiki.apache.org/hadoop/Hive/HBaseIntegration</description>
</property>

<property>
  <name>hive.execution.engine</name>
  <value>mr</value>
</property>

  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://ip-10-xx-xxx-xxx.ec2.internal:8020</value>
  </property>

<property>
  <name>hive.metastore.uris</name>
  <value>thrift://ip-10-xx-xxx-xxx.ec2.internal:9083</value>
  <description>JDBC connect string for a JDBC metastore</description>
</property>

<property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://ip-10-xx-xxx-xxx.ec2.internal:3306/hive?createDatabaseIfNotExist=true</value>
    <description>username to use against metastore database</description>
</property>

<property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.mariadb.jdbc.Driver</value>
    <description>username to use against metastore database</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
  <description>username to use against metastore database</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>1R72JFCDG5XaaDTB</value>
  <description>password to use against metastore database</description>
</property>

  <property>
    <name>datanucleus.fixedDatastore</name>
    <value>true</value>
  </property>

  <property>
    <name>mapred.reduce.tasks</name>
    <value>-1</value>
  </property>

  <property>
    <name>mapred.max.split.size</name>
    <value>256000000</value>
  </property>

  <property>
    <name>hive.metastore.connect.retries</name>
    <value>5</value>
  </property>

  <property>
    <name>hive.optimize.sort.dynamic.partition</name>
    <value>true</value>
  </property>

  <property><name>hive.exec.dynamic.partition</name><value>true</value></property>
  <property><name>hive.exec.dynamic.partition.mode</name><value>nonstrict</value></property>
  <property><name>hive.exec.max.dynamic.partitions</name><value>10000</value></property>
  <property><name>hive.exec.max.dynamic.partitions.pernode</name><value>500</value></property>

</configuration>

My python code that reads the partitioned parquet file:

from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

df7 = sqlContext.read.parquet('s3://staging-dev/test/ttfourfieldspart2/')

The parquet file schema printed by Spark:

>>> df7.schema
StructType(List(StructField(transactionid,StringType,true),StructField(eventts,TimestampType,true),StructField(year,IntegerType,true),StructField(month,IntegerType,true)))

>>> df7.printSchema()
root
 |-- transactionid: string (nullable = true)
 |-- eventts: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

>>> df7.show(10)
+--------------------+--------------------+----+-----+
|       transactionid|             eventts|year|month|
+--------------------+--------------------+----+-----+
|f7018907-ed3d-49b...|2013-11-21 18:41:...|2013|   11|
|f6d95a5f-d4ba-489...|2013-11-21 18:41:...|2013|   11|
|02b2a715-6e15-4bb...|2013-11-21 18:41:...|2013|   11|
|0e908c0f-7d63-48c...|2013-11-21 18:41:...|2013|   11|
|f83e30f9-950a-4b9...|2013-11-21 18:41:...|2013|   11|
|3425e4ea-b715-476...|2013-11-21 18:41:...|2013|   11|
|a20a6aeb-da4f-4fd...|2013-11-21 18:41:...|2013|   11|
|d2f57e6f-889b-49b...|2013-11-21 18:41:...|2013|   11|
|46f2eda5-408e-44e...|2013-11-21 18:41:...|2013|   11|
|36fb8b79-b2b5-493...|2013-11-21 18:41:...|2013|   11|
+--------------------+--------------------+----+-----+
only showing top 10 rows

The create table in Hive:

create external table if not exists t3(
  transactionid string,
  eventts timestamp)
partitioned by (year int, month int)
stored as parquet
location 's3://staging-dev/test/ttfourfieldspart2/';

When I try to select some rows in Hive, it doesn't return any rows:

hive> select * from t3 limit 10;
OK
Time taken: 0.027 seconds
hive> 
like image 307
ALunz Avatar asked Nov 05 '15 18:11

ALunz


People also ask

Is there any relationship between Hive and spark partitioning?

Note right away that spark partitions ≠ hive partitions. They are both chunks of data, but Spark splits data in order to process it in parallel in memory. Hive partition is in the storage, in the disk, in persistence.

Can Parquet files be partitioned?

An ORC or Parquet file contains data columns. To these files you can add partition columns at write time. The data files do not store values for partition columns; instead, when writing the files you divide them into groups (partitions) based on column values.


1 Answers

Even though this Question was answered already, the following point may also help the users who are still not able to solve the issue just by MSCK REPAIR TABLE table_name;

I have an hdfs file system which is partitioned as below:

<parquet_file>/<partition1>/<partition2>

eg: my_file.pq/column_5=test/column_6=5

I created a hive table with partitions

eg:

CREATE EXTERNAL TABLE myschema.my_table(
`column_1` int,
`column_2` string,
`column_3` string,
`column_4` string
)
PARTITIONED BY (`column_5` string, `column_6` int) STORED AS PARQUET
LOCATION
  'hdfs://u/users/iamr/my_file.pq'

After this, I repaired the schema partitions using the following command

MSCK REPAIR TABLE myschema.my_table;

After this it was started working for me.

Another thing I noticed was that, while writing PARQUET files from spark, name the columns with lower case, otherwise hive may not able to map it. For me after renaming the columns in PARQUET file, it started working

for eg: my_file.pq/COLUMN_5=test/COLUMN_6=5 didn't worked for me

but my_file.pq/column_5=test/column_6=5 worked

like image 170
dasrohith Avatar answered Oct 05 '22 01:10

dasrohith