sc.version
'2.0.0'
# HiveContext lives in pyspark.sql, not in the top-level pyspark package:
# `from pyspark import HiveContext` raises ImportError.
# (In Spark 2.0 HiveContext is deprecated in favor of SparkSession, but it
# is kept here to match the rest of this session.)
from pyspark import SparkContext
from pyspark.sql import HiveContext

context = HiveContext(sc)
path = "file:///home/sergey/Py_SparkDataFrame_edx_CS105_CS110_CS120/data/201408_status_data.csv"
# Note: without inferSchema=True every column is read as a string (see the
# printSchema() output below) -- this is why numeric expressions on these
# columns later come back as doubles (e.g. 24.0).
df = context.read.csv(path, header=True).cache()
df.printSchema()
root
 |-- station_id: string (nullable = true)
 |-- bikes_available: string (nullable = true)
 |-- docks_available: string (nullable = true)
 |-- time: string (nullable = true)

df.count()
18342210
df.show(5)
+----------+---------------+---------------+-------------------+
|station_id|bikes_available|docks_available|               time|
+----------+---------------+---------------+-------------------+
|         2|             12|             15|2014-03-01 00:00:02|
|         2|             12|             15|2014-03-01 00:01:03|
|         2|             12|             15|2014-03-01 00:02:03|
|         2|             12|             15|2014-03-01 00:03:02|
|         2|             12|             15|2014-03-01 00:04:03|
+----------+---------------+---------------+-------------------+
only showing top 5 rows

df.printSchema()
root
 |-- station_id: string (nullable = true)
 |-- bikes_available: string (nullable = true)
 |-- docks_available: string (nullable = true)
 |-- time: string (nullable = true)

# selectExpr() parses its argument as a SQL expression, so arithmetic on a
# column works here (plain select() would treat the string as a column name).
# bikes_available is a string column, so SQL promotes the result to double.
df.selectExpr('2*bikes_available').show(5)
+---------------------+
|(2 * bikes_available)|
+---------------------+
|                 24.0|
|                 24.0|
|                 24.0|
|                 24.0|
|                 24.0|
+---------------------+
only showing top 5 rows

Note that the following does NOT work: select() treats a plain string as a column name, not as a SQL expression, so it looks for a column literally named "2*bikes_available". Use selectExpr(), a Column expression, or a UDF instead:

# df.select(('2*bikes_available')).show(5)
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Doubles bikes_available via a Python UDF so the result stays an integer
# (unlike the SQL expression above, which promotes the string column to double).
# The schema marks bikes_available as nullable, so guard against None before
# converting -- int(None) would make the UDF fail at runtime.
bikes_double = udf(lambda x: 2 * int(x) if x is not None else None, IntegerType())
df.select(bikes_double('bikes_available').alias('2 x bikes_available')).show(5)
+-------------------+
|2 x bikes_available|
+-------------------+
|                 24|
|                 24|
|                 24|
|                 24|
|                 24|
+-------------------+
only showing top 5 rows

df.select(2*df.bikes_available).show(5)
+---------------------+
|(bikes_available * 2)|
+---------------------+
|                 24.0|
|                 24.0|
|                 24.0|
|                 24.0|
|                 24.0|
+---------------------+
only showing top 5 rows

Hive Data Definition Language (DDL)

# registerTempTable() is deprecated since Spark 2.0 (this session runs 2.0.0);
# createOrReplaceTempView() is the supported equivalent and registers the same
# session-scoped temporary view 'tb1' used by the SQL queries below.
df.createOrReplaceTempView('tb1')
context.sql('show databases').show()
+------------+
|databaseName|
+------------+
|     default|
+------------+

context.sql('DESCRIBE DATABASE default').show(truncate=False)
+-------------------------+-------------------------------------------------------------------------+
|database_description_item|database_description_value                                               |
+-------------------------+-------------------------------------------------------------------------+
|Database Name            |default                                                                  |
|Description              |Default Hive database                                                    |
|Location                 |file:/home/sergey/Py_SparkDataFrame_edx_CS105_CS110_CS120/spark-warehouse|
+-------------------------+-------------------------------------------------------------------------+

