# spark-RDD

## RDD基础

Python 中使用 textFile() 创建一个字符串的 RDD

```python
from pyspark import SparkConf, SparkContext

# Python 中初始化 Spark
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
sc.stop()
sc = SparkContext(conf=conf)

# RDD基础
# RDD 支持两种类型的操作:转化操作(transformation)和行动操作
# 转化操作会由一个 RDD 生成一个新的 RDD
lines = sc.textFile("/Users/jiang/Documents/spark/study/README.md")
```

### RDD 支持两种类型的操作

* 转化操作(transformation)：由一个 RDD 生成一个新的 RDD

> pythonLines = lines.filter(lambda line: "Python" in line)
>
> pythonLines = lines.filter(lambda line: "ctrl" in line)

* 行动操作(action)

> 行动操作会对 RDD 计算出一个结果，并把结果返回到驱动器程序中
>
> pythonLines.first()
>
> lines.count()

注意：

> 可以在任何时候定 义新的 RDD，但 Spark 只会惰性计算这些 RDD
>
> 它们只有第一次在一个行动操作中用到时才会真正计算

## 创建RDD

```python
lines_rdd = sc.textFile("/Users/jiang/Documents/spark/study/README.md")
str_rdd = sc.parallelize(["pandas", "i like pandas"])
```

### 转换操作

`collect()` 函数，可以用来获取整 个 RDD 中的数据

> 数据集能在单台机器的内存中放得下时，才能使用 collect() 在大多数情况下，RDD 不能通过 collect() 收集到驱动器进程中，因为它们一般都很大

filter

```python
ctrl_Lines = lines_rdd.filter(lambda line: "ctrl" in line)
print(ctrl_Lines.first())
print(ctrl_Lines.count())
print(lines_rdd.count())
```

map

```python
nums = sc.parallelize([1, 2, 3, 4]) #原RDD
squared_RDD = nums.map(lambda x: x * x) #转换为平方的RDD
squared=squared_RDD.take(3) #取3个元素
for num in squared:
    print("%i " % (num))

'''
1 
4 
9 
'''
```

`flatMap`

```
我们希望对每个输入元素生成多个输出元素。实现该功能的操作叫作 flatMap()
flatMap()函数被分别应用到了输入 RDD 的每个元素上
返回的不是一个元素，而是一个返回值序列的迭代器，迭代器可访问的所有元素的 RDD
flatMap() 的一个简单用途是把输入的字符串切分为单词
```

```python
lines = sc.parallelize(["hello world", "hi lili this is a apple"])
words = lines.flatMap(lambda line: line.split(" "))
words.collect()
'''
['hello', 'world', 'hi', 'lili', 'this', 'is', 'a', 'apple']
'''
```

`distinct`

> RDD.distinct() 转化操作来生成一个只包含不同元素的新 RDD。 不过需要注意，distinct() 操作的开销很大，因为它需要将所有数据通过网络进行 混洗(shuffle)

`intersection(other)`

> **返回两个 RDD 中都有的元素**。 intersection() 在运行时也会**去掉所有重复的元素(单个 RDD 内的重复元素也会一起移除)**。 尽管 intersection() 与 union() 的概念相似，intersection() 的性能却要差很多，因为它需要 通过网络混洗数据来发现共有的元素。

`subtract(other)`

> 返回 一个由只存在于第一个 RDD 中而不存在于第二个 RDD 中的所有元素组成的 RDD。 和intersection() 一样，它也需要数据混洗。

`cartesian(other)`

> 转化操作会返回 所有可能的 (a, b) 对，其中 a 是源 RDD 中的元素，而 b 则来自另一个 RDD。 求大规模 RDD 的笛卡儿积开销巨大。

toLocalIterator迭代元素

```python
for l in ctrl_Lines.toLocalIterator():
    print(l,type(l))
```

take，取最多10个

```python
for i,line in enumerate(lines_rdd.take(10)):
    print(i,line)
```

union()合并元素

```
# C_RDD = A_RDD.union(B_RDD)
```

persist\unpersist

> 用 persist() 来把数据的一部分读取到内存中，并反复查询这部分数据
>
> RDD 还有一个方法叫作 unpersist()，调用该方法可以手动把持久化的 RDD 从缓 存中移除

```
str_rdd.persist()
str_rdd.unpersist()
```

### 行动操作

reduce

> 迭代元素，进行某种操作

```python
nums = sc.parallelize([1, 2, 3, 4])
sum_nums = nums.reduce(lambda x, y: x + y)
print(sum_nums)
#10
```

```python
print(nums.top(4))
print(nums.take(4))
print(nums.countByValue())
'''
[4, 3, 2, 1]
[1, 2, 3, 4]
defaultdict(<class 'int'>, {1: 1, 2: 1, 3: 1, 4: 1})
'''
```

```python
'''
(((1+1)+2)+3)+4=11
1+11=12
'''
sum_nums2 = nums.fold(1,lambda x, y: x + y)
print(sum_nums2)

#12
```

foreach遍历

```python
# 这里print无法打印，可以写入文件查看计算过程
def printf(x):
    print(x)
nums.foreach(printf)
```

aggregate返回复杂类型

比如求平均，返回和和个数

```python
'''
输入：
[1, 2, 3, 4]

初始acc:(0,0)

f迭代过程输出文件：
(0, 0),1->(1, 1)
(1, 1),2->(3, 2)
(3, 2),3->(6, 3)
(6, 3),4->(10, 4)

f2迭代过程输出文件：
(0, 0),(10, 4)->(10, 4)

输出:
(10,4)
'''
# 写文件为了显示过程，打印不显示
def f(acc,value):
    with open('f.txt','a+') as of:
        of.write(str(acc)+','+str(value)+'->'+str((acc[0]+value,acc[1]+1))+'\n')
    return (acc[0]+value,acc[1]+1)

def f2(acc1,acc2):
    with open('f2.txt','a+') as of:
        of.write(str(acc1)+','+str(acc2)+'->'+str((acc1[0]+acc2[0],acc1[1]+acc2[1]))+'\n')
    return (acc1[0]+acc2[0],acc1[1]+acc2[1])

sumCount = nums.aggregate((0,0),
                          (lambda acc,value:f(acc,value)),
                          (lambda acc1,acc2:f2(acc1,acc2)))
print(sumCount)
```

## 向Spark传递函数

Spark 的大部分转化操作和一部分行动操作，都需要依赖用户传递的函数来计算。

在三种主要语言中，向 Spark 传递函数的方式略有区别。

> 传递函数时需要小心的一点是，Python 会在你不经意间把函数所在的对象也序列化传出去
>
> 如 self.field，整个self代表的对象也传出去了
>
> Spark 就会把整个对象发到工作节点上，这可能比你想传递的东西大得多

```
def getMatchesFunctionReference(self, rdd): 
    # 问题:在"self.isMatch"中引用了整个self 
    return rdd.filter(self.isMatch)
```

应该：

```
def getMatchesNoReference(self, rdd):
    # 安全:只把需要的字段提取到局部变量中
    query = self.query
    return rdd.filter(lambda x: query in x)
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://im-qianuxn.gitbook.io/pytorch/ji-suan-ji/spark-hadoop/spark-rdd.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
