from pyspark.sql import SQLContext
# Build the SQLContext used below to read DataFrames.
# NOTE(review): `sc` is not defined in this file -- presumably the SparkContext
# pre-created by the PySpark shell/notebook; confirm before running standalone.
sqlContext = SQLContext(sc)
Reading a CSV file with a custom schema
from pyspark.sql.types import *
# Explicit schema for the status CSV: three integer columns and one timestamp
# column, each declared nullable.
customSchema = (
    StructType()
    .add("id", IntegerType(), True)
    .add("bikes_available", IntegerType(), True)
    .add("docks_available", IntegerType(), True)
    .add("time", TimestampType(), True)
)

# Local-filesystem location of the status data set.
path = "file:///home/sergey/Py_SparkDataFrame_edx_CS105_CS110_CS120/data/201408_status_data.csv"

# Read the CSV with its first line treated as a header and the explicit schema
# applied, so Spark does not have to infer the column types.
df = (
    sqlContext.read
    .schema(customSchema)
    .option("header", True)
    .csv(path)
)

# Preview the first five rows.
df.show(n=5)
+---+---------------+---------------+--------------------+
| id|bikes_available|docks_available|                time|
+---+---------------+---------------+--------------------+
|  2|             12|             15|2014-03-01 00:00:...|
|  2|             12|             15|2014-03-01 00:01:...|
|  2|             12|             15|2014-03-01 00:02:...|
|  2|             12|             15|2014-03-01 00:03:...|
|  2|             12|             15|2014-03-01 00:04:...|
+---+---------------+---------------+--------------------+
only showing top 5 rows
# Print the DataFrame's schema to confirm the column types declared in customSchema.
df.printSchema()
root
 |-- id: integer (nullable = true)
 |-- bikes_available: integer (nullable = true)
 |-- docks_available: integer (nullable = true)
 |-- time: timestamp (nullable = true)
Type inference by Reflection
from pyspark.sql import Row
from datetime import datetime as dt
def _line_to_row(line):
    """Turn one comma-separated data line into a Row.

    Each field has its first and last character removed before conversion
    (presumably the surrounding double quotes in this file -- verify against
    the data). The first three fields become ints; the fourth is parsed as a
    "%Y-%m-%d %H:%M:%S" datetime.
    """
    fields = line.split(",")
    return Row(
        id_=int(fields[0][1:-1]),
        bikes_available=int(fields[1][1:-1]),
        docks_available=int(fields[2][1:-1]),
        time=dt.strptime(fields[3][1:-1], "%Y-%m-%d %H:%M:%S"),
    )


# Pair every line with its index and keep only index > 0, which drops the
# first line of the file (the header); then discard the index again.
rdd = (
    sc.textFile(path)
    .zipWithIndex()
    .filter(lambda indexed: indexed[1] > 0)
    .keys()
)

# Convert the raw text lines into Row objects with native Python values.
rowRDD = rdd.map(_line_to_row)

# No schema is passed here, so Spark derives the column types from the Rows.
df2 = rowRDD.toDF()

# Preview the first five rows.
df2.show(n=5)
+---------------+---------------+---+--------------------+
|bikes_available|docks_available|id_|                time|
+---------------+---------------+---+--------------------+
|             12|             15|  2|2014-03-01 00:00:...|
|             12|             15|  2|2014-03-01 00:01:...|
|             12|             15|  2|2014-03-01 00:02:...|
|             12|             15|  2|2014-03-01 00:03:...|
|             12|             15|  2|2014-03-01 00:04:...|
+---------------+---------------+---+--------------------+
only showing top 5 rows
# Print the schema Spark derived for the DataFrame built from Row objects.
df2.printSchema()
root
 |-- bikes_available: long (nullable = true)
 |-- docks_available: long (nullable = true)
 |-- id_: long (nullable = true)
 |-- time: timestamp (nullable = true)
Write a comment: