使用Spark和Cassandra进行数据协同

首先

在数据分析中,经常有使用Apache Spark + Cassandra的组合来实现的选项。

Apache Spark是一种开源的大数据处理框架。

image.png

可以访问HDFS、Alluxio、Apache Cassandra、Apache HBase、Apache Hive等数百个其他数据源中的数据。

强大而分布式的数据集(Resilient Distributed Dataset,简称RDD)、数据框架(DataFrame)和数据集(DataSet)等。。。

来源:https://spark.apache.org/

出处:https://spark.apache.org/

Cassandra是什么

image.png

高效地管理海量数据,快速处理,无需担忧睡眠丧失。

引用来源:http://cassandra.apache.org/

特别是从一开始就考虑了可扩展性,因此可以轻松地建立集群。

将CSV文件数据保存到Cassandra的示例。

我将尝试创建一个将CSV文件保存到Cassandra的Spark示例,因为Spark具有许多功能。

创建一个名为”users.csv”的示例文件。

image.png

在Gradle项目中引入库

dependencies {
    // https://mvnrepository.com/artifact/org.scala-lang/scala-library
    compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.12'

    // https://mvnrepository.com/artifact/org.apache.spark/spark-core
    compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.3.4'

    // https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector
    compile group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.4.1'

    // https://mvnrepository.com/artifact/org.apache.spark/spark-sql
    compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.3.4'

    // https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
    compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.3.4'

}

将CSV文件保存到Cassandra,然后从数据库中进行检索。

package com.test.spark;

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;

import java.util.List;

import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class CsvReader {

    private static final Logger logger = Logger.getLogger(CsvReader.class);

    public static void main(String[] args) {

        // Spark設定
        SparkConf conf = new SparkConf();
        conf.setAppName("CSVReader");
        conf.setMaster("local[*]");
        conf.set("spark.cassandra.connection.host", "192.168.10.248");
        conf.set("spark.cassandra.connection.port", "9042");

        // Cassandraのkeyspaceとテーブル名
        String keyspace = "sample";
        String tableUser = "user";
        String userCsv = "C:\\data\\spark\\users.csv";

        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            SparkSession sparkSession = SparkSession.builder().master("local").appName("CSVReader")
                    .config("spark.sql.warehouse.dir", "file:////C:/data/spark").getOrCreate();

            // Cassandraのコネクション
            CassandraConnector connector = CassandraConnector.apply(sc.getConf());

            try (Session session = connector.openSession()) {
                // keyspaceある場合は削除する
                session.execute("DROP KEYSPACE IF EXISTS " + keyspace);

                // keyspaceを作成する
                session.execute("CREATE KEYSPACE " + keyspace
                        + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");

                // テーブルを作成する
                session.execute("CREATE TABLE " + keyspace + "." + tableUser
                        + "(user_id TEXT PRIMARY KEY, user_name TEXT, email_address TEXT, memo TEXT)");
            }

            // CSVからデータを取得する
            // テーブル定義に合わせるため、カラムのASも重要
            Dataset<Row> csv = sparkSession.read().format("com.databricks.spark.csv").option("header", "true")
                    .option("encoding", "UTF-8").load(userCsv).select(new Column("ユーザーID").as("user_id"),
                            new Column("氏名").as("user_name"), 
                            new Column("メールアドレス").as("email_address"),
                            new Column("備考").as("memo"));

            // Cassandraに保存
            csv.write().format("org.apache.spark.sql.cassandra")
                    .option("header", "true")
                    .option("keyspace", keyspace)
                    .option("table", tableUser)
                    .option("column", "user_id")
                    .option("column", "user_name")
                    .option("column", "email_address")
                    .option("column", "memo")
                    .mode(SaveMode.Append)
                    .save();

            // Cassandraからデータを読み出す
            Dataset<Row> dataset = sparkSession.read().format("org.apache.spark.sql.cassandra")
                    .option("keyspace", keyspace)
                    .option("table", tableUser).load();

            // データセットから配列を取得する
            List<Row> asList = dataset.collectAsList();
            for (Row r : asList) {
                logger.info(r);
            }
        } catch (Exception e) {
            logger.error(e);
        } finally {
            sc.stop();
            sc.close();
        }
    }
}

使用者训练来学习,以生成自然语言文本和下一步动作的AI模型。

image.png

通过JAVA检索到的用户数据

19/10/11 23:18:27 INFO CsvReader: [A000002,yamada.bb@test.com,入社10年目,山田 三郎]
19/10/11 23:18:27 INFO CsvReader: [A000004,tanaka.bb@test.com,入社3年目,田中 次郎]
19/10/11 23:18:27 INFO CsvReader: [A000003,tanaka.aa@test.com,入社5年目,田中 一郎]
19/10/11 23:18:27 INFO CsvReader: [A000001,yamada.aa@test.com,入社1年目,山田 太郎]

基本操作和其他详细资料可在指南中找到。
Spark编程指南:https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html

以上

bannerAds