{"id":19789,"date":"2024-03-15T19:27:38","date_gmt":"2024-03-15T19:27:38","guid":{"rendered":"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/"},"modified":"2024-03-21T16:32:25","modified_gmt":"2024-03-21T16:32:25","slug":"how-to-customize-the-output-of-flink-data-to-kafka","status":"publish","type":"post","link":"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/","title":{"rendered":"How to customize the output of Flink data to Kafka?"},"content":{"rendered":"<p>In Flink, you can write a data stream to Kafka by attaching a Kafka producer sink with the addSink() method. Here is an example:<\/p>\n<pre class=\"post-pre\"><code><span class=\"hljs-keyword\">import<\/span> org.apache.flink.api.common.serialization.SimpleStringSchema;\r\n<span class=\"hljs-keyword\">import<\/span> org.apache.flink.streaming.api.datastream.DataStream;\r\n<span class=\"hljs-keyword\">import<\/span> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;\r\n<span class=\"hljs-keyword\">import<\/span> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;\r\n\r\n<span class=\"hljs-keyword\">public<\/span> <span class=\"hljs-keyword\">class<\/span> <span class=\"hljs-title class_\">FlinkKafkaOutputExample<\/span> {\r\n\r\n    <span class=\"hljs-keyword\">public<\/span> <span class=\"hljs-keyword\">static<\/span> <span class=\"hljs-keyword\">void<\/span> <span class=\"hljs-title function_\">main<\/span><span class=\"hljs-params\">(String[] args)<\/span> <span class=\"hljs-keyword\">throws<\/span> Exception {\r\n        <span class=\"hljs-comment\">\/\/ Set up the execution environment<\/span>\r\n        <span class=\"hljs-type\">StreamExecutionEnvironment<\/span> <span class=\"hljs-variable\">env<\/span> <span class=\"hljs-operator\">=<\/span> StreamExecutionEnvironment.getExecutionEnvironment();\r\n\r\n        <span class=\"hljs-comment\">\/\/ Create the input data stream<\/span>\r\n        
DataStream&lt;String&gt; inputStream = env.fromElements(<span class=\"hljs-string\">\"data1\"<\/span>, <span class=\"hljs-string\">\"data2\"<\/span>, <span class=\"hljs-string\">\"data3\"<\/span>);\r\n\r\n        <span class=\"hljs-comment\">\/\/ Define the Kafka connection information<\/span>\r\n        <span class=\"hljs-type\">String<\/span> <span class=\"hljs-variable\">kafkaBroker<\/span> <span class=\"hljs-operator\">=<\/span> <span class=\"hljs-string\">\"localhost:9092\"<\/span>;\r\n        <span class=\"hljs-type\">String<\/span> <span class=\"hljs-variable\">kafkaTopic<\/span> <span class=\"hljs-operator\">=<\/span> <span class=\"hljs-string\">\"output_topic\"<\/span>;\r\n\r\n        <span class=\"hljs-comment\">\/\/ Build the Kafka producer configuration<\/span>\r\n        java.util.Properties producerProps = <span class=\"hljs-keyword\">new<\/span> java.util.Properties();\r\n        producerProps.setProperty(<span class=\"hljs-string\">\"bootstrap.servers\"<\/span>, kafkaBroker);\r\n        <span class=\"hljs-comment\">\/\/ EXACTLY_ONCE uses Kafka transactions; keep the timeout within the broker's transaction.max.timeout.ms (15 minutes by default)<\/span>\r\n        producerProps.setProperty(<span class=\"hljs-string\">\"transaction.timeout.ms\"<\/span>, <span class=\"hljs-string\">\"900000\"<\/span>);\r\n\r\n        <span class=\"hljs-comment\">\/\/ Create the Kafka producer (EXACTLY_ONCE also requires checkpointing to be enabled)<\/span>\r\n        FlinkKafkaProducer&lt;String&gt; kafkaSink = <span class=\"hljs-keyword\">new<\/span> <span class=\"hljs-title class_\">FlinkKafkaProducer<\/span>&lt;&gt;(\r\n                kafkaTopic,\r\n                <span class=\"hljs-keyword\">new<\/span> <span class=\"hljs-title class_\">SimpleStringSchema<\/span>(),\r\n                producerProps,\r\n                <span class=\"hljs-literal\">null<\/span>, <span class=\"hljs-comment\">\/\/ no custom Flink partitioner; Kafka assigns the partition<\/span>\r\n                FlinkKafkaProducer.Semantic.EXACTLY_ONCE,\r\n                FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);\r\n\r\n        <span class=\"hljs-comment\">\/\/ Write the data stream to Kafka<\/span>\r\n        inputStream.addSink(kafkaSink);\r\n\r\n        <span class=\"hljs-comment\">\/\/ Execute the job<\/span>\r\n        env.execute(<span class=\"hljs-string\">\"Flink Kafka Output Example\"<\/span>);\r\n    }\r\n}\r\n<\/code><\/pre>\n<p>In the code above, we first obtain a StreamExecutionEnvironment object using StreamExecutionEnvironment.getExecutionEnvironment(), and then create an input data stream using the fromElements() method. Next, we define the connection information for Kafka, including the broker address and the output topic name. 
Then, we create a Kafka producer instance using FlinkKafkaProducer, passing it the target topic, the serialization schema, the Kafka producer configuration, and the delivery guarantee. Finally, we attach the producer to the stream with the addSink() method and start the job with env.execute().<\/p>\n<p>Note that FlinkKafkaProducer, the connector class used in the example above, has been deprecated since Flink 1.14 in favor of KafkaSink, which is attached to a stream with sinkTo() instead of addSink(). If you stay on FlinkKafkaProducer, its constructor expects the Kafka settings as a java.util.Properties object containing standard Kafka producer configuration, at minimum bootstrap.servers.<\/p>\n<p>You can customize the output further by implementing the SerializationSchema interface to control how each record is converted to bytes, or by implementing the KafkaSerializationSchema interface to build each ProducerRecord yourself, which lets you choose its key, topic, and partition. For details, refer to the official Flink documentation.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>In Flink, data can be custom output to Kafka using the addSink() method. 
Here is an example code: import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; public class FlinkKafkaOutputExample { public static void main(String[] args) throws Exception { \/\/ \u8bbe\u7f6e\u6267\u884c\u73af\u5883 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); \/\/ \u521b\u5efa\u8f93\u5165\u6570\u636e\u6d41 DataStream&lt;String&gt; inputStream = env.fromElements(&#8220;data1&#8221;, &#8220;data2&#8221;, &#8220;data3&#8221;); \/\/ \u5b9a\u4e49Kafka\u8fde\u63a5\u4fe1\u606f String [&hellip;]<\/p>\n","protected":false},"author":8,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_import_markdown_pro_load_document_selector":0,"_import_markdown_pro_submit_text_textarea":"","footnotes":""},"categories":[1],"tags":[],"class_list":["post-19789","post","type-post","status-publish","format-standard","hentry","category-uncategorized"],"yoast_head":"<!-- This site is optimized with the Yoast SEO Premium plugin v21.5 (Yoast SEO v21.5) - https:\/\/yoast.com\/wordpress\/plugins\/seo\/ -->\n<title>How to customize the output of Flink data to Kafka? - Blog - Silicon Cloud<\/title>\n<meta name=\"robots\" content=\"index, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<link rel=\"canonical\" href=\"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/\" \/>\n<meta property=\"og:locale\" content=\"en_US\" \/>\n<meta property=\"og:type\" content=\"article\" \/>\n<meta property=\"og:title\" content=\"How to customize the output of Flink data to Kafka?\" \/>\n<meta property=\"og:description\" content=\"In Flink, data can be custom output to Kafka using the addSink() method. 
Here is an example code: import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; public class FlinkKafkaOutputExample { public static void main(String[] args) throws Exception { \/\/ \u8bbe\u7f6e\u6267\u884c\u73af\u5883 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); \/\/ \u521b\u5efa\u8f93\u5165\u6570\u636e\u6d41 DataStream&lt;String&gt; inputStream = env.fromElements(&quot;data1&quot;, &quot;data2&quot;, &quot;data3&quot;); \/\/ \u5b9a\u4e49Kafka\u8fde\u63a5\u4fe1\u606f String [&hellip;]\" \/>\n<meta property=\"og:url\" content=\"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/\" \/>\n<meta property=\"og:site_name\" content=\"Blog - Silicon Cloud\" \/>\n<meta property=\"article:publisher\" content=\"https:\/\/www.facebook.com\/SiliCloudGlobal\/\" \/>\n<meta property=\"article:published_time\" content=\"2024-03-15T19:27:38+00:00\" \/>\n<meta property=\"article:modified_time\" content=\"2024-03-21T16:32:25+00:00\" \/>\n<meta name=\"author\" content=\"William Carter\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:creator\" content=\"@SiliCloudGlobal\" \/>\n<meta name=\"twitter:site\" content=\"@SiliCloudGlobal\" \/>\n<meta name=\"twitter:label1\" content=\"Written by\" \/>\n\t<meta name=\"twitter:data1\" content=\"William Carter\" \/>\n\t<meta name=\"twitter:label2\" content=\"Est. 
reading time\" \/>\n\t<meta name=\"twitter:data2\" content=\"1 minute\" \/>\n<script type=\"application\/ld+json\" class=\"yoast-schema-graph\">{\"@context\":\"https:\/\/schema.org\",\"@graph\":[{\"@type\":\"Article\",\"@id\":\"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/#article\",\"isPartOf\":{\"@id\":\"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/\"},\"author\":{\"name\":\"William Carter\",\"@id\":\"https:\/\/www.silicloud.com\/blog\/#\/schema\/person\/f697031891aacefc4b681d139781d3c0\"},\"headline\":\"How to customize the output of Flink data to Kafka?\",\"datePublished\":\"2024-03-15T19:27:38+00:00\",\"dateModified\":\"2024-03-21T16:32:25+00:00\",\"mainEntityOfPage\":{\"@id\":\"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/\"},\"wordCount\":204,\"commentCount\":0,\"publisher\":{\"@id\":\"https:\/\/www.silicloud.com\/blog\/#organization\"},\"inLanguage\":\"en-US\"},{\"@type\":\"WebPage\",\"@id\":\"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/\",\"url\":\"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/\",\"name\":\"How to customize the output of Flink data to Kafka? 
- Blog - Silicon Cloud\",\"isPartOf\":{\"@id\":\"https:\/\/www.silicloud.com\/blog\/#website\"},\"datePublished\":\"2024-03-15T19:27:38+00:00\",\"dateModified\":\"2024-03-21T16:32:25+00:00\",\"breadcrumb\":{\"@id\":\"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/#breadcrumb\"},\"inLanguage\":\"en-US\",\"potentialAction\":[{\"@type\":\"ReadAction\",\"target\":[\"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/\"]}]},{\"@type\":\"BreadcrumbList\",\"@id\":\"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/#breadcrumb\",\"itemListElement\":[{\"@type\":\"ListItem\",\"position\":1,\"name\":\"Home\",\"item\":\"https:\/\/www.silicloud.com\/blog\/\"},{\"@type\":\"ListItem\",\"position\":2,\"name\":\"How to customize the output of Flink data to Kafka?\"}]},{\"@type\":\"WebSite\",\"@id\":\"https:\/\/www.silicloud.com\/blog\/#website\",\"url\":\"https:\/\/www.silicloud.com\/blog\/\",\"name\":\"Silicon Cloud Blog\",\"description\":\"\",\"publisher\":{\"@id\":\"https:\/\/www.silicloud.com\/blog\/#organization\"},\"inLanguage\":\"en-US\"},{\"@type\":\"Organization\",\"@id\":\"https:\/\/www.silicloud.com\/blog\/#organization\",\"name\":\"Silicon Cloud Blog\",\"url\":\"https:\/\/www.silicloud.com\/blog\/\",\"logo\":{\"@type\":\"ImageObject\",\"inLanguage\":\"en-US\",\"@id\":\"https:\/\/www.silicloud.com\/blog\/#\/schema\/logo\/image\/\",\"url\":\"https:\/\/www.silicloud.com\/blog\/wp-content\/uploads\/2023\/11\/EN-SILICON-Full.png\",\"contentUrl\":\"https:\/\/www.silicloud.com\/blog\/wp-content\/uploads\/2023\/11\/EN-SILICON-Full.png\",\"width\":1024,\"height\":1024,\"caption\":\"Silicon Cloud 
Blog\"},\"image\":{\"@id\":\"https:\/\/www.silicloud.com\/blog\/#\/schema\/logo\/image\/\"},\"sameAs\":[\"https:\/\/www.facebook.com\/SiliCloudGlobal\/\",\"https:\/\/twitter.com\/SiliCloudGlobal\"]},{\"@type\":\"Person\",\"@id\":\"https:\/\/www.silicloud.com\/blog\/#\/schema\/person\/f697031891aacefc4b681d139781d3c0\",\"name\":\"William Carter\",\"image\":{\"@type\":\"ImageObject\",\"inLanguage\":\"en-US\",\"@id\":\"https:\/\/www.silicloud.com\/blog\/#\/schema\/person\/image\/\",\"url\":\"https:\/\/secure.gravatar.com\/avatar\/1786698071dd8d74bec894b512f9e3c610c3a2a32985f67e688976cee3c8bbef?s=96&d=mm&r=g\",\"contentUrl\":\"https:\/\/secure.gravatar.com\/avatar\/1786698071dd8d74bec894b512f9e3c610c3a2a32985f67e688976cee3c8bbef?s=96&d=mm&r=g\",\"caption\":\"William Carter\"},\"url\":\"https:\/\/www.silicloud.com\/blog\/author\/williamcarter\/\"}]}<\/script>\n<!-- \/ Yoast SEO Premium plugin. -->","yoast_head_json":{"title":"How to customize the output of Flink data to Kafka? - Blog - Silicon Cloud","robots":{"index":"index","follow":"follow","max-snippet":"max-snippet:-1","max-image-preview":"max-image-preview:large","max-video-preview":"max-video-preview:-1"},"canonical":"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/","og_locale":"en_US","og_type":"article","og_title":"How to customize the output of Flink data to Kafka?","og_description":"In Flink, data can be custom output to Kafka using the addSink() method. 
Here is an example code: import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; public class FlinkKafkaOutputExample { public static void main(String[] args) throws Exception { \/\/ \u8bbe\u7f6e\u6267\u884c\u73af\u5883 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); \/\/ \u521b\u5efa\u8f93\u5165\u6570\u636e\u6d41 DataStream&lt;String&gt; inputStream = env.fromElements(\"data1\", \"data2\", \"data3\"); \/\/ \u5b9a\u4e49Kafka\u8fde\u63a5\u4fe1\u606f String [&hellip;]","og_url":"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/","og_site_name":"Blog - Silicon Cloud","article_publisher":"https:\/\/www.facebook.com\/SiliCloudGlobal\/","article_published_time":"2024-03-15T19:27:38+00:00","article_modified_time":"2024-03-21T16:32:25+00:00","author":"William Carter","twitter_card":"summary_large_image","twitter_creator":"@SiliCloudGlobal","twitter_site":"@SiliCloudGlobal","twitter_misc":{"Written by":"William Carter","Est. 
reading time":"1 minute"},"schema":{"@context":"https:\/\/schema.org","@graph":[{"@type":"Article","@id":"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/#article","isPartOf":{"@id":"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/"},"author":{"name":"William Carter","@id":"https:\/\/www.silicloud.com\/blog\/#\/schema\/person\/f697031891aacefc4b681d139781d3c0"},"headline":"How to customize the output of Flink data to Kafka?","datePublished":"2024-03-15T19:27:38+00:00","dateModified":"2024-03-21T16:32:25+00:00","mainEntityOfPage":{"@id":"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/"},"wordCount":204,"commentCount":0,"publisher":{"@id":"https:\/\/www.silicloud.com\/blog\/#organization"},"inLanguage":"en-US"},{"@type":"WebPage","@id":"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/","url":"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/","name":"How to customize the output of Flink data to Kafka? 
- Blog - Silicon Cloud","isPartOf":{"@id":"https:\/\/www.silicloud.com\/blog\/#website"},"datePublished":"2024-03-15T19:27:38+00:00","dateModified":"2024-03-21T16:32:25+00:00","breadcrumb":{"@id":"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/#breadcrumb"},"inLanguage":"en-US","potentialAction":[{"@type":"ReadAction","target":["https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/"]}]},{"@type":"BreadcrumbList","@id":"https:\/\/www.silicloud.com\/blog\/how-to-customize-the-output-of-flink-data-to-kafka\/#breadcrumb","itemListElement":[{"@type":"ListItem","position":1,"name":"Home","item":"https:\/\/www.silicloud.com\/blog\/"},{"@type":"ListItem","position":2,"name":"How to customize the output of Flink data to Kafka?"}]},{"@type":"WebSite","@id":"https:\/\/www.silicloud.com\/blog\/#website","url":"https:\/\/www.silicloud.com\/blog\/","name":"Silicon Cloud Blog","description":"","publisher":{"@id":"https:\/\/www.silicloud.com\/blog\/#organization"},"inLanguage":"en-US"},{"@type":"Organization","@id":"https:\/\/www.silicloud.com\/blog\/#organization","name":"Silicon Cloud Blog","url":"https:\/\/www.silicloud.com\/blog\/","logo":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/www.silicloud.com\/blog\/#\/schema\/logo\/image\/","url":"https:\/\/www.silicloud.com\/blog\/wp-content\/uploads\/2023\/11\/EN-SILICON-Full.png","contentUrl":"https:\/\/www.silicloud.com\/blog\/wp-content\/uploads\/2023\/11\/EN-SILICON-Full.png","width":1024,"height":1024,"caption":"Silicon Cloud Blog"},"image":{"@id":"https:\/\/www.silicloud.com\/blog\/#\/schema\/logo\/image\/"},"sameAs":["https:\/\/www.facebook.com\/SiliCloudGlobal\/","https:\/\/twitter.com\/SiliCloudGlobal"]},{"@type":"Person","@id":"https:\/\/www.silicloud.com\/blog\/#\/schema\/person\/f697031891aacefc4b681d139781d3c0","name":"William 
Carter","image":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/www.silicloud.com\/blog\/#\/schema\/person\/image\/","url":"https:\/\/secure.gravatar.com\/avatar\/1786698071dd8d74bec894b512f9e3c610c3a2a32985f67e688976cee3c8bbef?s=96&d=mm&r=g","contentUrl":"https:\/\/secure.gravatar.com\/avatar\/1786698071dd8d74bec894b512f9e3c610c3a2a32985f67e688976cee3c8bbef?s=96&d=mm&r=g","caption":"William Carter"},"url":"https:\/\/www.silicloud.com\/blog\/author\/williamcarter\/"}]}},"_links":{"self":[{"href":"https:\/\/www.silicloud.com\/blog\/wp-json\/wp\/v2\/posts\/19789","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.silicloud.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.silicloud.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.silicloud.com\/blog\/wp-json\/wp\/v2\/users\/8"}],"replies":[{"embeddable":true,"href":"https:\/\/www.silicloud.com\/blog\/wp-json\/wp\/v2\/comments?post=19789"}],"version-history":[{"count":1,"href":"https:\/\/www.silicloud.com\/blog\/wp-json\/wp\/v2\/posts\/19789\/revisions"}],"predecessor-version":[{"id":53549,"href":"https:\/\/www.silicloud.com\/blog\/wp-json\/wp\/v2\/posts\/19789\/revisions\/53549"}],"wp:attachment":[{"href":"https:\/\/www.silicloud.com\/blog\/wp-json\/wp\/v2\/media?parent=19789"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.silicloud.com\/blog\/wp-json\/wp\/v2\/categories?post=19789"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.silicloud.com\/blog\/wp-json\/wp\/v2\/tags?post=19789"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}