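Trying Apache Ignite from Slick

This article is the Day 19 entry in the Group Advent Calendar 2019.

Introduction

To find an efficient way to process large-scale data in a distributed manner, we tested using Apache Ignite from Slick.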

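What is Apache Ignite?

It is an in-memory computing platform for transactional, analytical, and streaming workloads that delivers in-memory speed at petabyte scale.

Usable as a distributed database

Apache Ignite can serve as an all-in-one distributed database that supports SQL, key-value, compute, machine learning, and other data processing APIs.

SQL support

SQL ANSI-99 standard

Ignite complies with the SQL ANSI-99 standard and supports all SQL and DML commands, including SELECT, UPDATE, INSERT, MERGE, and DELETE statements as well as distributed joins. It also supports the subset of DDL commands that is relevant to distributed SQL databases.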
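As a rough illustration of the SQL support described above, the following is a minimal sketch that talks to Ignite over plain JDBC. It assumes a node is reachable at 127.0.0.1 on the thin client port 10800 and that ignite-core (which contains the thin JDBC driver) is on the classpath. The `city` table, the `thinUrl` helper, and the object name are hypothetical and exist only for this example.

```scala
import java.sql.DriverManager
import scala.util.Using

object IgniteSqlSketch {

  // Builds a JDBC URL for Ignite's thin driver (hypothetical helper).
  def thinUrl(host: String, port: Int = 10800): String =
    s"jdbc:ignite:thin://$host:$port"

  def main(args: Array[String]): Unit = {
    // Register the thin JDBC driver shipped in ignite-core.
    Class.forName("org.apache.ignite.IgniteJdbcThinDriver")

    Using.resource(DriverManager.getConnection(thinUrl("127.0.0.1"))) { conn =>
      Using.resource(conn.createStatement()) { stmt =>
        // DDL: create an illustrative table.
        stmt.executeUpdate(
          "CREATE TABLE IF NOT EXISTS city (id BIGINT PRIMARY KEY, name VARCHAR)")
        // DML: MERGE inserts the row or overwrites an existing one with the same key.
        stmt.executeUpdate("MERGE INTO city (id, name) VALUES (1, 'Tokyo')")
        // Query the rows back and print them.
        Using.resource(stmt.executeQuery("SELECT id, name FROM city ORDER BY id")) { rs =>
          while (rs.next()) println(s"${rs.getLong("id")} ${rs.getString("name")}")
        }
      }
    }
  }
}
```

`MERGE` is used instead of `INSERT` so that running the sketch twice does not fail on a duplicate key.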

Usable as a KVS

It is compliant with the JCache (JSR 107) standard.

    • In-memory key-value store
    • Basic cache operations
    • ConcurrentMap API
    • Collocated processing (EntryProcessor)
    • Events and metrics
    • Pluggable persistence

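Comparison articles with Cassandra

Apache Cassandra vs. Apache Ignite: Affinity Collocation and Distributed SQL
Apache® Ignite™ and Apache® Cassandra™ Benchmarks: The Power of In-Memory Computing

For other use cases and details, see: Apache Ignite use cases.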

Trying it out with Slick

Environment

Ignite configuration file

First, prepare the server-side configuration file.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">

        <!-- Configure internal thread pool. -->
        <property name="publicThreadPoolSize" value="64"/>

        <!-- Configure system thread pool. -->
        <property name="systemThreadPoolSize" value="32"/>

        <property name="binaryConfiguration">
            <bean class="org.apache.ignite.configuration.BinaryConfiguration">
                <property name="compactFooter" value="false"/>
            </bean>
        </property>
        <property name="cacheConfiguration">
            <list>
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="default"/>
                    <property name="atomicityMode" value="ATOMIC"/>
                    <property name="backups" value="1"/>
                    <property name="cacheMode" value="PARTITIONED"/>
                    <property name="statisticsEnabled" value="true"/>
                    <property name="nearConfiguration">
                        <bean class="org.apache.ignite.configuration.NearCacheConfiguration">
                            <property name="nearEvictionPolicy">
                                <bean class="org.apache.ignite.cache.eviction.lru.LruEvictionPolicy">
                                    <property name="maxSize" value="10000"/>
                                </bean>
                            </property>
                        </bean>
                    </property>
                </bean>
            </list>
        </property>
        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <value>127.0.0.1:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

Docker Compose

The configuration file created above is mounted as a volume so that Ignite loads it.

version: "3"

services:

  ignite:
    image: apacheignite/ignite:2.7.6
    environment:
      - CONFIG_URI=/opt/ignite/apache-ignite/config/ignite.xml
    volumes:
      - ./ignite.xml:/opt/ignite/apache-ignite/config/ignite.xml
    ports:
      - "47100:47100"
      - "47500-47509:47500-47509"
      - "10800:10800"

Startup

Start the node with the docker-compose up command.

docker-compose up
Starting ignite-slick_ignite_1 ... done
Attaching to ignite-slick_ignite_1
ignite_1  | [02:39:33]    __________  ________________ 
ignite_1  | [02:39:33]   /  _/ ___/ |/ /  _/_  __/ __/ 
ignite_1  | [02:39:33]  _/ // (7 7    // /  / / / _/   
ignite_1  | [02:39:33] /___/\___/_/|_/___/ /_/ /___/  
ignite_1  | [02:39:33] 
ignite_1  | [02:39:33] ver. 2.7.6#20190911-sha1:21f7ca41
ignite_1  | [02:39:33] 2019 Copyright(C) Apache Software Foundation
ignite_1  | [02:39:33] 
ignite_1  | [02:39:33] Ignite documentation: http://ignite.apache.org
ignite_1  | [02:39:33] 
ignite_1  | [02:39:33] Quiet mode.
ignite_1  | [02:39:33]   ^-- Logging to file '/opt/ignite/apache-ignite/work/log/ignite-72f112a2.0.log'
ignite_1  | [02:39:33]   ^-- Logging by 'JavaLogger [quiet=true, config=null]'
ignite_1  | [02:39:33]   ^-- To see **FULL** console log here add -DIGNITE_QUIET=false or "-v" to ignite.{sh|bat}
ignite_1  | [02:39:33] 
ignite_1  | [02:39:33] OS: Linux 4.9.184-linuxkit amd64
ignite_1  | [02:39:33] VM information: OpenJDK Runtime Environment 1.8.0_212-b04 IcedTea OpenJDK 64-Bit Server VM 25.212-b04
ignite_1  | [02:39:33] Please set system property '-Djava.net.preferIPv4Stack=true' to avoid possible problems in mixed environments.
ignite_1  | [02:39:33] Initial heap size is 32MB (should be no less than 512MB, use -Xms512m -Xmx512m).
ignite_1  | [02:39:33] Configured plugins:
ignite_1  | [02:39:33]   ^-- None
ignite_1  | [02:39:33] 
ignite_1  | [02:39:33] Configured failure handler: [hnd=StopNodeOrHaltFailureHandler [tryStop=false, timeout=0, super=AbstractFailureHandler [ignoredFailureTypes=[SYSTEM_WORKER_BLOCKED, SYSTEM_CRITICAL_OPERATION_TIMEOUT]]]]
ignite_1  | [02:39:33] Message queue limit is set to 0 which may lead to potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes due to message queues growth on sender and receiver sides.
ignite_1  | [02:39:34] Security status [authentication=off, tls/ssl=off]
ignite_1  | [02:39:36] Nodes started on local machine require more than 80% of physical RAM what can lead to significant slowdown due to swapping (please decrease JVM heap size, data region size or checkpoint buffer size) [required=944MB, available=1998MB]
ignite_1  | [02:39:36] Performance suggestions for grid  (fix if possible)
ignite_1  | [02:39:36] To disable, set -DIGNITE_PERFORMANCE_SUGGESTIONS_DISABLED=true
ignite_1  | [02:39:36]   ^-- Enable G1 Garbage Collector (add '-XX:+UseG1GC' to JVM options)
ignite_1  | [02:39:36]   ^-- Specify JVM heap max size (add '-Xmx<size>[g|G|m|M|k|K]' to JVM options)
ignite_1  | [02:39:36]   ^-- Set max direct memory size if getting 'OOME: Direct buffer memory' (add '-XX:MaxDirectMemorySize=<size>[g|G|m|M|k|K]' to JVM options)
ignite_1  | [02:39:36]   ^-- Disable processing of calls to System.gc() (add '-XX:+DisableExplicitGC' to JVM options)
ignite_1  | [02:39:36]   ^-- Disable near cache (set 'nearConfiguration' to null)
ignite_1  | [02:39:36]   ^-- Decrease number of backups (set 'backups' to 0)
ignite_1  | [02:39:36] Refer to this page for more performance suggestions: https://apacheignite.readme.io/docs/jvm-and-system-tuning
ignite_1  | [02:39:36] 
ignite_1  | [02:39:36] To start Console Management & Monitoring run ignitevisorcmd.{sh|bat}
ignite_1  | [02:39:36] Data Regions Configured:
ignite_1  | [02:39:36]   ^-- default [initSize=256.0 MiB, maxSize=399.7 MiB, persistence=false]
ignite_1  | [02:39:36] 
ignite_1  | [02:39:36] Ignite node started OK (id=72f112a2)
ignite_1  | [02:39:36] Topology snapshot [ver=1, locNode=72f112a2, servers=1, clients=0, state=ACTIVE, CPUs=8, offheap=0.39GB, heap=0.43GB]

If the output looks like this, the node is up and running.

Preparing the sample code

Preparing build.sbt

Write it as follows.

name := "ignite-slick"

version := "0.1"

scalaVersion := "2.13.1"

val igniteVersion = "2.7.6"
val slickVersion  = "3.3.2"
libraryDependencies ++= Seq(
  "org.apache.ignite"  % "ignite-core"     % igniteVersion,
  "org.apache.ignite"  % "ignite-clients"  % igniteVersion,
  "com.typesafe.slick" %% "slick"          % slickVersion,
  "com.typesafe.slick" %% "slick-hikaricp" % slickVersion,
  "org.scalatest"      %% "scalatest"      % "3.2.0-M1" % Test
)

Preparing the model

This time I decided to store data about people.

package model

import java.time.LocalDate

import model.`enum`.Sex

final case class Person(
                         id: Long,
                         name: String,
                         sex: Option[Sex],
                         birthday: LocalDate,
                         father: Option[Long],
                         mother: Option[Long]
                       )
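The `Person` model imports `Sex` from `model.enum`, but its definition is not shown in this article. Below is a minimal sketch of what it could look like, assuming only what the later code relies on: a `code` string stored in the `sex` column and a `Sex.of` lookup that returns an `Option`. The concrete codes `"M"` and `"F"` are hypothetical.

```scala
// Hypothetical sketch; in the project this would live in package model.`enum`.
sealed abstract class Sex(val code: String)

object Sex {
  case object Male   extends Sex("M")
  case object Female extends Sex("F")

  // All known values, used for lookup by code.
  val values: Seq[Sex] = Seq(Male, Female)

  // Returns None for an unknown code, so it can be used with Option#flatMap.
  def of(code: String): Option[Sex] = values.find(_.code == code)
}
```

Returning `Option[Sex]` from `of` means a nullable column value can be mapped with `flatMap`, and an unrecognized code becomes `None` rather than throwing.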

Slick Tables

package model

import java.time.LocalDate

import model.`enum`.Sex

// AUTO-GENERATED Slick data model
/** Stand-alone Slick data model for immediate use */
object Tables extends {
  val profile = slick.jdbc.PostgresProfile
} with Tables

/** Slick data model trait for extension, choice of backend or usage in the cake pattern. (Make sure to initialize this late.) */
trait Tables {
  val profile: slick.jdbc.JdbcProfile

  import profile.api._

  /** DDL for all tables. Call .create to execute. */
  lazy val schema: profile.SchemaDescription =
    Array(Persons.schema).reduceLeft(_ ++ _)
  @deprecated("Use .schema instead of .ddl", "3.0")
  def ddl = schema

  final class PersonRow(tag: Tag) extends Table[Person](tag, "Campaigns") {

    def id = column[Long]("id", O.PrimaryKey)

    def name = column[String]("name")

    def sex = column[Option[String]]("sex")

    def birthday = column[LocalDate]("birthday")

    def father = column[Option[Long]]("father")

    def mother = column[Option[Long]]("mother")

    def * =
      (
        id,
        name,
        sex,
        birthday,
        father,
        mother
      ).shaped <> ({
        case (id, name, sex, birthday, father, mother) =>
          Person(
            id, name, sex.flatMap(Sex.of), birthday, father, mother
          )
      }, { person: Person =>
        Some(
          person.id, person.name, person.sex.map(_.code), person.birthday, person.father, person.mother
        )
      })
  }

  lazy val Persons = new TableQuery(tag => new PersonRow(tag))

}

Preparing the test code

import java.time.LocalDate

import model.Tables.profile.api._
import model.`enum`.Sex
import model.{Person, Tables}
import org.scalatest._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import slick.jdbc.PostgresProfile.backend.Database

import scala.concurrent.Await
import scala.concurrent.duration.Duration

class IgniteSlickSpec extends AsyncWordSpec with BeforeAndAfterAll with Matchers {

  lazy val db = Database.forConfig("ignite.db")

  val namihei = Person(id = 1L, name = "磯野波平", sex = Some(Sex.Male), father = None, mother = None, birthday = LocalDate.of(1895, 9, 14))
  val fune = Person(id = 2L, name = "磯野フネ", sex = Some(Sex.Female), father = None, mother = None, birthday = LocalDate.of(1901, 1, 11))
  val sazae = Person(id = 3L, name = "フグ田サザエ", sex = Some(Sex.Female), father = Some(1L), mother = Some(2L), birthday = LocalDate.of(1922, 11, 22))
  val katsuo = Person(id = 4L, name = "磯野カツオ", sex = Some(Sex.Male), father = Some(1L), mother = Some(2L), birthday = LocalDate.of(1938, 3, 11))
  val wakame = Person(id = 5L, name = "磯野ワカメ", sex = Some(Sex.Female), father = Some(1L), mother = Some(2L), birthday = LocalDate.of(1942, 6, 15))
  val masuo = Person(id = 6L, name = "フグ田マスオ", sex = Some(Sex.Male), father = None, mother = None, birthday = LocalDate.of(1917, 4, 3))
  val tara = Person(id = 7L, name = "フグ田タラオ", sex = Some(Sex.Male), father = Some(6L), mother = Some(3L), birthday = LocalDate.of(1948, 3, 18))
  val isonoFamily = Seq(namihei, fune, sazae, katsuo, wakame, masuo, tara)

  override def beforeAll(): Unit = {
    // init ignite
    Await.result(db.run(Tables.schema.createIfNotExists), Duration.Inf)
    Await.result(db.run(DBIO.sequence(isonoFamily.map(Tables.Persons.insertOrUpdate))), Duration.Inf)
  }

  "磯野家" should {

    "サザエさんの兄弟はカツオとワカメ" in {
      val query = Tables.Persons.filter(x => x.mother === sazae.mother || x.father === sazae.father)
        .filterNot(_.id === sazae.id)
        .sortBy(_.id)
        .map(x => (x.id, x.name)).result
      db.run(query).map(x => assert(x.toSeq === Seq(katsuo, wakame).map(x => (x.id, x.name))))
    }

    "サザエさんの夫はマスオさん" in {
      val action = for {
        children <-  Tables.Persons.filter(_.mother === sazae.id)
        husband <- Tables.Persons.filter(_.id === children.father)
      } yield (husband.id, husband.name)
      val query = action.result
      db.run(query).map(x => assert(x.toSeq === Seq(masuo).map(x => (x.id, x.name))))
    }

  }

}
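The spec above obtains its database through `Database.forConfig("ignite.db")`, but the corresponding configuration entry is not shown in this article. As a rough equivalent, the connection could be built in code as sketched below. The URL and driver class are assumptions based on Ignite's thin JDBC driver and the port 10800 published in the Docker Compose file. The object name `IgniteDb` is hypothetical.

```scala
import slick.jdbc.PostgresProfile.backend.Database

object IgniteDb {
  // Assumption: the Ignite node from docker-compose is reachable on localhost:10800.
  // org.apache.ignite.IgniteJdbcThinDriver is the thin JDBC driver in ignite-core.
  val db = Database.forURL(
    url = "jdbc:ignite:thin://127.0.0.1:10800",
    driver = "org.apache.ignite.IgniteJdbcThinDriver"
  )
}
```

Note that `Tables` uses `PostgresProfile` to generate the SQL, so this setup depends on the statements Slick emits for PostgreSQL being accepted by Ignite's SQL engine.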

Running the tests

 sbt test
[info] Loading project definition from /Users/kanako.ohashi/IdeaProjects/ignite-slick/project
[info] Loading settings for project ignite-slick from build.sbt ...
[info] Set current project to ignite-slick (in build file:/Users/kanako.ohashi/IdeaProjects/ignite-slick/)
[info] Compiling 2 Scala sources to /Users/kanako.ohashi/IdeaProjects/ignite-slick/target/scala-2.13/classes ...
[warn] there was one deprecation warning (since 2.13.0); re-run with -deprecation for details
[warn] one warning found
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.ignite.internal.util.GridUnsafe$2 (file:/Users/kanako.ohashi/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/org/apache/ignite/ignite-core/2.7.6/ignite-core-2.7.6.jar) to field java.nio.Buffer.address
WARNING: Please consider reporting this to the maintainers of org.apache.ignite.internal.util.GridUnsafe$2
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[info] IgniteSlickSpec:
[info] 磯野家
[info] - should サザエさんの兄弟はカツオとワカメ
[info] - should サザエさんの夫はマスオさん
[info] Run completed in 1 second, 636 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 6 s, completed 2019/12/19 11:52:43

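It worked.
I was able to query the data with SQL.

Summary

This time we successfully used Apache Ignite as a distributed database from Slick. For anyone who already has experience with Slick, this should keep the learning cost low, which is a nice benefit.

* The code used for this verification has been uploaded here.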
