from pyspark.sql import SparkSession

# SQLContext(sc) has been superseded by SparkSession since Spark 2.0 and is
# deprecated in Spark 3.x. getOrCreate() returns the active session (or builds
# one on top of the already-running SparkContext). The session exposes the same
# `.read` API, so it is also bound to the old name used by the rest of the script.
spark = SparkSession.builder.getOrCreate()
sqlContext = spark

Reading a CSV file with a custom schema

from pyspark.sql.types import IntegerType, StructField, StructType, TimestampType

# Explicit schema: column names and types come from here instead of being
# inferred, so Spark does not have to scan the data to guess the types.
customSchema = StructType([
    StructField("id", IntegerType(), True),
    StructField("bikes_available", IntegerType(), True),
    StructField("docks_available", IntegerType(), True),
    StructField("time", TimestampType(), True),
])

path = "file:///home/sergey/Py_SparkDataFrame_edx_CS105_CS110_CS120/data/201408_status_data.csv"
# header=True skips the file's first line. The timestamp pattern is pinned to
# the data's "YYYY-MM-DD hh:mm:ss" layout instead of relying on Spark's default
# timestampFormat, which differs between Spark 2.x and 3.x.
df = sqlContext.read.csv(
    path,
    header=True,
    schema=customSchema,
    timestampFormat="yyyy-MM-dd HH:mm:ss",
)
df.show(5)
+---+---------------+---------------+--------------------+
| id|bikes_available|docks_available|                time|
+---+---------------+---------------+--------------------+
|  2|             12|             15|2014-03-01 00:00:...|
|  2|             12|             15|2014-03-01 00:01:...|
|  2|             12|             15|2014-03-01 00:02:...|
|  2|             12|             15|2014-03-01 00:03:...|
|  2|             12|             15|2014-03-01 00:04:...|
+---+---------------+---------------+--------------------+
only showing top 5 rows

# Print the schema of `df`: its column names and types were supplied via
# `schema=` when reading, not inferred from the data.
df.printSchema()
root
 |-- id: integer (nullable = true)
 |-- bikes_available: integer (nullable = true)
 |-- docks_available: integer (nullable = true)
 |-- time: timestamp (nullable = true)

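Schema inference by reflection

Here the file is read as plain text, each line is parsed into a `Row`, and `toDF()` infers the schema from the Python values: `int` becomes `long` and `datetime` becomes `timestamp`. In Spark versions before 3.0, `Row(**kwargs)` sorts its fields alphabetically, which is why the columns below appear in a different order than in the first example.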

import csv
from datetime import datetime as dt
from itertools import islice

from pyspark.sql import Row

# Layout of the "time" column, e.g. 2014-03-01 00:00:02.
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def drop_header(partition_index, lines):
    """Drop the CSV header line, which is the first line of partition 0."""
    # Cheaper than zipWithIndex(): when the RDD has more than one partition,
    # that call runs an extra Spark job to count every partition first.
    return islice(lines, 1, None) if partition_index == 0 else lines


def parse_partition(lines):
    """Turn CSV text lines into Rows, letting csv.reader strip the field quotes."""
    # csv.reader handles quoting correctly (including commas inside quotes),
    # unlike split(",") + [1:-1] slicing, which assumed every field was quoted.
    for record in csv.reader(lines):
        if not record:  # blank line, e.g. a trailing newline at end of file
            continue
        yield Row(id_=int(record[0]),
                  bikes_available=int(record[1]),
                  docks_available=int(record[2]),
                  time=dt.strptime(record[3], TIME_FORMAT))


# Text lines of the file without the header.
rdd = sc.textFile(path).mapPartitionsWithIndex(drop_header)
rowRDD = rdd.mapPartitions(parse_partition)

df2 = rowRDD.toDF()
df2.show(5)
+---------------+---------------+---+--------------------+
|bikes_available|docks_available|id_|                time|
+---------------+---------------+---+--------------------+
|             12|             15|  2|2014-03-01 00:00:...|
|             12|             15|  2|2014-03-01 00:01:...|
|             12|             15|  2|2014-03-01 00:02:...|
|             12|             15|  2|2014-03-01 00:03:...|
|             12|             15|  2|2014-03-01 00:04:...|
+---------------+---------------+---+--------------------+
only showing top 5 rows

# Print the schema that toDF() inferred from the Row values (per the output
# below: Python ints became long, datetimes became timestamp).
df2.printSchema()
root
 |-- bikes_available: long (nullable = true)
 |-- docks_available: long (nullable = true)
 |-- id_: long (nullable = true)
 |-- time: timestamp (nullable = true)

Write a comment:

*

Your email address will not be published.

© 2014 In R we trust.
Top
Follow us: