Let's take a look at the contents and structure of the file that we are going to convert to Parquet format:
! head -n5 /home/sergey/Py_SparkDataFrame_edx_CS105_CS110_CS120/data/201408_status_data.csv
"station_id","bikes_available","docks_available","time" "2","12","15","2014-03-01 00:00:02" "2","12","15","2014-03-01 00:01:03" "2","12","15","2014-03-01 00:02:03" "2","12","15","2014-03-01 00:03:02"
For the conversion we will need a definition of the field types (a "schema"):
from pyspark.sql.types import *

customSchema = StructType([
    StructField("station_id", IntegerType(), True),
    StructField("bikes_available", IntegerType(), True),
    StructField("docks_available", IntegerType(), True),
    StructField("time", TimestampType(), True),
])
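As an aside, on newer Spark versions (2.3+, an assumption on my part, not something used in this walkthrough) the same schema can be written more compactly as a DDL-formatted string and passed to spark.read.csv() in place of the StructType:

# Assumes Spark 2.3+: a DDL-formatted string is accepted wherever a schema is expected,
# e.g. spark.read.csv(path, header=True, schema=ddlSchema)
ddlSchema = "station_id INT, bikes_available INT, docks_available INT, time TIMESTAMP"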
Now let's read the CSV file, passing it this schema:
readPath = "file:///home/sergey/Py_SparkDataFrame_edx_CS105_CS110_CS120/data/201408_status_data.csv"
df = spark.read.csv(readPath, header=True, schema=customSchema)
df.printSchema()
root
 |-- station_id: integer (nullable = true)
 |-- bikes_available: integer (nullable = true)
 |-- docks_available: integer (nullable = true)
 |-- time: timestamp (nullable = true)
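For comparison, Spark can also try to infer the types on its own. A sketch of that variant (not what was done above) looks like this; it costs an extra pass over the 600 MB file, and depending on the Spark version the time column may be inferred as a plain string, which is why the explicit schema is the better choice here:

# Alternative (not used above): let Spark infer column types from the data.
# inferSchema=True forces an additional pass over the file, so it is noticeably
# slower on large inputs, and "time" may come back as a string rather than a timestamp.
dfInferred = spark.read.csv(readPath, header=True, inferSchema=True)
dfInferred.printSchema()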
Now we can write the data to Hadoop, first as a plain-text CSV file:
txtPath = "hdfs:///user/sergey/txt"
df.write.csv(txtPath)
… and then the same data in Parquet format:
parquetPath = "hdfs:///user/sergey/parquet"
df.write.parquet(parquetPath)
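By default Spark compresses Parquet output with Snappy (the parquet-tools output below confirms this). If you need a different codec or want to re-run the write, something along these lines should work; this is only a sketch, not what was actually run for the size comparison:

# A variant of the same write with explicit options (not used for the comparison below):
# mode("overwrite") allows re-running without a "path already exists" error,
# compression="gzip" usually shrinks the files further at the cost of slower writes.
df.write.mode("overwrite").parquet(parquetPath, compression="gzip")
# df.write.partitionBy("station_id").parquet(parquetPath)  # optional: partition by a column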
The size of the source file in the local Linux file system:
! du -h /home/sergey/Py_SparkDataFrame_edx_CS105_CS110_CS120/data/201408_status_data.csv
623M /home/sergey/Py_SparkDataFrame_edx_CS105_CS110_CS120/data/201408_status_data.csv
The size of the files written to Hadoop:
! hdfs dfs -du -h
119.2 M  parquet
430.4 M  txt
! hadoop jar ~/tools/parquet-tools.jar head -n3 /user/sergey/parquet
16/09/10 22:35:43 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
station_id = 2
bikes_available = 12
docks_available = 15
time = ABSiRXxBAACNfCUA

station_id = 2
bikes_available = 12
docks_available = 15
time = ADaEeYpBAACNfCUA

station_id = 2
bikes_available = 12
docks_available = 15
time = AI7LcZhBAACNfCUA

The time values look strange because Spark stores timestamps in Parquet as 96-bit integers (INT96), and parquet-tools prints such binary values Base64-encoded instead of as human-readable dates.
df2 = spark.read.parquet("hdfs:///user/sergey/parquet")
df2.show(5)
+----------+---------------+---------------+--------------------+
|station_id|bikes_available|docks_available|                time|
+----------+---------------+---------------+--------------------+
|         2|             12|             15|2014-03-01 00:00:...|
|         2|             12|             15|2014-03-01 00:01:...|
|         2|             12|             15|2014-03-01 00:02:...|
|         2|             12|             15|2014-03-01 00:03:...|
|         2|             12|             15|2014-03-01 00:04:...|
+----------+---------------+---------------+--------------------+
only showing top 5 rows
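Since the round-tripped DataFrame has the same schema as the original, it can be queried right away. Here is a small sketch; the view name status and the aggregation itself are only an illustration, not part of the original walkthrough:

# Register a temporary view over the Parquet-backed DataFrame and query it with SQL.
# The view name and the query are illustrative only.
df2.createOrReplaceTempView("status")
spark.sql("""
    SELECT station_id,
           round(avg(bikes_available), 1) AS avg_bikes
    FROM status
    GROUP BY station_id
    ORDER BY station_id
""").show(5)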
The resulting Parquet files can be exposed to Hive as an external table; the following statements are essentially equivalent (the ROW FORMAT clause in the last one is effectively ignored for a table STORED AS PARQUET):
CREATE EXTERNAL TABLE IF NOT EXISTS bikes_parquet777 (
station_id BIGINT,
bikes_available BIGINT,
docks_available BIGINT,
time TIMESTAMP)
STORED AS PARQUET
LOCATION '/user/sergey/parquet';
create external table bikes_parquet_777 (
station_id smallint,
bikes_available smallint,
docks_available smallint,
time timestamp)
stored as parquet
location "hdfs:///user/sergey/parquet/";
CREATE EXTERNAL TABLE IF NOT EXISTS bikes_parquet77 (
station_id BIGINT,
bikes_available BIGINT,
docks_available BIGINT,
time TIMESTAMP)
row format delimited
fields terminated by ','
STORED AS PARQUET
LOCATION '/user/sergey/parquet';
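Once one of these external tables exists, it can also be checked from PySpark, provided the SparkSession was created with .enableHiveSupport() and points at the same metastore (an assumption about the setup, not something shown above):

# Sanity check from PySpark; assumes the SparkSession was built with .enableHiveSupport().
spark.sql("SELECT count(*) AS cnt FROM bikes_parquet777").show()
spark.table("bikes_parquet777").printSchema()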