context.sql('USE default').show()
++
||
++
++

context.sql('show tables').show()
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|      tb1|       true|
+---------+-----------+

df.printSchema()
root
 |-- station_id: string (nullable = true)
 |-- bikes_available: string (nullable = true)
 |-- docks_available: string (nullable = true)
 |-- time: string (nullable = true)

context.sql('desc tb1').show()
+---------------+---------+-------+
|       col_name|data_type|comment|
+---------------+---------+-------+
|     station_id|   string|       |
|bikes_available|   string|       |
|docks_available|   string|       |
|           time|   string|       |
+---------------+---------+-------+

Data SELECT and JOIN

The SELECT statement is quite often used together with the FROM, DISTINCT,
WHERE, and LIMIT keywords.

context.sql('select * from tb1 where bikes_available >= 10').show(5)
+----------+---------------+---------------+-------------------+
|station_id|bikes_available|docks_available|               time|
+----------+---------------+---------------+-------------------+
|         2|             12|             15|2014-03-01 00:00:02|
|         2|             12|             15|2014-03-01 00:01:03|
|         2|             12|             15|2014-03-01 00:02:03|
|         2|             12|             15|2014-03-01 00:03:02|
|         2|             12|             15|2014-03-01 00:04:03|
+----------+---------------+---------------+-------------------+
only showing top 5 rows

context.sql('select count(distinct station_id) number_of_dockstations from tb1').show()
+----------------------+
|number_of_dockstations|
+----------------------+
|                    70|
+----------------------+

context.sql('select distinct station_id from tb1 order by cast(station_id as int) limit 5').show()
+----------+
|station_id|
+----------+
|         2|
|         3|
|         4|
|         5|
|         6|
+----------+

context.sql('select max(distinct cast(station_id as int)) from tb1').show()
+----------------------------+
|max(CAST(station_id AS INT))|
+----------------------------+
|                          84|
+----------------------------+

Nested SELECT

context.sql(
    '''
    WITH tb2 AS
    (SELECT * FROM tb1 WHERE station_id >=80)
    SELECT * from tb2 LIMIT 5
    '''
).show()
+----------+---------------+---------------+-------------------+
|station_id|bikes_available|docks_available|               time|
+----------+---------------+---------------+-------------------+
|        80|              8|              7|2014-03-01 00:00:02|
|        80|              8|              7|2014-03-01 00:01:03|
|        80|              8|              7|2014-03-01 00:02:03|
|        80|              8|              7|2014-03-01 00:03:02|
|        80|              8|              7|2014-03-01 00:04:03|
+----------+---------------+---------------+-------------------+

context.sql('SELECT * FROM (SELECT * FROM tb1 WHERE station_id >= 80) tb2').show(5)
+----------+---------------+---------------+-------------------+
|station_id|bikes_available|docks_available|               time|
+----------+---------------+---------------+-------------------+
|        80|              8|              7|2014-03-01 00:00:02|
|        80|              8|              7|2014-03-01 00:01:03|
|        80|              8|              7|2014-03-01 00:02:03|
|        80|              8|              7|2014-03-01 00:03:02|
|        80|              8|              7|2014-03-01 00:04:03|
+----------+---------------+---------------+-------------------+
only showing top 5 rows

context.sql('''
select max(day(TIME)) maxDayOfMonth 
from tb1'''
           ).show()
+-------------+
|maxDayOfMonth|
+-------------+
|           31|
+-------------+

context.sql('''
select max(from_unixtime(unix_timestamp(TIME,"yyyy-MM-dd"),"u")) maxDayOfWeek
from tb1
'''
           ).show()
+------------+
|maxDayOfWeek|
+------------+
|           7|
+------------+

context.sql('''
select distinct from_unixtime(unix_timestamp(TIME,"yyyy-MM-dd"),"u") DoW 
from tb1'''
           ).show()
