有关Kafka和Spark Streaming的集成
在进行Web访问分析、实时日志监控和异常检测、社交媒体分析等工作时,人们常常会使用组合了基于开源的分布式流处理平台Apache Kafka和Spark处理流数据的Spark Streaming。本文将以Twitter消息分析为例,介绍在阿里巴巴云的E-MapReduce上,如何集成Kafka和Spark Streaming的方法,希望能够向大家介绍一下。
关于验证环境
实时流处理
-
- EMR-3.20.0
-
- クラスタータイプは Hadoop
-
- ハードウェア構成(Header)はecs.sn2.largeを1台
- ハードウェア構成(Worker)はecs.sn2.largeを2台
# cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core)
# uname -r
3.10.0-693.2.2.el7.x86_64
# hadoop version
Hadoop 2.8.5
Subversion Unknown -r f6ae72b25f6dbb9925f0850ac865274d31a28e30
Compiled by root on 2019-04-19T06:38Z
Compiled with protoc 2.5.0
From source with checksum 9624fc19bc23f1bbeacb1ae4bee88e7
This command was run using /opt/apps/ecm/service/hadoop/2.8.5-1.3.0/package/hadoop-2.8.5-1.3.0/share/hadoop/common/hadoop-common-2.8.5.jar
# spark-submit --version
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.2
/_/
Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_151
Branch branch-2.4.2
Compiled by user root on 2019-04-26T06:32:50Z
Revision ef3e6ff4b20dc86ac81291664bb0ed3cf2641fea
Url /root/git-repos/emr-spark.git/
Type --help for more information.
卡夫卡
-
- EMR-3.20.0
-
- Zookeeper 3.4.13
-
- Kakfa 1.1.1
-
- クラスタータイプは Kafka
-
- ハードウェア構成(Header)はecs.sn2.largeを1台
- ハードウェア構成(Worker)はecs.sn2.largeを2台
[root@emr-header-1 bin]# echo envi | nc localhost 2181
Environment:
zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT
host.name=emr-header-1.cluster-43709
java.version=1.8.0_151
java.vendor=Oracle Corporation
java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64/jre
java.class.path=/usr/lib/zookeeper-current/bin/../build/classes:/usr/lib/zookeeper-current/bin/../build/lib/*.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-log4j12-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-api-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/netty-3.10.6.Final.jar:/usr/lib/zookeeper-current/bin/../lib/log4j-1.2.17.jar:/usr/lib/zookeeper-current/bin/../lib/jline-0.9.94.jar:/usr/lib/zookeeper-current/bin/../lib/audience-annotations-0.5.0.jar:/usr/lib/zookeeper-current/bin/../zookeeper-3.4.13.jar:/usr/lib/zookeeper-current/bin/../src/java/lib/*.jar:/etc/ecm/zookeeper-conf::/var/lib/ecm-agent/data/jmxetric-1.0.8.jar
java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
java.io.tmpdir=/tmp
java.compiler=<NA>
os.name=Linux
os.arch=amd64
os.version=3.10.0-693.2.2.el7.x86_64
user.name=hadoop
user.home=/home/hadoop
user.dir=/home/hadoop
蓝鸟客户端
Hosebird Client是一个Java Http客户端,用于与Kafka Producer配合使用,调用Twitter的Streaming API。如果您想了解更详细的信息,请参考下面的GitHub链接。[https://github.com/twitter/hbc:embed:cite]
整体结构图
基于分析的目的,我们希望通过实时推文数来表示在Twitter上指定关键词的热度。首先,构建的阿里巴巴云结构图如下所示。

卡夫卡生产者
现在准备工作已全部就绪,我们立刻开始在本地的Java开发环境中编写代码吧!首先,根据以下示例代码创建Kafka生产者,并生成jar文件。
推文关键词
我指定了四个关键词:“比特币”、“区块链”、“物联网”、“5G”。
Kafka引导服务器
可以使用 Kafka 集群中的任何一台 IP 地址。
推特串流API的认证信息
为了使用Twitter Streaming API,需要事先获取并输入consumerKey、consumerSecret、token、secret。
public class ProducerTest {
Logger logger = LoggerFactory.getLogger(ProducerTest.class.getName());
/** ---------------------- Twitter Streaming API情報 ---------------------- */
String consumerKey = "xxxxxxxxxxxxxxxx";
String consumerSecret = "xxxxxxxxxxxxxxxx";
String token = "xxxxxxxxxxxxxxxx";
String secret = "xxxxxxxxxxxxxxxx";
String mytopic = "tweets_poc";
/** ---------------------- Tweetsキーワードを指定 ---------------------- */
List<String> terms = Lists.newArrayList("bitcoin","Blockchain","IoT","5G");
public ProducerTest(){}
public static void main(String[] args) {
new ProducerTest().run();
}
public void run(){
logger.info("Setup");
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);
Client client = createTwitterClient(msgQueue);
client.connect();
KafkaProducer<String, String> producer = createKafkaProducer();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("stopping application...");
logger.info("shutting down client from twitter...");
client.stop();
logger.info("closing producer...");
producer.close();
logger.info("done!");
}));
while (!client.isDone()) {
String msg = null;
try {
msg = msgQueue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
client.stop();
}
if (msg != null){
logger.info(msg);
if(StringUtils.containsIgnoreCase(msg,"Bitcoin")){
producer.send(new ProducerRecord<>(mytopic, "Bitcoin", msg), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("Something bad happened", e);
}
}
});
}
else if (StringUtils.containsIgnoreCase(msg,"Blockchain")) {
producer.send(new ProducerRecord<>(mytopic, "Blockchain", msg), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("Something bad happened", e);
}
}
});
}
else if (StringUtils.containsIgnoreCase(msg,"IoT")) {
producer.send(new ProducerRecord<>(mytopic, "IoT", msg), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("Something bad happened", e);
}
}
});
}
else if (StringUtils.containsIgnoreCase(msg,"5G")) {
producer.send(new ProducerRecord<>(mytopic,5,"5G", msg));
}
else{
producer.send(new ProducerRecord<>(mytopic, null, msg), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("Something bad happened", e);
}
}
});
}
}
}
logger.info("End of application");
}
/** ---------------------- Hosebird Clientを作成 ---------------------- */
public Client createTwitterClient(BlockingQueue<String> msgQueue){
Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
hosebirdEndpoint.trackTerms(terms);
Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);
ClientBuilder builder = new ClientBuilder()
.name("Hosebird-Client-01")
.hosts(hosebirdHosts)
.authentication(hosebirdAuth)
.endpoint(hosebirdEndpoint)
.processor(new StringDelimitedProcessor(msgQueue));
Client hosebirdClient = builder.build();
return hosebirdClient;
}
/** ---------------------- kakfa producerを作成 ---------------------- */
public KafkaProducer<String, String> createKafkaProducer(){
String bootstrapServers = "xxxxxxxxxxxxxxxx";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024));
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
return producer;
}
}
实时流消费者
Spark Streaming提供了以微批处理方式重复处理流数据的功能,间隔通常为几秒到几分钟。由于E-Mapreduce的Kafka Broker版本为0.10.0或更高,因此可以直接从每个Partition中读取消息,而无需使用kafka接收器进行与Kafka的集成。本次使用的间隔数据如下所示。
DStream的批处理间隔为1秒。
滑动间隔为1秒。
窗口尺寸300秒。
在本地的Java开发环境中,可以参考以下示例代码创建一个Spark Streaming Consumer并生成jar文件。
public class SparkStreamingPoC {
private static JsonParser jsonParser = new JsonParser();
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf().setAppName("TweetsApp");
JavaStreamingContext streamingContext = new JavaStreamingContext(
sparkConf, Durations.seconds(1));
/** ---------------------- kafkaパラメータ設定 ---------------------- */
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "xxxxxxxxxxxxxxxx");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "tweets_group");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("tweets_poc");
/** ---------------------- Spark Stream作成 ---------------------- */
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
JavaPairDStream<String,Integer> s1 = stream.mapToPair(record -> new Tuple2<>(record.key(),1));
JavaPairDStream<String,Integer> s2 = s1.reduceByKeyAndWindow((a,b) -> a + b,Durations.seconds(300),Durations.seconds(1));
JavaPairDStream<Integer,String> s3 = s2.mapToPair(Tuple2::swap);
JavaPairDStream<Integer,String> s4 = s3.transformToPair(rdd -> rdd.sortByKey(false));
s4.print();
streamingContext.start();
streamingContext.awaitTermination();
}
}
运行Spark应用程序
首先,将创建的Kafka Producer和Spark Streaming Consumer的Jar文件上传到阿里云OSS上。然后,在ECS上通过ssh登录,并使用ossutil等工具将Kafka Producer的jar文件(TweetsProducerTest-1.0-jar-with-dependencies.jar)下载到ECS。下载完成后,使用下面的命令启动Kafka Producer,收集发布到Twitter上的消息。
java -jar TweetsProducerTest-1.0-jar-with-dependencies.jar
您也可以实时从 E-MapReduce 控制台中确认向 Kafka 写入的 Topic 的 Partition、Offset、ISR 等信息。

大师参数
由于E-MapReduce使用Yarn模式,因此请将下图中红框中的CLI参数(master)指定为Yarn。
开放源码软件协议
从OSS下载jar文件并执行。

可以确认ETL处理的结果如下图所示。

最终最后
这个事情你觉得怎么样呢? (Zhè ge nǐ ne?)
在普通情况下,可以通过ssh等方式登录到主节点或者跳板服务器,在命令行中执行作业。但是在本次这样的情况下,可以通过E-MapReduce控制台的数据平台,使用图形界面来操作作业的执行。此外,可以立即反映作业结果的特点也对于作业的故障排除非常有用。
我们在E-Mapreduce上构建了一个使用Spark Streaming和Kafka的流数据处理系统,并介绍了其验证结果。实际上,除了我们介绍的Apache Spark作为通用的实时数据处理平台之外,您还可以选择使用Apache Flink。我们建议已经熟悉Flink的开发人员可以参考阿里云的托管服务Realtime Compute。
此外,除了Twitter消息外,如果您想要实现对日常各种流量日志、邮件等数据的实时分析和监控,也可以考虑使用该系统。请务必尝试使用一下!