A memo on loading data into Cassandra with Spark
This is a quick memo on loading data into Cassandra using Spark.
For the steps to build the Cassandra cluster, see the following link:
https://qiita.com/48hands/items/05c2ad0ea89fe13afd57
First, create a keyspace and a table.
$ ssh nosql1
[vagrant@nosql1 ~]$ cqlsh nosql1
Connected to develop-cluster at nosql1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 };
cqlsh> CREATE TABLE test.kv(key text PRIMARY KEY, value int);
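Before loading anything, it can be worth double-checking the schema from the same cqlsh session:

```
cqlsh> DESCRIBE TABLE test.kv;
```

This prints the full table definition, including the `key text PRIMARY KEY` column that the Spark job below writes to.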
Next, write the Spark code in Scala. The contents can be anything you like.
Set up build.sbt:
name := "CassandraLoad"
version := "0.1"
scalaVersion := "2.11.12"
resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"
libraryDependencies ++= Seq(
"datastax" % "spark-cassandra-connector" % "2.0.1-s_2.11",
"org.apache.spark" %% "spark-sql" % "2.2.0"
)
Create a file named LoadSample.scala:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.cassandra._

object LoadSample {

  private val spark = SparkSession
    .builder
    .master("local[*]") // run locally, giving the executor as many threads as the client machine has cores
    .appName("Load Sample")
    .config("spark.cassandra.connection.host",
      "192.168.33.41,192.168.33.42,192.168.33.43")
    .config("spark.cassandra.output.consistency.level", "ONE") // the default is LOCAL_QUORUM, so it is lowered here
    .getOrCreate()

  def main(args: Array[String]): Unit = {
    import spark.implicits._

    val dfs = for (i <- 1 to 100) yield {
      val rdd: RDD[Int] = spark.sparkContext.parallelize(1 to 100000)
      rdd.map(line => (s"key-$i-$line", line)).toDF("key", "value")
    }

    // Append each batch: SaveMode.Overwrite would truncate the table on every
    // write, so only the last batch would survive.
    dfs.foreach(df =>
      df.write.cassandraFormat("kv", "test").mode(SaveMode.Append).save())
  }
}
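For a quick sanity check from Spark itself, the same connector can read the table back as a DataFrame. This is a minimal sketch assuming the same `spark` session and cluster settings as above:

```scala
import org.apache.spark.sql.cassandra._

// Read test.kv back through the connector and inspect it.
val kv = spark.read.cassandraFormat("kv", "test").load()
println(kv.count()) // should report 10,000,000 once the load completes
kv.show(5)
```

Counting through Spark distributes the scan across executors, which tends to be more reliable than `count(*)` in cqlsh on a table this size.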
After the job runs, the table should contain 100,000 × 100 = 10,000,000 rows.
$ ssh nosql1
[vagrant@nosql1 ~]$ cqlsh nosql1 -k test
Connected to develop-cluster at nosql1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh:test> select * from kv limit 100;
key | value
-------------+---------
key-4-28995 | 28995
key-4-78904 | 78904
key-4-61524 | 61524
key-4-13764 | 13764
...
That should give you a feel that the data has actually been loaded.
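If you want to verify the exact total from cqlsh, note that a full count over 10 million rows can exceed the client read timeout, so you may need to raise it with the `--request-timeout` option when starting cqlsh:

```
cqlsh:test> SELECT count(*) FROM kv;  -- may hit the read timeout on large tables
```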