+---+
|DoW|
+---+
|  7|
|  3|
|  5|
|  6|
|  1|
|  4|
|  2|
+---+

context.sql('''
select collect_set(from_unixtime(unix_timestamp(TIME,"yyyy-MM-dd"),"u")) DoW
from tb1'''
           ).show(truncate=False)
+---------------------+
|DoW                  |
+---------------------+
|[3, 1, 2, 5, 4, 7, 6]|
+---------------------+

Aggregation and sampling

MIN , MAX , AVG , GROUPING SETS, ROLLUP , CUBE

Sample data 1:

context.sql('SELECT * FROM tb1 DISTRIBUTE BY RAND() SORT BY RAND() LIMIT 3').show()
+----------+---------------+---------------+-------------------+
|station_id|bikes_available|docks_available|               time|
+----------+---------------+---------------+-------------------+
|        55|              4|             19|2014-04-28 08:50:03|
|        42|             11|              4|2014-06-19 04:44:02|
|        36|              7|              8|2014-03-15 11:23:02|
+----------+---------------+---------------+-------------------+

Sample data 2:

df3 = context.sql('SELECT * FROM tb1 TABLESAMPLE(.01 PERCENT)')
df3.count()
1865
df3.show(5)
+----------+---------------+---------------+-------------------+
|station_id|bikes_available|docks_available|               time|
+----------+---------------+---------------+-------------------+
|         2|             14|             12|2014-03-25 00:27:02|
|         2|             11|             16|2014-04-02 02:55:02|
|         2|              8|             19|2014-04-10 02:24:03|
|         2|              9|             18|2014-04-18 06:52:03|
|         2|             12|             15|2014-04-20 12:36:02|
+----------+---------------+---------------+-------------------+
only showing top 5 rows

context.sql("show tables").show()
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|      tb1|       true|
+---------+-----------+

context.sql('''SELECT * 
               FROM tb1 
               SORT BY RAND() 
               LIMIT 3''').show()
+----------+---------------+---------------+-------------------+
|station_id|bikes_available|docks_available|               time|
+----------+---------------+---------------+-------------------+
|         3|              6|              9|2014-06-08 19:43:03|
|        13|              6|              9|2014-05-21 19:25:02|
|        12|             10|              9|2014-07-28 11:32:03|
+----------+---------------+---------------+-------------------+

context.sql('''
               SELECT docks_available - bikes_available AS free_slots
               FROM tb1
               LIMIT 3
            ''').show()
+----------+
|free_slots|
+----------+
|       3.0|
|       3.0|
|       3.0|
+----------+

  • SORT BY is in most cases preferable to ORDER BY because it's faster: SORT BY can use many reducers (ordering only within each partition), while ORDER BY forces a single reducer to produce a total ordering.
  • Avoid ORDER BY in queries unless a global ordering of the full result is actually required.
context.sql('''SELECT from_unixtime(unix_timestamp(time,"yyyy-MM-dd"),"u") DoW,
                   AVG(docks_available - bikes_available) avearge_free_slots
               FROM tb1
               GROUP BY from_unixtime(unix_timestamp(time,"yyyy-MM-dd"),"u")
               ORDER BY from_unixtime(unix_timestamp(time,"yyyy-MM-dd"),"u") ASC
            ''').show()
+---+------------------+
|DoW|avearge_free_slots|
+---+------------------+
|  1|0.8415827554684774|
|  2|0.9188464290146106|
|  3|0.8579307122917329|
|  4|0.8169979154583408|
|  5|0.7673175111718432|
|  6|0.6536297993408389|
|  7|0.6719503599533448|
+---+------------------+

context.sql('''SELECT from_unixtime(unix_timestamp(time,"yyyy-MM-dd"),"E") DoW,
                   AVG(docks_available - bikes_available) avearge_free_slots
               FROM tb1
               GROUP BY from_unixtime(unix_timestamp(time,"yyyy-MM-dd"),"E")
            ''').show()
