Using Spark for the First Time
From setup to execution
1. Install Scala
brew install scala
2. Clone Spark
git clone https://github.com/apache/spark.git
3. Move into the Spark root directory
cd spark
4. Check out version 1.3.1
git checkout -b version2 v1.3.1
5. Build with the mvn command
mvn -DskipTests clean package
6. Create test data in work/logs/test.txt.
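The contents are arbitrary, as long as some lines contain user1. A hypothetical example consistent with the count of 2 printed in step 10:
$ mkdir -p work/logs
$ cat > work/logs/test.txt <<'EOF'
user1 login
user2 login
user1 logout
user3 login
EOF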
7. Run spark-shell
macbook:spark user$ ./bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.1
      /_/
8. Generate an RDD from a local file.
scala> val file = sc.textFile("work/logs/test.txt")
file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
9. Filter down to the lines that contain user1.
scala> val filter = file.filter(_.contains("user1"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:23
10. Print the result
scala> println(filter.count)
2
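To see which lines matched, the filtered RDD can also be printed back on the driver; a small sketch (with the hypothetical test data from step 6, this prints the two user1 lines; avoid collect on large data):
scala> filter.collect().foreach(println)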
Creating a standalone application to run with the spark-submit command
1. Add the installed Spark to your path
echo 'alias spark-shell="~/work/spark/bin/spark-shell"' >> ~/.bash_aliases
echo 'alias spark-submit="~/work/spark/bin/spark-submit"' >> ~/.bash_aliases
source ~/.bash_aliases
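To check that the aliases took effect, spark-submit can report its version:
spark-submit --version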
2. Install sbt.
brew install sbt
3. File layout
$ mkdir ~/work
$ cd work
$ mkdir spark
$ mkdir spark/src
$ mkdir spark/src/main
$ mkdir spark/src/main/scala
$ mkdir spark/project
$ mkdir spark/log
$ vi spark/log/access_log # prepare an access_log file yourself
$ mkdir spark/target
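For reference, the same layout can be created in one go with mkdir -p (a convenience equivalent, not in the original steps):
$ mkdir -p spark/src/main/scala spark/project spark/log spark/target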
4. Create a build.sbt file ※1
// build version
version := "0.1"

// Scala version
scalaVersion := "2.11.7"
5. Install the dependent libraries (these settings also go into build.sbt)
libraryDependencies ++= Seq(
"org.xerial" % "sqlite-jdbc" % "3.7.2",
"org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
//"org.apache.spark" %% "spark-streaming" % "1.3.1"
//"org.apache.spark" %% "spark-streaming-kafka" % "1.3.1"
)
// ※3 prevent duplicate files
assemblyMergeStrategy in assembly := {
case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "unwanted.txt" => MergeStrategy.discard
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
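Putting steps 4, 5, and ※3 together, the complete build.sbt would look roughly like the sketch below. The name setting is an assumption inferred from the jar file name (scala-assembly-0.1.jar) used in step 9; the blank lines between settings matter on older sbt versions (see ※1):
// project name (assumption: inferred from the jar name in step 9)
name := "scala"

version := "0.1"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.xerial" % "sqlite-jdbc" % "3.7.2",
  "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"
)

// ※3 prevent duplicate files when building the assembly jar
assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
  case "application.conf" => MergeStrategy.concat
  case "unwanted.txt" => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}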
6. Create the plugin.sbt file (under project/)
logLevel := Level.Warn
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
7. Parse the access_log file and count the records (Example.scala, under src/main/scala/)
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
//import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
object ExampleApp {
def main(args: Array[String]) {
// the access_log prepared earlier
val logFile = "log/access_log"
// Set the AppName (note: the AppName picked up when the submit command runs is the file name)
val conf = new SparkConf().setAppName("Example Application")
// create the SparkContext instance
val sc = new SparkContext(conf)
// fetch the access_log data and load it into the cache at the same time
val logData = sc.textFile(logFile, 2).cache()
// extract each link from logData and count it
val index = logData.filter(line => line.contains("index.html")).count()
val html1 = logData.filter(line => line.contains("1.html")).count()
val html2 = logData.filter(line => line.contains("2.html")).count()
val gif1 = logData.filter(line => line.contains("1.gif")).count()
val gif2 = logData.filter(line => line.contains("2.gif")).count()
// print the results
println("Lines with index.html: %s".format(index))
println("Lines with 1.html: %s".format(html1))
println("Lines with 2.html: %s".format(html2))
println("Lines with 1.gif: %s".format(gif1))
println("Lines with 2.gif: %s".format(gif2))
}
}
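As an aside, the five filter-and-count calls follow the same pattern, so they could also be written as a loop over the patterns; a minimal equivalent sketch:
val patterns = Seq("index.html", "1.html", "2.html", "1.gif", "2.gif")
patterns.foreach { p =>
  println("Lines with %s: %s".format(p, logData.filter(_.contains(p)).count()))
}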
8. Create the .jar file ※1
sbt assembly
9. Run spark-submit
spark-submit --class ExampleApp target/scala-2.11/scala-assembly-0.1.jar
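The command above relies on the default master. spark-submit also accepts an explicit --master option; for example, to run locally on two cores:
spark-submit --master local[2] --class ExampleApp target/scala-2.11/scala-assembly-0.1.jar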
10. Execution results (results only).
Lines with index.html: 141882
Lines with 1.html: 19996
Lines with 2.html: 18567
Lines with 1.gif: 20996
Lines with 2.gif: 52393
11. Notes
※1
In build.sbt, write each of Name, Version, ScalaVersion, LibraryDependencies, and so on as its own block separated by blank lines, and leave a blank line at the end of the file as well.
※2
Depending on the sbt version, some additional settings appear to be required; with the version current at the time of writing, an error occurs.
※3 (only one of the options is needed)
In the initial setup this block was missing, and the duplicated dependency libraries caused an error when the jar file was built.