Let's take a look at the contents and structure of the file that we are going to convert to the parquet format:

 

! head -n5 /home/sergey/Py_SparkDataFrame_edx_CS105_CS110_CS120/data/201408_status_data.csv
"station_id","bikes_available","docks_available","time"
"2","12","15","2014-03-01 00:00:02"
"2","12","15","2014-03-01 00:01:03"
"2","12","15","2014-03-01 00:02:03"
"2","12","15","2014-03-01 00:03:02"

 

For the conversion we will need a definition of the field types (a "schema"):

 

from pyspark.sql.types import *

customSchema = StructType([
    StructField("station_id", IntegerType(), True),
    StructField("bikes_available", IntegerType(), True),
    StructField("docks_available", IntegerType(), True),
    StructField("time", TimestampType(), True),
])

 

Now let's read the csv file, passing in this schema:

 

readPath = "file:///home/sergey/Py_SparkDataFrame_edx_CS105_CS110_CS120/data/201408_status_data.csv"
df = spark.read.csv(readPath, header=True, schema=customSchema)
df.printSchema()
root
 |-- station_id: integer (nullable = true)
 |-- bikes_available: integer (nullable = true)
 |-- docks_available: integer (nullable = true)
 |-- time: timestamp (nullable = true)
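
As a side note, the explicit schema is not strictly required: Spark can infer the column types itself. A minimal sketch of that variant (using the same readPath) is shown below; inference costs an extra pass over the 600+ MB file, which is why the hand-written schema is used here.

# Alternative: let Spark infer the column types (costs an extra pass over the data).
# Depending on the Spark version, the time column may come back as string rather than timestamp.
inferredDf = spark.read.csv(readPath, header=True, inferSchema=True)
inferredDf.printSchema()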

 

Now we can write the data to Hadoop. First, the csv file in plain text format:

 

txtPath = "hdfs:///user/sergey/txt"
df.write.csv(txtPath)
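
If the text copy also needs to be readable outside of Spark, it is worth keeping the header row, and the csv writer can compress its output as well. A small sketch (the txt_gz path below is just an illustration, not part of the original setup):

# Variant of the text write: keep the header row and gzip-compress the part files
# (txt_gz is a hypothetical separate output directory).
df.write.csv("hdfs:///user/sergey/txt_gz", header=True, compression="gzip")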

 

Next, let's write the same data in parquet format:

 

parquetPath = "hdfs:///user/sergey/parquet"
df.write.parquet(parquetPath)
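
The parquet writer, in turn, accepts a compression codec and a save mode. For example, to overwrite the target directory on re-runs and trade write speed for smaller files with gzip (parquet_gz below is a hypothetical path):

# Variant of the parquet write: overwrite an existing directory and use gzip
# instead of the default snappy codec (smaller files, slower to write and read).
df.write.mode("overwrite").option("compression", "gzip").parquet("hdfs:///user/sergey/parquet_gz")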

 

The size of the source file in the local Linux file system:

 

! du -h /home/sergey/Py_SparkDataFrame_edx_CS105_CS110_CS120/data/201408_status_data.csv
623M	/home/sergey/Py_SparkDataFrame_edx_CS105_CS110_CS120/data/201408_status_data.csv

 

The size of the files written to Hadoop:

 

! hdfs dfs -du -h
119.2 M  parquet
430.4 M  txt

Parquet's columnar layout plus the default snappy compression shrinks the data to roughly a quarter of the plain-text copy. Let's peek into the written parquet file with parquet-tools (Spark stores the time column as a 12-byte INT96 value, so parquet-tools prints raw bytes rather than a formatted date):

! hadoop jar ~/tools/parquet-tools.jar head -n3 /user/sergey/parquet
16/09/10 22:35:43 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
station_id = 2
bikes_available = 12
docks_available = 15
time = ABSiRXxBAACNfCUA

station_id = 2
bikes_available = 12
docks_available = 15
time = ADaEeYpBAACNfCUA

station_id = 2
bikes_available = 12
docks_available = 15
time = AI7LcZhBAACNfCUA

Now let's read the parquet file back into a DataFrame to make sure the data survived the round trip:

df2 = spark.read.parquet("hdfs:///user/sergey/parquet")
df2.show(5)
+----------+---------------+---------------+--------------------+
|station_id|bikes_available|docks_available|                time|
+----------+---------------+---------------+--------------------+
|         2|             12|             15|2014-03-01 00:00:...|
|         2|             12|             15|2014-03-01 00:01:...|
|         2|             12|             15|2014-03-01 00:02:...|
|         2|             12|             15|2014-03-01 00:03:...|
|         2|             12|             15|2014-03-01 00:04:...|
+----------+---------------+---------------+--------------------+
only showing top 5 rows
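
As a quick sanity check on the round-tripped data, here is a simple aggregation over the parquet-backed DataFrame (my own example, not part of the original walkthrough); thanks to the columnar format, Spark only has to read the two columns involved:

from pyspark.sql import functions as F

# Average number of available bikes per station, computed on the parquet data;
# only station_id and bikes_available are actually read from disk.
(df2.groupBy("station_id")
    .agg(F.avg("bikes_available").alias("avg_bikes"))
    .orderBy("station_id")
    .show(5))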

 

The resulting parquet file can be registered in Hive using any of the following equivalent statements:

CREATE EXTERNAL TABLE IF NOT EXISTS bikes_parquet777 (
station_id BIGINT,
bikes_available BIGINT,
docks_available BIGINT,
time TIMESTAMP)
STORED AS PARQUET
LOCATION '/user/sergey/parquet';

create external table bikes_parquet_777 (
station_id smallint,
bikes_available smallint,
docks_available smallint,
time timestamp)
stored as parquet
location "hdfs:///user/sergey/parquet/";

CREATE EXTERNAL TABLE IF NOT EXISTS bikes_parquet77 (
station_id BIGINT,
bikes_available BIGINT,
docks_available BIGINT,
time TIMESTAMP)
row format delimited
fields terminated by ','
STORED AS PARQUET
LOCATION '/user/sergey/parquet';
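
Once one of the tables above exists, it can also be queried straight from Spark, provided the SparkSession was created with .enableHiveSupport(). A minimal sketch, assuming the first table (bikes_parquet777) and that its declared types are accepted by your Hive/Spark version for the underlying int32 parquet columns:

# Query the Hive external table from Spark
# (requires a SparkSession built with .enableHiveSupport()).
spark.sql("SELECT * FROM bikes_parquet777 LIMIT 5").show()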

 
