Package pyspark :: Module rdd :: Class RDD

Class RDD

source code

object --+
         |
        RDD

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.

Instance Methods
 
__init__(self, jrdd, ctx)
x.__init__(...) initializes x; see help(type(x)) for signature
source code
 
context(self)
The SparkContext that this RDD was created on.
source code
 
cache(self)
Persist this RDD with the default storage level (MEMORY_ONLY).
source code
 
persist(self, storageLevel)
Set this RDD's storage level to persist its values across operations after the first time it is computed.
source code
 
unpersist(self)
Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
source code
 
checkpoint(self)
Mark this RDD for checkpointing.
source code
 
isCheckpointed(self)
Return whether this RDD has been checkpointed or not.
source code
 
getCheckpointFile(self)
Get the name of the file to which this RDD was checkpointed.
source code
 
map(self, f, preservesPartitioning=False)
Return a new RDD by applying a function to each element of this RDD.
source code
 
flatMap(self, f, preservesPartitioning=False)
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
source code
 
mapPartitions(self, f, preservesPartitioning=False)
Return a new RDD by applying a function to each partition of this RDD.
source code
 
mapPartitionsWithSplit(self, f, preservesPartitioning=False)
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
source code
 
filter(self, f)
Return a new RDD containing only the elements that satisfy a predicate.
source code
 
distinct(self)
Return a new RDD containing the distinct elements in this RDD.
source code
 
sample(self, withReplacement, fraction, seed)
Return a sampled subset of this RDD (relies on numpy and falls back on default random generator if numpy is unavailable).
source code
 
takeSample(self, withReplacement, num, seed)
Return a fixed-size sampled subset of this RDD (currently requires numpy).
source code
 
union(self, other)
Return the union of this RDD and another one.
source code
 
__add__(self, other)
Return the union of this RDD and another one.
source code
 
sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x)
Sorts this RDD, which is assumed to consist of (key, value) pairs.
source code
 
glom(self)
Return an RDD created by coalescing all elements within each partition into a list.
source code
 
cartesian(self, other)
Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where a is in self and b is in other.
source code
 
groupBy(self, f, numPartitions=None)
Return an RDD of grouped items.
source code
 
pipe(self, command, env={})
Return an RDD created by piping elements to a forked external process.
source code
 
foreach(self, f)
Applies a function to all elements of this RDD.
source code
 
collect(self)
Return a list that contains all of the elements in this RDD.
source code
 
reduce(self, f)
Reduces the elements of this RDD using the specified commutative and associative binary operator.
source code
 
fold(self, zeroValue, op)
Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral "zero value."
source code
 
sum(self)
Add up the elements in this RDD.
source code
 
count(self)
Return the number of elements in this RDD.
source code
 
stats(self)
Return a StatCounter object that captures the mean, variance and count of the RDD's elements in one operation.
source code
 
mean(self)
Compute the mean of this RDD's elements.
source code
 
variance(self)
Compute the variance of this RDD's elements.
source code
 
stdev(self)
Compute the standard deviation of this RDD's elements.
source code
 
sampleStdev(self)
Compute the sample standard deviation of this RDD's elements (which corrects for bias in estimating the standard deviation by dividing by N-1 instead of N).
source code
 
sampleVariance(self)
Compute the sample variance of this RDD's elements (which corrects for bias in estimating the variance by dividing by N-1 instead of N).
source code
 
countByValue(self)
Return the count of each unique value in this RDD as a dictionary of (value, count) pairs.
source code
 
take(self, num)
Take the first num elements of the RDD.
source code
 
first(self)
Return the first element in this RDD.
source code
 
saveAsTextFile(self, path)
Save this RDD as a text file, using string representations of elements.
source code
 
collectAsMap(self)
Return the key-value pairs in this RDD to the master as a dictionary.
source code
 
reduceByKey(self, func, numPartitions=None)
Merge the values for each key using an associative reduce function.
source code
 
reduceByKeyLocally(self, func)
Merge the values for each key using an associative reduce function, but return the results immediately to the master as a dictionary.
source code
 
countByKey(self)
Count the number of elements for each key, and return the result to the master as a dictionary.
source code
 
join(self, other, numPartitions=None)
Return an RDD containing all pairs of elements with matching keys in self and other.
source code
 
leftOuterJoin(self, other, numPartitions=None)
Perform a left outer join of self and other.
source code
 
rightOuterJoin(self, other, numPartitions=None)
Perform a right outer join of self and other.
source code
 
partitionBy(self, numPartitions, partitionFunc=hash)
Return a copy of the RDD partitioned using the specified partitioner.
source code
 
combineByKey(self, createCombiner, mergeValue, mergeCombiners, numPartitions=None)
Generic function to combine the elements for each key using a custom set of aggregation functions.
source code
 
groupByKey(self, numPartitions=None)
Group the values for each key in the RDD into a single sequence.
source code
 
flatMapValues(self, f)
Pass each value in the key-value pair RDD through a flatMap function without changing the keys; this also retains the original RDD's partitioning.
source code
 
mapValues(self, f)
Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning.
source code
 
groupWith(self, other)
Alias for cogroup.
source code
 
cogroup(self, other, numPartitions=None)
For each key k in self or other, return a resulting RDD that contains a tuple with the list of values for that key in self as well as other.
source code
 
subtractByKey(self, other, numPartitions=None)
Return each (key, value) pair in self that has no pair with matching key in other.
source code
 
subtract(self, other, numPartitions=None)
Return each value in self that is not contained in other.
source code
 
keyBy(self, f)
Creates tuples of the elements in this RDD by applying f.
source code

Inherited from object: __delattr__, __format__, __getattribute__, __hash__, __new__, __reduce__, __reduce_ex__, __repr__, __setattr__, __sizeof__, __str__, __subclasshook__

Properties

Inherited from object: __class__

Method Details

__init__(self, jrdd, ctx)
(Constructor)

source code 

x.__init__(...) initializes x; see help(type(x)) for signature

Overrides: object.__init__
(inherited documentation)

context(self)

source code 

The SparkContext that this RDD was created on.

Decorators:
  • @property
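
For illustration, a quick check (an added example; it assumes the usual interactive setup in which sc is the active SparkContext, as in the other examples on this page):

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.context is sc
True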

persist(self, storageLevel)

source code 

Set this RDD's storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet.
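
A minimal sketch of assigning a non-default level (an added example; it assumes a StorageLevel constant can be imported from pyspark in this build, which may not hold for every release, so the calls are left unverified):

>>> from pyspark import StorageLevel        # doctest: +SKIP
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.persist(StorageLevel.MEMORY_ONLY)   # doctest: +SKIP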

checkpoint(self)

source code 

Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory set with SparkContext.setCheckpointDir() and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.
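
A sketch of the workflow described above (an added example; the checkpoint directory is illustrative and the doctest is skipped because it touches the filesystem):

>>> sc.setCheckpointDir("/tmp/rdd-checkpoints")   # doctest: +SKIP
>>> rdd = sc.parallelize([1, 2, 3]).cache()
>>> rdd.checkpoint()                              # doctest: +SKIP
>>> rdd.count()                                   # doctest: +SKIP
3
>>> rdd.isCheckpointed()                          # doctest: +SKIP
True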

flatMap(self, f, preservesPartitioning=False)

source code 

Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
[1, 1, 1, 2, 2, 3]
>>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]

mapPartitions(self, f, preservesPartitioning=False)

source code 

Return a new RDD by applying a function to each partition of this RDD.

>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(iterator): yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7]

mapPartitionsWithSplit(self, f, preservesPartitioning=False)

source code 

Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
>>> def f(splitIndex, iterator): yield splitIndex
>>> rdd.mapPartitionsWithSplit(f).sum()
6

filter(self, f)

source code 

Return a new RDD containing only the elements that satisfy a predicate.

>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4]

distinct(self)

source code 

Return a new RDD containing the distinct elements in this RDD.

>>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
[1, 2, 3]

sample(self, withReplacement, fraction, seed)

source code 

Return a sampled subset of this RDD (relies on numpy and falls back on default random generator if numpy is unavailable).

>>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
[2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]

takeSample(self, withReplacement, num, seed)

source code 

Return a fixed-size sampled subset of this RDD (currently requires numpy).

>>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
[4, 2, 1, 8, 2, 7, 0, 4, 1, 4]

union(self, other)

source code 

Return the union of this RDD and another one.

>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd.union(rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]

__add__(self, other)
(Addition operator)

source code 

Return the union of this RDD and another one.

>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> (rdd + rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]

sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x)

source code 

Sorts this RDD, which is assumed to consist of (key, value) pairs.

>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortByKey(True, 2).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
>>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
>>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]

glom(self)

source code 

Return an RDD created by coalescing all elements within each partition into a list.

>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> sorted(rdd.glom().collect())
[[1, 2], [3, 4]]

cartesian(self, other)

source code 

Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where a is in self and b is in other.

>>> rdd = sc.parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect())
[(1, 1), (1, 2), (2, 1), (2, 2)]

groupBy(self, f, numPartitions=None)

source code 

Return an RDD of grouped items.

>>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
>>> result = rdd.groupBy(lambda x: x % 2).collect()
>>> sorted([(x, sorted(y)) for (x, y) in result])
[(0, [2, 8]), (1, [1, 1, 3, 5])]

pipe(self, command, env={})

source code 

Return an RDD created by piping elements to a forked external process.

>>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
['1', '2', '3']

foreach(self, f)

source code 

Applies a function to all elements of this RDD.

>>> def f(x): print x
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

reduce(self, f)

source code 

Reduces the elements of this RDD using the specified commutative and associative binary operator.

>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
>>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
10

fold(self, zeroValue, op)

source code 

Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral "zero value."

The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.

>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
15

sum(self)

source code 

Add up the elements in this RDD.

>>> sc.parallelize([1.0, 2.0, 3.0]).sum()
6.0

count(self)

source code 

Return the number of elements in this RDD.

>>> sc.parallelize([2, 3, 4]).count()
3

mean(self)

source code 

Compute the mean of this RDD's elements.

>>> sc.parallelize([1, 2, 3]).mean()
2.0

variance(self)

source code 

Compute the variance of this RDD's elements.

>>> sc.parallelize([1, 2, 3]).variance()
0.666...

stdev(self)

source code 

Compute the standard deviation of this RDD's elements.

>>> sc.parallelize([1, 2, 3]).stdev()
0.816...

sampleStdev(self)

source code 

Compute the sample standard deviation of this RDD's elements (which corrects for bias in estimating the standard deviation by dividing by N-1 instead of N).

>>> sc.parallelize([1, 2, 3]).sampleStdev()
1.0

sampleVariance(self)

source code 

Compute the sample variance of this RDD's elements (which corrects for bias in estimating the variance by dividing by N-1 instead of N).

>>> sc.parallelize([1, 2, 3]).sampleVariance()
1.0

countByValue(self)

source code 

Return the count of each unique value in this RDD as a dictionary of (value, count) pairs.

>>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
[(1, 2), (2, 3)]

take(self, num)

source code 

Take the first num elements of the RDD.

This currently scans the partitions *one by one*, so it will be slow if a lot of partitions are required. In that case, use collect to get the whole RDD instead.

>>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
[2, 3]
>>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
[2, 3, 4, 5, 6]

first(self)

source code 

Return the first element in this RDD.

>>> sc.parallelize([2, 3, 4]).first()
2

saveAsTextFile(self, path)

source code 

Save this RDD as a text file, using string representations of elements.

>>> from tempfile import NamedTemporaryFile
>>> tempFile = NamedTemporaryFile(delete=True)
>>> tempFile.close()
>>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
>>> from fileinput import input
>>> from glob import glob
>>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
'0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'

collectAsMap(self)

source code 

Return the key-value pairs in this RDD to the master as a dictionary.

>>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
>>> m[1]
2
>>> m[3]
4

reduceByKey(self, func, numPartitions=None)

source code 

Merge the values for each key using an associative reduce function.

This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.

Output will be hash-partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified.

>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]
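
To illustrate the numPartitions parameter described above, continuing with the same rdd (an added sketch; only the partition count is asserted, not which keys land where):

>>> len(rdd.reduceByKey(add, 3).glom().collect())
3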

reduceByKeyLocally(self, func)

source code 

Merge the values for each key using an associative reduce function, but return the results immediately to the master as a dictionary.

This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.

>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKeyLocally(add).items())
[('a', 2), ('b', 1)]

countByKey(self)

source code 

Count the number of elements for each key, and return the result to the master as a dictionary.

>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.countByKey().items())
[('a', 2), ('b', 1)]

join(self, other, numPartitions=None)

source code 

Return an RDD containing all pairs of elements with matching keys in self and other.

Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in self and (k, v2) is in other.

Performs a hash join across the cluster.

>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2), ("a", 3)])
>>> sorted(x.join(y).collect())
[('a', (1, 2)), ('a', (1, 3))]

leftOuterJoin(self, other, numPartitions=None)

source code 

Perform a left outer join of self and other.

For each element (k, v) in self, the resulting RDD will either contain all pairs (k, (v, w)) for w in other, or the pair (k, (v, None)) if no elements in other have key k.

Hash-partitions the resulting RDD into the given number of partitions.

>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> sorted(x.leftOuterJoin(y).collect())
[('a', (1, 2)), ('b', (4, None))]

rightOuterJoin(self, other, numPartitions=None)

source code 

Perform a right outer join of self and other.

For each element (k, w) in other, the resulting RDD will either contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w)) if no elements in self have key k.

Hash-partitions the resulting RDD into the given number of partitions.

>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> sorted(y.rightOuterJoin(x).collect())
[('a', (2, 1)), ('b', (None, 4))]

partitionBy(self, numPartitions, partitionFunc=hash)

source code 

Return a copy of the RDD partitioned using the specified partitioner.

>>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
>>> sets = pairs.partitionBy(2).glom().collect()
>>> set(sets[0]).intersection(set(sets[1]))
set([])

combineByKey(self, createCombiner, mergeValue, mergeCombiners, numPartitions=None)

source code 

Generic function to combine the elements for each key using a custom set of aggregation functions.

Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C. Note that V and C can be different -- for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

Users provide three functions:

  • createCombiner, which turns a V into a C (e.g., creates a one-element list)
  • mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
  • mergeCombiners, to combine two C's into a single one.

In addition, users can control the partitioning of the output RDD.

>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> def f(x): return x
>>> def add(a, b): return a + str(b)
>>> sorted(x.combineByKey(str, add, add).collect())
[('a', '11'), ('b', '1')]
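
The (Int, Int) to (Int, List[Int]) grouping mentioned above can be sketched with list-building combiners (an added example, not part of the original docstring; the inner lists are sorted because their order depends on partitioning):

>>> pairs = sc.parallelize([(1, 2), (1, 3), (2, 4)])
>>> combined = pairs.combineByKey(lambda v: [v],
...                               lambda c, v: c + [v],
...                               lambda c1, c2: c1 + c2)
>>> sorted((k, sorted(v)) for k, v in combined.collect())
[(1, [2, 3]), (2, [4])]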

groupByKey(self, numPartitions=None)

source code 

Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD into numPartitions partitions.

>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(x.groupByKey().collect())
[('a', [1, 1]), ('b', [1])]
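
As with reduceByKey, the numPartitions argument controls the number of output partitions (an added sketch reusing x from above; only the partition count is asserted):

>>> len(x.groupByKey(4).glom().collect())
4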

cogroup(self, other, numPartitions=None)

source code 

For each key k in self or other, return a resulting RDD that contains a tuple with the list of values for that key in self as well as other.

>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> sorted(x.cogroup(y).collect())
[('a', ([1], [2])), ('b', ([4], []))]

subtractByKey(self, other, numPartitions=None)

source code 

Return each (key, value) pair in self that has no pair with matching key in other.

>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
>>> y = sc.parallelize([("a", 3), ("c", None)])
>>> sorted(x.subtractByKey(y).collect())
[('b', 4), ('b', 5)]

subtract(self, other, numPartitions=None)

source code 

Return each value in self that is not contained in other.

>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
>>> y = sc.parallelize([("a", 3), ("c", None)])
>>> sorted(x.subtract(y).collect())
[('a', 1), ('b', 4), ('b', 5)]

keyBy(self, f)

source code 

Creates tuples of the elements in this RDD by applying f.

>>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
>>> y = sc.parallelize(zip(range(0,5), range(0,5)))
>>> sorted(x.cogroup(y).collect())
[(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
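
A simpler added illustration of the (f(x), x) pairing that keyBy performs:

>>> sorted(sc.parallelize(["apple", "fig", "kiwi"]).keyBy(len).collect())
[(3, 'fig'), (4, 'kiwi'), (5, 'apple')]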