首次使用Spark

从开始到执行

1. 安装Scala

brew install scala

2. 复制 Spark

git clone https://github.com/apache/spark.git

3. 移至Spark的根目录

cd spark 

将版本更改为1.3.1

git checkout -b version2 v1.3.1

使用mvn命令进行安装

mvn -DskipTests clean package

在work/logs/test.txt中创建测试数据。

执行spark-shell

macbook:spark user$ ./bin/spark-shell 

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.1
      /_/

从本地文件生成RDD。

scala> val file = sc.textFile("work/logs/test.txt")
file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

使用用户1的列进行映射。

scala> val filter = file.filter(_.contains("user1"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:23

10. 打印处理

scala>  println(filter.count)
2

使用spark-submit命令创建一个standalone。

将已安装的 Spark 添加到路径中

echo 'alias spark-shell="~/work/spark/bin/spark-shell"' >>~/.bash_aliases
echo 'alias spark-submit="~/work/spark/bin/spark-submit"'>>~/.bash_aliases
source ~/.bash_alias

2. 安装sbt。

brew install sbt

3.文件的架构

3.文件的组成

$ mkdir ~/work
$ cd work
$ mkdir spark
$ mkdir spark/src
$ mkdir spark/src/main
$ mkdir spark/src/main/scala
$ mkdir spark/project
$ mkdir spark/log
$ vi spark/log/access_log #access_logは適宜用意
$ mkdir spark/target

创建一个build.sbt文件 ※1

//build-version
version := "0.1" 

//scalaVersion
scalaVersion := "2.11.7" 

5.依存ライブラリのinstall
libraryDependencies ++= Seq(
   "org.xerial" % "sqlite-jdbc" % "3.7.2",
   "org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
   //"org.apache.spark" %% "spark-streaming" % "1.3.1"
   //"org.apache.spark" %% "spark-streaming-kafka" % "1.3.1"
)

//※3 重複防止
assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
  case "application.conf"                            => MergeStrategy.concat
  case "unwanted.txt"                                => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

创建6.plugin.sbt文件

logLevel := Level.Warn
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")

解析access_log文件并获取记录数(示例.scala)

import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
//import org.apache.spark.streaming.{Time, Seconds, StreamingContext}

object ExampleApp {
  def main(args: Array[String]) {
    //用意したaccess_log
    val logFile = "log/access_log"
    //AppNameの設定コメントで書いているがsubmitコマンド押下時に読み込まれるAppNameはファイル名
    val conf = new SparkConf().setAppName("Example Application")
    //sparkcontextのインスタンス生成
    val sc = new SparkContext(conf)
    //access_logデータを取得と同時にキャッシュに読み込み
    val logData = sc.textFile(logFile, 2).cache()
    //logDataの中からlinkを取得+count
    val index = logData.filter(line => line.contains("index.html")).count()
    val html1 = logData.filter(line => line.contains("1.html")).count()
    val html2 = logData.filter(line => line.contains("2.html")).count()
    val gif1  = logData.filter(line => line.contains("1.gif")).count()
    val gif2  = logData.filter(line => line.contains("2.gif")).count()
    //表示
    println("Lines with index.html: %s".format(index))
    println("Lines with 1.html: %s".format(html1))
    println("Lines with 2.html: %s".format(html2))
    println("Lines with 1.gif:  %s".format(gif1))
    println("Lines with 2.gif:  %s".format(gif2))
  }
}

8. 创建.jar文件 ※1

sbt assembly

9. 运行spark-submit

spark-submit --class ExampleApp target/scala-2.11/scala-assembly-0.1.jar

10. 执行结果(仅结果)。

Lines with index.html: 141882
Lines with 1.html: 19996
Lines with 2.html: 18567
Lines with 1.gif:  20996
Lines with 2.gif:  52393

11.等等。

请用空行将Name、Version、ScalaVersion、LibraryDependencies等内容分别写在一起。在最后也要加上一个空行。

※2

根据sbt的版本,似乎需要进行一些描述。
在当前的版本中,会出现错误。

※3(只需一种选项)

※3

在初始设置中,没有这个描述,导致依赖库重复,导致在创建jar文件时发生错误。