Working with Cassandra rows in spark-shell

Starting spark-shell

cd spark-1.5.0
bin/spark-shell --jars spark-cassandra-connector-assembly-1.5.0-M2-SNAPSHOT.jar --conf spark.cassandra.connection.host=127.0.0.1
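
To confirm that the connection host was picked up, you can read it back from the shell's SparkConf (a quick sanity check; the res number will vary):

scala> sc.getConf.get("spark.cassandra.connection.host")
res0: String = 127.0.0.1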

Getting a cassandraTable

import com.datastax.spark.connector._ //Imports basic rdd functions
import com.datastax.spark.connector.cql._ //(Optional) Imports java driver helper functions

val c = CassandraConnector(sc.getConf)
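
The connector object can also run raw CQL via withSessionDo. As a sketch, this creates the test_from_spark.fun table that the examples below read from (assuming it does not exist yet):

c.withSessionDo { session =>
  // Keyspace and table names match the examples in this article
  session.execute("CREATE KEYSPACE IF NOT EXISTS test_from_spark WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute("CREATE TABLE IF NOT EXISTS test_from_spark.fun (k int PRIMARY KEY, v int)")
}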

val d = sc.cassandraTable("test_from_spark", "fun")

scala> d.collect.foreach(println)
CassandraRow{k: 1, v: 10}                                                       
CassandraRow{k: 2, v: 20}
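
Each element is a CassandraRow whose columns can be read by name, e.g. with getInt. A small sketch over the rows above (Cassandra does not guarantee row order, so yours may differ):

scala> d.map(row => (row.getInt("k"), row.getInt("v"))).collect
res1: Array[(Int, Int)] = Array((1,10), (2,20))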


Selecting columns, getting column values, and filtering with a where clause


d.select("k").where("k = ?", 10).foreach(println)

CassandraRow{k: 10}

d.select("k", "v").where("k = ?", 12).map(row => row.get[Int]("v")).collect
res2: Array[Int] = Array(120)
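
When a column may be null, get[Option[Int]] is a safer variant than get[Int]. A sketch against the same query (output shown for the row above):

d.select("v").where("k = ?", 12).map(row => row.get[Option[Int]]("v")).collect
// Array(Some(120)); a null v would come back as None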


Writing to a Cassandra table

val d = [some RDD]

d.saveToCassandra("keyspace", "table", SomeColumns("col1", "col2", ....))
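
For example, a minimal sketch writing tuple data back into the test_from_spark.fun table from earlier (the values are made up); tuple fields map positionally onto the listed columns:

val rows = sc.parallelize(Seq((3, 30), (4, 40)))
rows.saveToCassandra("test_from_spark", "fun", SomeColumns("k", "v"))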



