You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
dev-wiki/docs/jupyter/Spark上手示例1:RDD操作.ipynb

611 lines
13 KiB
Plaintext

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# 引入pyspark,并创建spark上下文\n",
"import findspark\n",
"findspark.init()\n",
"import pyspark\n",
"sc = pyspark.SparkContext()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. 创建RDD的第一种方式: 读取外部数据, 比如本地磁盘文件"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"rdd = sc.textFile('./dataset/Goodbye_Object_Oriented_Programming.txt')"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"pyspark.rdd.RDD"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# 查看rdd类型\n",
"type(rdd)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1.1 RDD之转换Transformation"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 0 ns, sys: 0 ns, total: 0 ns\n",
"Wall time: 31 µs\n"
]
}
],
"source": [
"%%time\n",
"## map是转换操作(Transformation)的一种, 这时候只是记录到DAG中, 并不会真正计算\n",
"rdd = rdd.map(lambda x: len(x))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1.2 RDD之行动Action"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"13187\n",
"CPU times: user 12 ms, sys: 0 ns, total: 12 ms\n",
"Wall time: 1.58 s\n"
]
}
],
"source": [
"%%time\n",
"## reduce是行动操作(Action)的一种, 这个时候才真正地执行计算\n",
"charCount = rdd.reduce(lambda x, y: x+y)\n",
"\n",
"print(charCount)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" 328 2260 13687 ./dataset/Goodbye_Object_Oriented_Programming.txt\r\n"
]
}
],
"source": [
"! wc ./dataset/Goodbye_Object_Oriented_Programming.txt"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1.3 示例:统计单词出现的次数"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['Ive been programming in Object Oriented languages for decades. The first OO language I used was C++ and then Smalltalk and finally .NET and Java.',\n",
" '']"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"wordRdd = sc.textFile('./dataset/Goodbye_Object_Oriented_Programming.txt')\n",
"\n",
"# take操作就是一种Action, 返回前n条数据\n",
"wordRdd.take(2) "
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"# 将每一行文本按空格切分成单词列表\n",
"wordRdd = wordRdd.map(lambda line: line.split(' '))"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[['Ive',\n",
" 'been',\n",
" 'programming',\n",
" 'in',\n",
" 'Object',\n",
" 'Oriented',\n",
" 'languages',\n",
" 'for',\n",
" 'decades.',\n",
" 'The',\n",
" 'first',\n",
" 'OO',\n",
" 'language',\n",
" 'I',\n",
" 'used',\n",
" 'was',\n",
" 'C++',\n",
" 'and',\n",
" 'then',\n",
" 'Smalltalk',\n",
" 'and',\n",
" 'finally',\n",
" '.NET',\n",
" 'and',\n",
" 'Java.'],\n",
" ['']]"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"wordRdd.take(2)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"2493"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# 扁平化处理: 把每行的单词列表展开成单个单词\n",
"\n",
"wordRdd = wordRdd.flatMap(lambda x: x)\n",
"\n",
"# 查看有多少个单词(此时还包含由空行或连续空格切分出来的空字符串)\n",
"wordRdd.count()"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['Ive', 'been']"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# 查看前两条数据\n",
"wordRdd.take(2)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"2260"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# 过滤掉空字符串(由空行或连续空格切分产生)\n",
"wordRdd = wordRdd.filter(lambda x: x != '')\n",
"\n",
"# 查看有多少个单词\n",
"wordRdd.count()"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[('Ive', 1), ('been', 1)]"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# 转换成key-value形式的rdd, 即 (key, value), 这里是 (单词, 1)\n",
"wordRdd = wordRdd.map(lambda word: (word, 1))\n",
"\n",
"wordRdd.take(2)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[('face', 1),\n",
" ('was', 18),\n",
" ('Monkey', 2),\n",
" ('how', 4),\n",
" ('Just', 1),\n",
" ('for', 11),\n",
" ('Directories', 1),\n",
" ('could', 4),\n",
" ('gained', 1),\n",
" ('AGAIN', 1)]"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"wordRdd = wordRdd.reduceByKey(lambda x, y: x+y)\n",
"\n",
"# 查看前10条统计结果\n",
"wordRdd.take(10)\n",
"\n",
"# 查看全部\n",
"# wordRdd.collect()"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"# 使用pandas继续计算\n",
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>word</th>\n",
" <th>count</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>face</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>was</td>\n",
" <td>18</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>Monkey</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>how</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>Just</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>for</td>\n",
" <td>11</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>Directories</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>could</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>gained</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>AGAIN</td>\n",
" <td>1</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" word count\n",
"0 face 1\n",
"1 was 18\n",
"2 Monkey 2\n",
"3 how 4\n",
"4 Just 1\n",
"5 for 11\n",
"6 Directories 1\n",
"7 could 4\n",
"8 gained 1\n",
"9 AGAIN 1"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = pd.DataFrame(wordRdd.collect())\n",
"\n",
"# 设置列名\n",
"df.columns = ['word', 'count']\n",
"\n",
"\n",
"# 查看前10条数据\n",
"df.head(10)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>word</th>\n",
" <th>count</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>263</th>\n",
" <td>the</td>\n",
" <td>121</td>\n",
" </tr>\n",
" <tr>\n",
" <th>271</th>\n",
" <td>to</td>\n",
" <td>57</td>\n",
" </tr>\n",
" <tr>\n",
" <th>576</th>\n",
" <td>of</td>\n",
" <td>47</td>\n",
" </tr>\n",
" <tr>\n",
" <th>358</th>\n",
" <td>and</td>\n",
" <td>45</td>\n",
" </tr>\n",
" <tr>\n",
" <th>589</th>\n",
" <td>a</td>\n",
" <td>41</td>\n",
" </tr>\n",
" <tr>\n",
" <th>797</th>\n",
" <td>is</td>\n",
" <td>38</td>\n",
" </tr>\n",
" <tr>\n",
" <th>136</th>\n",
" <td>in</td>\n",
" <td>35</td>\n",
" </tr>\n",
" <tr>\n",
" <th>593</th>\n",
" <td>I</td>\n",
" <td>32</td>\n",
" </tr>\n",
" <tr>\n",
" <th>685</th>\n",
" <td>that</td>\n",
" <td>29</td>\n",
" </tr>\n",
" <tr>\n",
" <th>645</th>\n",
" <td>The</td>\n",
" <td>26</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" word count\n",
"263 the 121\n",
"271 to 57\n",
"576 of 47\n",
"358 and 45\n",
"589 a 41\n",
"797 is 38\n",
"136 in 35\n",
"593 I 32\n",
"685 that 29\n",
"645 The 26"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# 查看出现次数最多的十个单词\n",
"df =df.sort_values(by='count', ascending=False)\n",
"\n",
"\n",
"df.head(10)"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"# 停止spark上下文\n",
"sc.stop()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}