有关Kafka和Spark Streaming的集成

在进行Web访问分析、实时日志监控和异常检测、社交媒体分析等工作时,人们常常会使用组合了基于开源的分布式流处理平台Apache Kafka和Spark处理流数据的Spark Streaming。本文将以Twitter消息分析为例,介绍在阿里巴巴云的E-MapReduce上,如何集成Kafka和Spark Streaming的方法,希望能够向大家介绍一下。

关于验证环境

实时流处理
    • EMR-3.20.0

 

    • クラスタータイプは Hadoop

 

    • ハードウェア構成(Header)はecs.sn2.largeを1台

 

    ハードウェア構成(Worker)はecs.sn2.largeを2台
# cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core) 
# uname -r
3.10.0-693.2.2.el7.x86_64
# hadoop version
Hadoop 2.8.5
Subversion Unknown -r f6ae72b25f6dbb9925f0850ac865274d31a28e30
Compiled by root on 2019-04-19T06:38Z
Compiled with protoc 2.5.0
From source with checksum 9624fc19bc23f1bbeacb1ae4bee88e7
This command was run using /opt/apps/ecm/service/hadoop/2.8.5-1.3.0/package/hadoop-2.8.5-1.3.0/share/hadoop/common/hadoop-common-2.8.5.jar
# spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.2
      /_/

Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_151
Branch branch-2.4.2
Compiled by user root on 2019-04-26T06:32:50Z
Revision ef3e6ff4b20dc86ac81291664bb0ed3cf2641fea
Url /root/git-repos/emr-spark.git/
Type --help for more information.

卡夫卡
    • EMR-3.20.0

 

    • Zookeeper 3.4.13

 

    • Kakfa 1.1.1

 

    • クラスタータイプは Kafka

 

    • ハードウェア構成(Header)はecs.sn2.largeを1台

 

    ハードウェア構成(Worker)はecs.sn2.largeを2台
[root@emr-header-1 bin]# echo envi | nc localhost 2181
Environment:
zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT
host.name=emr-header-1.cluster-43709
java.version=1.8.0_151
java.vendor=Oracle Corporation
java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64/jre
java.class.path=/usr/lib/zookeeper-current/bin/../build/classes:/usr/lib/zookeeper-current/bin/../build/lib/*.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-log4j12-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-api-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/netty-3.10.6.Final.jar:/usr/lib/zookeeper-current/bin/../lib/log4j-1.2.17.jar:/usr/lib/zookeeper-current/bin/../lib/jline-0.9.94.jar:/usr/lib/zookeeper-current/bin/../lib/audience-annotations-0.5.0.jar:/usr/lib/zookeeper-current/bin/../zookeeper-3.4.13.jar:/usr/lib/zookeeper-current/bin/../src/java/lib/*.jar:/etc/ecm/zookeeper-conf::/var/lib/ecm-agent/data/jmxetric-1.0.8.jar
java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
java.io.tmpdir=/tmp
java.compiler=<NA>
os.name=Linux
os.arch=amd64
os.version=3.10.0-693.2.2.el7.x86_64
user.name=hadoop
user.home=/home/hadoop
user.dir=/home/hadoop
蓝鸟客户端

Hosebird Client是一个Java Http客户端,用于与Kafka Producer配合使用,调用Twitter的Streaming API。如果您想了解更详细的信息,请参考下面的GitHub链接。[https://github.com/twitter/hbc:embed:cite]

整体结构图

基于分析的目的,我们希望通过实时推文数来表示在Twitter上指定关键词的热度。首先,构建的阿里巴巴云结构图如下所示。

f:id:sbc_kou:20190716161135p:plain

卡夫卡生产者

现在准备工作已全部就绪,我们立刻开始在本地的Java开发环境中编写代码吧!首先,根据以下示例代码创建Kafka生产者,并生成jar文件。

推文关键词

我指定了四个关键词:“比特币”、“区块链”、“物联网”、“5G”。

Kafka引导服务器

可以使用 Kafka 集群中的任何一台 IP 地址。

推特串流API的认证信息

为了使用Twitter Streaming API,需要事先获取并输入consumerKey、consumerSecret、token、secret。


public class ProducerTest {

    Logger logger = LoggerFactory.getLogger(ProducerTest.class.getName());

 /** ---------------------- Twitter Streaming API情報 ---------------------- */
    String consumerKey = "xxxxxxxxxxxxxxxx";
    String consumerSecret = "xxxxxxxxxxxxxxxx";
    String token = "xxxxxxxxxxxxxxxx";
    String secret = "xxxxxxxxxxxxxxxx";
    String mytopic = "tweets_poc";

 /** ---------------------- Tweetsキーワードを指定 ---------------------- */
    List<String> terms = Lists.newArrayList("bitcoin","Blockchain","IoT","5G");


    public ProducerTest(){}

    public static void main(String[] args) {
        new ProducerTest().run();
    }

    public void run(){

        logger.info("Setup");

        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);

        Client client = createTwitterClient(msgQueue);
        client.connect();

        KafkaProducer<String, String> producer = createKafkaProducer();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("stopping application...");
            logger.info("shutting down client from twitter...");
            client.stop();
            logger.info("closing producer...");
            producer.close();
            logger.info("done!");
        }));

        while (!client.isDone()) {
            String msg = null;
            try {
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                client.stop();
            }

            if (msg != null){
                logger.info(msg);
                if(StringUtils.containsIgnoreCase(msg,"Bitcoin")){
                    producer.send(new ProducerRecord<>(mytopic, "Bitcoin", msg), new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                logger.error("Something bad happened", e);
                            }
                        }
                    });
                }
                else if (StringUtils.containsIgnoreCase(msg,"Blockchain")) {
                    producer.send(new ProducerRecord<>(mytopic, "Blockchain", msg), new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                logger.error("Something bad happened", e);
                            }
                        }
                    });
                }
                else if (StringUtils.containsIgnoreCase(msg,"IoT")) {
                    producer.send(new ProducerRecord<>(mytopic, "IoT", msg), new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                logger.error("Something bad happened", e);
                            }
                        }
                    });
                }
                else if (StringUtils.containsIgnoreCase(msg,"5G")) {
                    producer.send(new ProducerRecord<>(mytopic,5,"5G", msg));
                }
                else{
                    producer.send(new ProducerRecord<>(mytopic, null, msg), new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                logger.error("Something bad happened", e);
                            }
                        }
                    });
                }
            }
        }
        logger.info("End of application");
    }

 /** ---------------------- Hosebird Clientを作成 ---------------------- */
    public Client createTwitterClient(BlockingQueue<String> msgQueue){

        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();

        hosebirdEndpoint.trackTerms(terms);

        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);

        ClientBuilder builder = new ClientBuilder()
                .name("Hosebird-Client-01")                              
                .hosts(hosebirdHosts)
                .authentication(hosebirdAuth)
                .endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));

        Client hosebirdClient = builder.build();
        return hosebirdClient;
    }

  /** ---------------------- kakfa producerを作成 ---------------------- */
    public KafkaProducer<String, String> createKafkaProducer(){

        String bootstrapServers = "xxxxxxxxxxxxxxxx";

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); 

        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024));

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        return producer;
    }

}

