# 键值对-Pair RDD

导入文件

```python
testpair_rdd=sc.textFile('testpair.txt')
testpair_rdd.collect()
'''
['hello1 bulse1 , this is china1',
 'hello2 bulse12 , this is china2',
 'hello3 bulse13 , this is china3',
 'hello4 bulse14 , this is china4']
'''
```

### 创建

#### map

第一个单词作为key，原来的值为value，组成的键值对RDD：pairs

```python
pairs=testpair_rdd.map(lambda x:(x.split(' ')[0],x))
pairs.collect()
'''
[('hello1', 'hello1 bulse1 , this is china1'),
 ('hello2', 'hello2 bulse12 , this is china2'),
 ('hello3', 'hello3 bulse13 , this is china3'),
 ('hello4', 'hello4 bulse14 , this is china4')]
'''
```

#### reduceByKey

reduceByKey合并具有相同键的值

```python
nums = sc.parallelize({(1, 2), (3, 4), (3, 6)})

# 对键值相同的两个元素进行相加：x + y:4+6
print(nums.reduceByKey(lambda x, y : x + y).collect())
# 对键值相同的两个元素进行相减：x - y:4-6
print(nums.reduceByKey(lambda x, y : x - y).collect())

'''
[(1, 2), (3, 10)]
[(1, 2), (3, -2)]
'''
```

#### groupByKey

```python
# 具有相同键的值进行分组，value是相同key合并的值的迭代器
nums.groupByKey().collect()

'''
[(1, <pyspark.resultiterable.ResultIterable at 0x110e77be0>),
 (3, <pyspark.resultiterable.ResultIterable at 0x110a86b38>)]
'''
```

### Pair RDD操作

```python
# 返回一个仅包含键的 RDD
nums.keys().distinct().collect()

# 返回一个仅包含值的 RDD
# rdd.values()

# 根据键排序的 RDD
# rdd.sortByKey()
```

### 两个pair RDD的转化操作

```python
rdd = sc.parallelize({(1, 2), (3, 4), (3, 6)})
other=sc.parallelize({(3, 9)})

# 删掉 RDD 中键(1,3)与 other RDD 中的键(3)相同的元素，得到只有键1的元素
rdd.subtractByKey(other=other).collect()
'''
[(1, 2)]
'''
```

### 聚合操作

```python
# 对两个 RDD 进行内连接，SQL的那个内连接类似
rdd.join(other=other).collect()
# [(3, (4, 9)), (3, (6, 9))]


# 右连
# rdd.rightOuterJoin(other)

# 左连接
# rdd.leftOuterJoin(other)

# 有相同键的数据分组
# rdd.cogroup(other):{(1,([2],[])), (3, ([4, 6],[9]))}
```

### mapValues

对键的值进行转换

```python
# 每个键的值map为(v,1)//rdd = sc.parallelize({(1, 2), (3, 4), (3, 6)})
rdd.mapValues(lambda x: (x, 1)).collect()
#[(1, (2, 1)), (3, (4, 1)), (3, (6, 1))]


# 基于键对(v,1)进行reduce:值相加,数相加
rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect()
#[(1, (2, 1)), (3, (10, 2))]
# 有2个3key，3key对应的value和为10，这样可以求键值3的平均值
```

### reduceByKey() 对所有的单词进行计数

```python
rdd = testpair_rdd
# 把每行的字符串按照空格切割成一个集合，然后把集合flat压扁为一维
words = rdd.flatMap(lambda x: x.split(" "))
# 给words中每个元素赋值一个1，这样可以按key累加得到相同key（单词）的数量
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

result.take(5)
#[('hello1', 1), ('bulse1', 1), (',', 4), ('this', 4), ('is', 4)]
```

### 数据分组

如果数据已经以预期的方式提取了键，groupByKey() 就会使用 RDD 中的键来对数据进行分组

对于一个由类型 K 的键和类型 V 的值组成的 RDD，所得到的结果 RDD 类型会是 \[K, Iterable\[V]]。

```
rdd.sortByKey(ascending=True, numPartitions=None, keyfunc = lambda x: str(x)).collect()
```

## Pair RDD的行动操作

```
所有基础 RDD 支持的传统行动操作也都在 pair RDD 上可用
{(1, 2), (3, 4), (3, 6)}
countByKey() /对每个键对应的元素分别计数/rdd.countByKey()：{(1, 1), (3, 2)}
collectAsMap()将结果以映射表的形式返回，以便查询/rdd.collectAsMap()：Map{(1, 2), (3, 4), (3, 6)}
lookup(key)/返回给定键对应的所有值/rdd.lookup(3)：[4, 6]
```

collectAsMap：RDD中同一个Key中存在多个Value，那么后面的Value将会把前面的Value覆盖，最终得到的结果就是Key唯一，而且对应一个Value。

### 读取/保存文件

textFile、wholeTextFiles

```python
# 读取一个文件
input = sc.textFile("file:///home/holden/repos/spark/README.md")
# 如果文件足够小，那么可以使用 SparkContext. wholeTextFiles() 方法，该方法会返回一个 pair RDD，其中键是输入文件的文件名。

# 读取目录下全部文件
input = sc.wholeTextFiles("file://home/holden/salesFiles")
```

saveAsTextFile

```python
# 保存数据为文件：https://blog.csdn.net/frogbar/article/details/79174456
# saveAsTextFile要求保存的目录之前是没有的
rd1 =sc.parallelize(list(range(10000)))

rd1.saveAsTextFile('./rd1')
```

JSON文件

```python
# 读取非结构化的 JSON
# 要求：假设文件中的每一行都是一条 JSON 记录
'''
import json
data = input.map(lambda x: json.loads(x))
'''

# 保存为 JSON
# 筛选出喜爱熊猫的人，然后这些x对象转成json字符串，然后saveAsTextFile
'''
(data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x))
.saveAsTextFile(outputPath))
'''
```

CSV文件

背景

```python
import csv
from io import StringIO

output = StringIO()
# 会把字典写成fieldnames格式的csv
writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
# 遍历里面的字典元素，一个字典写成一行，自动加入\r\n
for record in [{'name':'sds','favoriteAnimal':'ww'},{'name':'sds2','favoriteAnimal':'ww2'}]:
    writer.writerow(record)

print(output.getvalue())

#['sds,ww\r\nsds2,ww2\r\n']
```

CSV读写

```python
# Python 中完整读取 CSV
'''
def loadRecords(fileNameContents):
    """读取给定文件中的所有记录"""
    input = StringIO.StringIO(fileNameContents[1])
    reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"])
    return reader

fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
'''

# 写入csv
'''
假设一个rdd有10个元素，分成3个分区。
如果使用map方法，map中的输入函数会被调用10次；
而使用mapPartitions方法的话，其输入函数会只会被调用3次，每个分区调用1次。

def writeRecords(records):
    """写出一些CSV记录"""
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]

rdd.mapPartitions(writeRecords).saveAsTextFile(outputFile)
'''
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://im-qianuxn.gitbook.io/pytorch/ji-suan-ji/spark-hadoop/pairrdd.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
