Apache Spark RDD: pyspark.SparkContext:

  • sc.parallelize(data, num) : distributes a local collection into an RDD with num partitions
  • sc.textFile(file, num) : reads a text file into an RDD of lines (see the sketch after the session below)
  • dir(sc) : lists the attributes and methods available on the SparkContext
from pyspark import SparkContext
sc = SparkContext()
# in Python 2, xrange() is more memory-efficient than range() because it is evaluated lazily
data = range(1,11)
rdd = sc.parallelize(data,2)
type(sc)
pyspark.context.SparkContext
sc.version
'1.4.0'
type(rdd)
pyspark.rdd.PipelinedRDD
print('The id of rdd is {0}'.format(rdd.id()))
The id of rdd is 1
rdd.setName('My first RDD')
My first RDD PythonRDD[1] at RDD at PythonRDD.scala:43
print(rdd.toDebugString())
b'(2) My first RDD PythonRDD[1] at RDD at PythonRDD.scala:43 []\n |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:396 []'
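sc.textFile, listed at the top, is not exercised in this session; a minimal sketch, assuming a hypothetical local file data.txt:

# read a text file into an RDD of lines, split into 2 partitions
lines = sc.textFile('data.txt', 2)
lines.count()   # number of lines in the file
lines.take(2)   # first two lines as a list of strings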

Transformations:

  • map(func) : applies func to every element of the RDD
  • filter(func) : applies func to every element of the RDD and returns those that evaluate to True
  • distinct() : returns the unique values of the RDD
  • flatMap(func) : applies func to every element of the RDD and returns a flattened result (i.e. func returns a sequence rather than a single item)
  • zipWithIndex() : returns an RDD where each element is zipped into a tuple with its index
  • sample(withReplacement, fraction) : returns a sampled subset of the RDD
  • randomSplit(weights, seed) : returns train, validate, test RDDs (see the sketch after this list)
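Several of these transformations are not exercised in the session below; a minimal sketch using the same 10-element rdd from above (exact outputs may vary with partitioning and seed):

# flatMap, distinct, zipWithIndex and randomSplit on the rdd created earlier
rdd.flatMap(lambda x: [x, x * 10]).take(4)         # [1, 10, 2, 20]
sc.parallelize([1, 1, 2, 3]).distinct().collect()  # [1, 2, 3] (order may vary)
rdd.zipWithIndex().take(3)                         # [(1, 0), (2, 1), (3, 2)]
train, validate, test = rdd.randomSplit([.6, .2, .2], seed=42)  # three RDDs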

Pair RDD Transformations:

Built on groupByKey(), which returns (k, iterable of v) pairs:

  • groupByKey().mapValues(lambda x: list(x))
  • groupByKey().map(lambda (k, v): (k, sum(v)))
  • groupByKey().mapValues(lambda x: sum(x))

Returns (k, v) pairs:

  • reduceByKey(lambda a,b: a+b)

    reduceByKey aggregates the values for each unique key using the argument function, which takes two arguments and returns one

  • reduceByKey(add), after from operator import add
  • sortByKey(): sorts pair RDDs in ascending order
  • sortBy(func): sorts a pair RDD in ascending order by the given key function:

rdd.sortBy(lambda x: x[1]).collect() : sorts in ascending order by value
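A minimal sketch of sortByKey and sortBy on a small, hypothetical pair RDD:

# sorting a pair RDD by key and by value
pairs = sc.parallelize([('b', 2), ('a', 3), ('c', 1)])
pairs.sortByKey().collect()             # [('a', 3), ('b', 2), ('c', 1)]
pairs.sortBy(lambda x: x[1]).collect()  # [('c', 1), ('b', 2), ('a', 3)]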

rdd.map(lambda a: 2*a).collect()
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
rdd.collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd.filter(lambda x: x>5).collect()
[6, 7, 8, 9, 10]

Actions:

  • reduce(func)

    The argument function takes two elements and returns one (see the sketch after the examples below)

    the function should be:

    • commutative
    • associative
  • collect()
  • collectAsMap() : returns dictionary out of key-value pairs (tuples)
  • count() : returns the # of elements (rows) in this RDD
  • takeSample()
  • take(n): first n elements of RDD
  • first()
  • top(n): top n in descending order
  • takeOrdered(n, key): first n elements in ascending order (or ordered by the optional key function)
  • mean(): returns the mean of the RDD
rdd.sample(False, .2).collect()
[7]
rdd.takeSample(True,10)
[8, 3, 2, 8, 9, 1, 4, 6, 1, 5]
rdd.filter(lambda x: x>5).count()
5
rdd.take(1)
[1]
rdd.first()
1
rdd.top(3)
[10, 9, 8]
rdd.takeOrdered(3)
[1, 2, 3]
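The remaining actions (reduce, collectAsMap, mean) are not shown above; a minimal sketch using the same rdd and a hypothetical pair RDD:

rdd.reduce(lambda a, b: a + b)   # 55, the sum of 1..10
rdd.mean()                       # 5.5
kv = sc.parallelize([('a', 1), ('b', 2)])
kv.collectAsMap()                # {'a': 1, 'b': 2}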

reduceByKey and groupByKey

from operator import add
rdd2 = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])
rdd2.groupByKey().mapValues(lambda x: list(x)).collect()
[('a', [1, 2]), ('b', [1])]
rdd2.groupByKey().map(lambda (k,v): (k, sum(v))).collect()
[('a', 3), ('b', 1)]
rdd2.groupByKey().mapValues(lambda x: sum(x)).collect()
[('a', 3), ('b', 1)]
rdd3 = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])
rdd3.reduceByKey(add).collect()
[('a', 3), ('b', 1)]
rdd4 = sc.parallelize([(3, 1), (3, 2), (6, 1)])
rdd4.reduceByKey(add).collect()
[(6, 1), (3, 3)]
rdd4.reduceByKey(lambda a,b: [a,b]).collect()
[(6, 1), (3, [1, 2])]