使用Spark和Cassandra进行数据协同
首先
在数据分析中,经常有使用Apache Spark + Cassandra的组合来实现的选项。
Apache Spark是一种开源的大数据处理框架。

可以访问HDFS、Alluxio、Apache Cassandra、Apache HBase、Apache Hive等数百个其他数据源中的数据。
强大而分布式的数据集(Resilient Distributed Dataset,简称RDD)、数据框架(DataFrame)和数据集(DataSet)等。。。
来源:https://spark.apache.org/
出处:https://spark.apache.org/
Cassandra是什么

高效地管理海量数据,快速处理,无需担忧睡眠丧失。
引用来源:http://cassandra.apache.org/
特别是从一开始就考虑了可扩展性,因此可以轻松地建立集群。
将CSV文件数据保存到Cassandra的示例。
我将尝试创建一个将CSV文件保存到Cassandra的Spark示例,因为Spark具有许多功能。
创建一个名为”users.csv”的示例文件。

在Gradle项目中引入库
dependencies {
// https://mvnrepository.com/artifact/org.scala-lang/scala-library
compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.12'
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.3.4'
// https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector
compile group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.4.1'
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.3.4'
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.3.4'
}
将CSV文件保存到Cassandra,然后从数据库中进行检索。
package com.test.spark;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
public class CsvReader {
private static final Logger logger = Logger.getLogger(CsvReader.class);
public static void main(String[] args) {
// Spark設定
SparkConf conf = new SparkConf();
conf.setAppName("CSVReader");
conf.setMaster("local[*]");
conf.set("spark.cassandra.connection.host", "192.168.10.248");
conf.set("spark.cassandra.connection.port", "9042");
// Cassandraのkeyspaceとテーブル名
String keyspace = "sample";
String tableUser = "user";
String userCsv = "C:\\data\\spark\\users.csv";
JavaSparkContext sc = new JavaSparkContext(conf);
try {
SparkSession sparkSession = SparkSession.builder().master("local").appName("CSVReader")
.config("spark.sql.warehouse.dir", "file:////C:/data/spark").getOrCreate();
// Cassandraのコネクション
CassandraConnector connector = CassandraConnector.apply(sc.getConf());
try (Session session = connector.openSession()) {
// keyspaceある場合は削除する
session.execute("DROP KEYSPACE IF EXISTS " + keyspace);
// keyspaceを作成する
session.execute("CREATE KEYSPACE " + keyspace
+ " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
// テーブルを作成する
session.execute("CREATE TABLE " + keyspace + "." + tableUser
+ "(user_id TEXT PRIMARY KEY, user_name TEXT, email_address TEXT, memo TEXT)");
}
// CSVからデータを取得する
// テーブル定義に合わせるため、カラムのASも重要
Dataset<Row> csv = sparkSession.read().format("com.databricks.spark.csv").option("header", "true")
.option("encoding", "UTF-8").load(userCsv).select(new Column("ユーザーID").as("user_id"),
new Column("氏名").as("user_name"),
new Column("メールアドレス").as("email_address"),
new Column("備考").as("memo"));
// Cassandraに保存
csv.write().format("org.apache.spark.sql.cassandra")
.option("header", "true")
.option("keyspace", keyspace)
.option("table", tableUser)
.option("column", "user_id")
.option("column", "user_name")
.option("column", "email_address")
.option("column", "memo")
.mode(SaveMode.Append)
.save();
// Cassandraからデータを読み出す
Dataset<Row> dataset = sparkSession.read().format("org.apache.spark.sql.cassandra")
.option("keyspace", keyspace)
.option("table", tableUser).load();
// データセットから配列を取得する
List<Row> asList = dataset.collectAsList();
for (Row r : asList) {
logger.info(r);
}
} catch (Exception e) {
logger.error(e);
} finally {
sc.stop();
sc.close();
}
}
}
使用者训练来学习,以生成自然语言文本和下一步动作的AI模型。

通过JAVA检索到的用户数据
19/10/11 23:18:27 INFO CsvReader: [A000002,yamada.bb@test.com,入社10年目,山田 三郎]
19/10/11 23:18:27 INFO CsvReader: [A000004,tanaka.bb@test.com,入社3年目,田中 次郎]
19/10/11 23:18:27 INFO CsvReader: [A000003,tanaka.aa@test.com,入社5年目,田中 一郎]
19/10/11 23:18:27 INFO CsvReader: [A000001,yamada.aa@test.com,入社1年目,山田 太郎]
基本操作和其他详细资料可在指南中找到。
Spark编程指南:https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html
以上