实时流消费者

Spark Streaming提供了以微批处理方式重复处理流数据的功能,间隔通常为几秒到几分钟。由于E-Mapreduce的Kafka Broker版本为0.10.0或更高,因此可以直接从每个Partition中读取消息,而无需使用kafka接收器进行与Kafka的集成。本次使用的间隔数据如下所示。

DStream的批处理间隔为1秒。
滑动间隔为1秒。
窗口尺寸300秒。

在本地的Java开发环境中,可以参考以下示例代码创建一个Spark Streaming Consumer并生成jar文件。


public class SparkStreamingPoC {

    private static JsonParser jsonParser = new JsonParser();

    public static void main(String[] args) throws InterruptedException {

        SparkConf sparkConf = new SparkConf().setAppName("TweetsApp");

        JavaStreamingContext streamingContext = new JavaStreamingContext(
                sparkConf, Durations.seconds(1));

         /** ---------------------- kafkaパラメータ設定 ---------------------- */
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "xxxxxxxxxxxxxxxx");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "tweets_group");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("tweets_poc");


        /** ---------------------- Spark Stream作成 ---------------------- */
       JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));

        JavaPairDStream<String,Integer> s1 = stream.mapToPair(record -> new Tuple2<>(record.key(),1));
        JavaPairDStream<String,Integer> s2 = s1.reduceByKeyAndWindow((a,b) -> a + b,Durations.seconds(300),Durations.seconds(1));
        JavaPairDStream<Integer,String> s3 = s2.mapToPair(Tuple2::swap);
        JavaPairDStream<Integer,String> s4 = s3.transformToPair(rdd -> rdd.sortByKey(false));
        s4.print();

        streamingContext.start();
        streamingContext.awaitTermination();

    }
}

运行Spark应用程序

首先,将创建的Kafka Producer和Spark Streaming Consumer的Jar文件上传到阿里云OSS上。然后,在ECS上通过ssh登录,并使用ossutil等工具将Kafka Producer的jar文件(TweetsProducerTest-1.0-jar-with-dependencies.jar)下载到ECS。下载完成后,使用下面的命令启动Kafka Producer,收集发布到Twitter上的消息。

java -jar TweetsProducerTest-1.0-jar-with-dependencies.jar

您也可以实时从 E-MapReduce 控制台中确认向 Kafka 写入的 Topic 的 Partition、Offset、ISR 等信息。

f:id:sbc_kou:20190703171507p:plain
大师参数

由于E-MapReduce使用Yarn模式,因此请将下图中红框中的CLI参数(master)指定为Yarn。

开放源码软件协议

从OSS下载jar文件并执行。

f:id:sbc_kou:20190703164615p:plain

可以确认ETL处理的结果如下图所示。

f:id:sbc_kou:20190703172435p:plain

最终最后

这个事情你觉得怎么样呢? (Zhè ge nǐ ne?)

在普通情况下,可以通过ssh等方式登录到主节点或者跳板服务器,在命令行中执行作业。但是在本次这样的情况下,可以通过E-MapReduce控制台的数据平台,使用图形界面来操作作业的执行。此外,可以立即反映作业结果的特点也对于作业的故障排除非常有用。

我们在E-Mapreduce上构建了一个使用Spark Streaming和Kafka的流数据处理系统,并介绍了其验证结果。实际上,除了我们介绍的Apache Spark作为通用的实时数据处理平台之外,您还可以选择使用Apache Flink。我们建议已经熟悉Flink的开发人员可以参考阿里云的托管服务Realtime Compute。
此外,除了Twitter消息外,如果您想要实现对日常各种流量日志、邮件等数据的实时分析和监控,也可以考虑使用该系统。请务必尝试使用一下!

bannerAds