# How does Spark read data from Kafka?

To read data from Kafka in Spark, you can use Spark's official Kafka integration library for Spark Streaming, spark-streaming-kafka-0-10.

First, add the dependency to your Spark project. In a Maven project, add the following to the pom.xml file:

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.0.2</version>
</dependency>
```

Next, create a JavaStreamingContext from a SparkConf object and specify the batch interval:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class KafkaStreamingExample {
    public static void main(String[] args) throws InterruptedException {
        // Create the SparkConf object
        SparkConf sparkConf = new SparkConf()
                .setAppName("KafkaStreamingExample")
                .setMaster("local[*]");

        // Create the JavaStreamingContext with a 1-second batch interval
        JavaStreamingContext streamingContext =
                new JavaStreamingContext(sparkConf, Durations.seconds(1));

        // Set the Kafka parameters
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "test-group");

        // List the Kafka topics to subscribe to
        Collection<String> topics = Arrays.asList("topic1", "topic2");

        // Create the Kafka input stream
        JavaInputDStream<ConsumerRecord<String, String>> kafkaStream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        // Process the Kafka data
        kafkaStream.foreachRDD(rdd -> {
            // Handle each RDD here
            rdd.foreach(record -> {
                System.out.println("Key: " + record.key() + ", Value: " + record.value());
            });
        });

        // Start the streaming application
        streamingContext.start();

        // Wait for the streaming application to terminate
        streamingContext.awaitTermination();
    }
}
```

In the example above, we first create a SparkConf object and a JavaStreamingContext object. Then we set the Kafka parameters, including the broker address (bootstrap.servers), the key and value deserializer classes, and the consumer group ID. Next, we create a Kafka input stream, specifying the topics to subscribe to and the Kafka parameters. Finally, we use the foreachRDD method to process each RDD and print the key and value of each record.

Note that the createDirectStream method in the example above belongs to the Kafka 0.10+ integration (spark-streaming-kafka-0-10) and requires Kafka 0.10 or higher. If you are on an older Kafka version, you need the separate 0.8 integration library, whose API differs slightly. You can also adjust the other parameters and the processing logic in the example as needed.
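Beyond the required settings shown above, two consumer parameters are commonly tuned in practice: `auto.offset.reset` (where a consumer group with no committed offsets starts reading) and `enable.auto.commit` (whether the Kafka client commits offsets automatically). The sketch below extends the `kafkaParams` map from the example with these settings; the values shown are illustrative choices, not requirements, and the deserializers are given as fully qualified class-name strings (which Kafka also accepts) so the fragment compiles without the Kafka client on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaParamsExample {
    public static void main(String[] args) {
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        // Deserializers may be given as class objects or fully qualified class names.
        kafkaParams.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("group.id", "test-group");
        // Start from the earliest available offset when no committed offset exists
        // (illustrative choice; "latest" is the Kafka client default).
        kafkaParams.put("auto.offset.reset", "earliest");
        // Disable auto-commit so offsets advance only under your own control.
        kafkaParams.put("enable.auto.commit", false);

        System.out.println(kafkaParams.get("auto.offset.reset"));
        System.out.println(kafkaParams.get("enable.auto.commit"));
    }
}
```

With auto-commit disabled, a common pattern is to commit offsets only after a batch has been fully processed, so a failure causes reprocessing rather than data loss.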