+---+------------------+
|DoW|avearge_free_slots|
+---+------------------+
|Sun|0.6719503599533448|
|Mon|0.8415827554684774|
|Thu|0.8169979154583408|
|Sat|0.6536297993408389|
|Wed|0.8579307122917329|
|Tue|0.9188464290146106|
|Fri|0.7673175111718432|
+---+------------------+

context.sql('''SELECT from_unixtime(unix_timestamp(time,"yyyy-MM-dd"),"E") as day,
                      count(*) as `count`
               FROM tb1
               GROUP BY from_unixtime(unix_timestamp(time,"yyyy-MM-dd"),"E")''').show()
+---+-------+
|day|  count|
+---+-------+
|Sun|2634647|
|Mon|2611193|
|Thu|2612085|
|Sat|2662475|
|Wed|2611430|
|Tue|2610988|
|Fri|2599392|
+---+-------+

Selecting columns that are not part of the GROUP BY statement with:

collect_set(colName)

context.sql('''SELECT from_unixtime(unix_timestamp(time,"yyyy-MM-dd"),"u") dayID,
                      collect_set(from_unixtime(unix_timestamp(time,"yyyy-MM-dd"),"E")) DoW,
                      AVG(docks_available - bikes_available) avearge_free_slots
               FROM tb1
               GROUP BY from_unixtime(unix_timestamp(time,"yyyy-MM-dd"),"u")
               ORDER BY from_unixtime(unix_timestamp(time,"yyyy-MM-dd"),"u") ASC
               ''').show()
+-----+-----+------------------+
|dayID|  DoW|avearge_free_slots|
+-----+-----+------------------+
|    1|[Mon]|0.8415827554684774|
|    2|[Tue]|0.9188464290146106|
|    3|[Wed]|0.8579307122917329|
|    4|[Thu]|0.8169979154583408|
|    5|[Fri]|0.7673175111718432|
|    6|[Sat]|0.6536297993408389|
|    7|[Sun]|0.6719503599533448|
+-----+-----+------------------+

context.sql('''
                with tb2 as
                (select * from tb1 where from_unixtime(unix_timestamp(time,"yyyy-MM-dd"),"u")=1)
                select collect_set(from_unixtime(unix_timestamp(time,"yyyy-MM-dd"),"E")) DoW,
                AVG(docks_available - bikes_available) avearge_free_slots from tb2
            ''').show()
+-----+------------------+
|  DoW|avearge_free_slots|
+-----+------------------+
|[Mon]|0.8415827554684774|
+-----+------------------+

Analytic functions

Used together with OVER , PARTITION BY , ORDER BY

Though analytic functions give aggregate results, they
do not group the result set. They return the group value multiple times with each
record.

Function (arg1,..., argn) OVER ([PARTITION BY <...>][ORDER BY <....>][<window_clause>])

context.sql("select last_value(time) from tb1").show()
+-------------------+
|         last(time)|
+-------------------+
|2014-08-31 23:59:03|
+-------------------+

context.sql("select max(time) from tb1").show()
+-------------------+
|          max(time)|
+-------------------+
|2014-08-31 23:59:03|
+-------------------+

Convert table to DF

df2 = context.table('tb1')
type(df2)
pyspark.sql.dataframe.DataFrame
df2.show(5)
+----------+---------------+---------------+-------------------+
|station_id|bikes_available|docks_available|               time|
+----------+---------------+---------------+-------------------+
|         2|             12|             15|2014-03-01 00:00:02|
|         2|             12|             15|2014-03-01 00:01:03|
|         2|             12|             15|2014-03-01 00:02:03|
|         2|             12|             15|2014-03-01 00:03:02|
|         2|             12|             15|2014-03-01 00:04:03|
+----------+---------------+---------------+-------------------+
only showing top 5 rows

Write a comment:

*

Your email address will not be published.

© 2014 In R we trust.
Top
Follow us: