{"id":46945,"date":"2023-09-29T11:58:37","date_gmt":"2023-02-02T04:04:31","guid":{"rendered":"https:\/\/www.silicloud.com\/zh\/blog\/%e4%bd%bf%e7%94%a8kafka-streams%e5%af%b9sensortag%e8%bf%9b%e8%a1%8c%e7%aa%97%e5%8f%a3%e8%81%9a%e5%90%88%e3%80%82\/"},"modified":"2024-05-04T02:03:36","modified_gmt":"2024-05-03T18:03:36","slug":"%e4%bd%bf%e7%94%a8kafka-streams%e5%af%b9sensortag%e8%bf%9b%e8%a1%8c%e7%aa%97%e5%8f%a3%e8%81%9a%e5%90%88%e3%80%82","status":"publish","type":"post","link":"https:\/\/www.silicloud.com\/zh\/blog\/%e4%bd%bf%e7%94%a8kafka-streams%e5%af%b9sensortag%e8%bf%9b%e8%a1%8c%e7%aa%97%e5%8f%a3%e8%81%9a%e5%90%88%e3%80%82\/","title":{"rendered":"\u4f7f\u7528Kafka Streams\u5bf9SensorTag\u8fdb\u884c\u7a97\u53e3\u805a\u5408"},"content":{"rendered":"<p>\u6211\u5011\u5728Jupyter\u7684\u904b\u884c\u74b0\u5883\u4e2d\u4f7f\u7528PySpark Streaming\u4f86\u8a66\u9a57SensorTag\u7684\u6578\u64da\u9032\u884c\u7a97\u53e3\u805a\u5408\u3002\u5118\u7ba1\u6709\u5e7e\u500b\u5176\u4ed6\u7684\u6d41\u8655\u7406\u6846\u67b6\u53ef\u4f9b\u9078\u64c7\uff0c\u4f46\u4e0b\u4e00\u6b65\u6211\u5011\u5c07\u5617\u8a66\u4f7f\u7528Kafka Streams\u3002\u8207Spark\u4e0d\u540c\u7684\u662f\uff0cKafka Streams\u4e0d\u662f\u4e00\u500b\u96c6\u7fa4\uff0c\u800c\u662f\u4e00\u500b\u5eab\u3002\u76ee\u524d\uff0c\u5b98\u65b9\u53ea\u652f\u6301Java\u4f5c\u70ba\u958b\u767c\u8a9e\u8a00\u3002<\/p>\n<h2>Java\u73af\u5883<\/h2>\n<p>\u6211\u6253\u7b97\u4f7f\u7528Maven\u5728Ubuntu 16.04\u4e0a\u642d\u5efa\u7684Eclim\u6765\u7f16\u5199\u4ee3\u7801\u3002<\/p>\n<h2>\u9879\u76ee<\/h2>\n<p>\u5728\u9879\u76ee\u76ee\u5f55\u4e2d\u521b\u5efa\u4ee5\u4e0b\u6587\u4ef6\u3002\u5b8c\u6574\u7684\u4ee3\u7801\u5728\u8fd9\u4e2a\u5b58\u50a8\u5e93\u4e2d\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"nv\">$ <\/span> tree\r\n<span class=\"nb\">.<\/span>\r\n\u251c\u2500\u2500 pom.xml\r\n\u2514\u2500\u2500 src\r\n    \u2514\u2500\u2500 main\r\n        \u251c\u2500\u2500 java\r\n        \u2502\u00a0\u00a0 \u2514\u2500\u2500 com\r\n        \u2502\u00a0\u00a0     \u2514\u2500\u2500 github\r\n        \u2502\u00a0\u00a0         \u2514\u2500\u2500 masato\r\n        \u2502\u00a0\u00a0             \u2514\u2500\u2500 streams\r\n        \u2502\u00a0\u00a0                 \u2514\u2500\u2500 kafka\r\n        \u2502\u00a0\u00a0                     \u251c\u2500\u2500 App.java\r\n        \u2502\u00a0\u00a0                     \u251c\u2500\u2500 SensorSumDeserializer.java\r\n        \u2502\u00a0\u00a0                     \u251c\u2500\u2500 SensorSum.java\r\n        \u2502\u00a0\u00a0                     \u2514\u2500\u2500 SensorSumSerializer.java\r\n        \u2514\u2500\u2500 resources\r\n            \u2514\u2500\u2500 logback.xml\r\n\r\n9 directories, 7 files\r\n<\/code><\/pre>\n<h2>App.java\u7684\u6c49\u8bed\u7ffb\u8bd1\u9009\u9879\uff1a\u4e3b\u7a0b\u5e8f.java<\/h2>\n<p>\u5c06\u4ee3\u7801\u5206\u6210\u51e0\u4e2a\u90e8\u5206\u8fdb\u884c\u8bf4\u660e\u3002<\/p>\n<h3>\u6570\u503c\u7684\u8bbe\u5b9a<\/h3>\n<p>\u4e3b\u9898\u540d\u79f0\u7b49\u5c06\u4ecepom.xml\u4e2d\u5b9a\u4e49\u7684\u73af\u5883\u53d8\u91cf\u4e2d\u83b7\u53d6\u3002WINDOWS_MINUTES\u662f\u7a97\u53e3\u805a\u5408\u7684\u65f6\u95f4\u95f4\u9694\u3002COMMIT_MINUTES\u662fKafka\u81ea\u52a8\u63d0\u4ea4\u7f13\u5b58\u7684\u65f6\u95f4\u95f4\u9694\uff0c\u540e\u9762\u4f1a\u8be6\u7ec6\u8bf4\u660e\u3002\u5728\u8fd9\u91cc\u4ee5\u5206\u949f\u4e3a\u5355\u4f4d\u6307\u5b9a\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kd\">public<\/span> <span class=\"kd\">class<\/span> <span class=\"nc\">App<\/span> <span class=\"o\">{<\/span>\r\n\r\n    <span class=\"kd\">private<\/span> <span class=\"kd\">static<\/span> <span class=\"kd\">final<\/span> <span class=\"nc\">Logger<\/span> <span class=\"no\">LOG<\/span> <span class=\"o\">=<\/span> <span class=\"nc\">LoggerFactory<\/span><span class=\"o\">.<\/span><span class=\"na\">getLogger<\/span><span class=\"o\">(<\/span><span class=\"nc\">App<\/span><span class=\"o\">.<\/span><span class=\"na\">class<\/span><span class=\"o\">);<\/span>\r\n\r\n    <span class=\"kd\">private<\/span> <span class=\"kd\">static<\/span> <span class=\"kd\">final<\/span> <span class=\"nc\">String<\/span> <span class=\"no\">SOURCE_TOPIC<\/span> <span class=\"o\">=<\/span> <span class=\"nc\">System<\/span><span class=\"o\">.<\/span><span class=\"na\">getenv<\/span><span class=\"o\">(<\/span><span class=\"s\">\"SOURCE_TOPIC\"<\/span><span class=\"o\">);<\/span>\r\n    <span class=\"kd\">private<\/span> <span class=\"kd\">static<\/span> <span class=\"kd\">final<\/span> <span class=\"nc\">String<\/span> <span class=\"no\">SINK_TOPIC<\/span> <span class=\"o\">=<\/span> <span class=\"nc\">System<\/span><span class=\"o\">.<\/span><span class=\"na\">getenv<\/span><span class=\"o\">(<\/span><span class=\"s\">\"SINK_TOPIC\"<\/span><span class=\"o\">);<\/span>\r\n    <span class=\"kd\">private<\/span> <span class=\"kd\">static<\/span> <span class=\"kd\">final<\/span> <span class=\"kt\">long<\/span> <span class=\"no\">WINDOWS_MINUTES<\/span> <span class=\"o\">=<\/span> <span class=\"mi\">2L<\/span><span class=\"o\">;<\/span>\r\n    <span class=\"kd\">private<\/span> <span class=\"kd\">static<\/span> <span class=\"kd\">final<\/span> <span class=\"kt\">long<\/span> <span class=\"no\">COMMIT_MINUTES<\/span> <span class=\"o\">=<\/span> <span class=\"mi\">3L<\/span><span class=\"o\">;<\/span>\r\n<\/code><\/pre>\n<h3>\u5e8f\u5217\u5316<\/h3>\n<p>\u521b\u5efa\u8bb0\u5f55\u7684\u5e8f\u5217\u5316\u5668\u548c\u53cd\u5e8f\u5217\u5316\u5668\u3002\u5728Kafka Streams\u5e94\u7528\u7a0b\u5e8f\u4e2d\uff0c\u5c06\u5904\u7406\u7684\u4e2d\u95f4\u7ed3\u679c\u4fdd\u5b58\u5230\u4e3b\u9898\u4e2d\uff0c\u5e76\u5b9e\u73b0\u6d41\u7a0b\u3002\u5b9a\u4e49\u4e00\u4e2aSerDe\u6765\u540c\u65f6\u5904\u7406\u4ece\u4e3b\u9898\u4e2d\u8bfb\u53d6\u8bb0\u5f55\u7684\u53cd\u5e8f\u5217\u5316\u5668\u548c\u5199\u5165\u8bb0\u5f55\u7684\u5e8f\u5217\u5316\u5668\u3002SerDe\u9700\u8981\u6839\u636e\u4e3b\u9898\u7684\u952e\u548c\u503c\u7684\u7c7b\u578b\u6765\u5b9a\u4e49\u3002<\/p>\n<ul class=\"post-ul\">\n<li style=\"list-style-type: none;\">\n<ul class=\"post-ul\">jsonSerde<\/ul>\n<\/li>\n<\/ul>\n<p>&nbsp;<\/p>\n<ul class=\"post-ul\">\n<li style=\"list-style-type: none;\">\n<ul class=\"post-ul\">SensorTag\u306e\u30ec\u30b3\u30fc\u30c9\u306f\u30ad\u30fc\u306f\u6587\u5b57\u5217\u3001\u5024\u306fJackson\u306eJsonNode\u30aa\u30d6\u30b8\u30a7\u30af\u30c8\u3067\u3059\u3002<\/ul>\n<\/li>\n<\/ul>\n<p>&nbsp;<\/p>\n<ul class=\"post-ul\">\n<li style=\"list-style-type: none;\">\n<ul class=\"post-ul\">sensorSumSerde<\/ul>\n<\/li>\n<\/ul>\n<p>&nbsp;<\/p>\n<ul class=\"post-ul\">\n<li style=\"list-style-type: none;\">\n<ul class=\"post-ul\">SenroSum\u306f\u30ab\u30b9\u30bf\u30e0\u3067\u4f5c\u6210\u3057\u305f\u5468\u56f2\u6e29\u5ea6 (ambient)\u3068\u30a6\u30a3\u30f3\u30c9\u30a6\u96c6\u8a08\u306e\u72b6\u614b\u3092\u4fdd\u6301\u3059\u308b\u30af\u30e9\u30b9\u3067\u3059\u3002<\/ul>\n<\/li>\n<\/ul>\n<p>&nbsp;<\/p>\n<ul class=\"post-ul\">\n<li style=\"list-style-type: none;\">\n<ul class=\"post-ul\">stringSerde<\/ul>\n<\/li>\n<\/ul>\n<p>&nbsp;<\/p>\n<ul class=\"post-ul\">\n<li style=\"list-style-type: none;\">\n<ul class=\"post-ul\">\u30c7\u30d5\u30a9\u30eb\u30c8\u306eString\u7528\u306eSerDe\u3067\u3059\u3002\u4eca\u56de\u30e1\u30c3\u30bb\u30fc\u30b8\u306e\u30ad\u30fc\u306f\u3059\u3079\u3066String\u3067\u3059\u3002<\/ul>\n<\/li>\n<\/ul>\n<p>&nbsp;<\/p>\n<ul class=\"post-ul\">\n<li style=\"list-style-type: none;\">\n<ul class=\"post-ul\">doubleSerde<\/ul>\n<\/li>\n<\/ul>\n<p>&nbsp;<\/p>\n<ul class=\"post-ul\">\u30c7\u30d5\u30a9\u30eb\u30c8\u306edouble\u7528\u306eSerDe\u3067\u3059\u3002SensorTag\u306e\u5468\u56f2\u6e29\u5ea6 (ambient)\u306fdouble\u3067\u30a6\u30a3\u30f3\u30c9\u30a6\u96c6\u8a08\u3057\u307e\u3059\u3002<\/ul>\n<pre class=\"post-pre\"><code>    <span class=\"kd\">public<\/span> <span class=\"kd\">static<\/span> <span class=\"kt\">void<\/span> <span class=\"nf\">main<\/span><span class=\"o\">(<\/span><span class=\"nc\">String<\/span><span class=\"o\">[]<\/span> <span class=\"n\">args<\/span><span class=\"o\">)<\/span> <span class=\"kd\">throws<\/span> <span class=\"nc\">Exception<\/span> <span class=\"o\">{<\/span>\r\n\r\n        <span class=\"kd\">final<\/span> <span class=\"nc\">Serializer<\/span><span class=\"o\">&lt;<\/span><span class=\"nc\">JsonNode<\/span><span class=\"o\">&gt;<\/span> <span class=\"n\">jsonSerializer<\/span> <span class=\"o\">=<\/span> <span class=\"k\">new<\/span> <span class=\"nc\">JsonSerializer<\/span><span class=\"o\">();<\/span>\r\n        <span class=\"kd\">final<\/span> <span class=\"nc\">Deserializer<\/span><span class=\"o\">&lt;<\/span><span class=\"nc\">JsonNode<\/span><span class=\"o\">&gt;<\/span> <span class=\"n\">jsonDeserializer<\/span> <span class=\"o\">=<\/span> <span class=\"k\">new<\/span> <span class=\"nc\">JsonDeserializer<\/span><span class=\"o\">();<\/span>\r\n        <span class=\"kd\">final<\/span> <span class=\"nc\">Serde<\/span><span class=\"o\">&lt;<\/span><span class=\"nc\">JsonNode<\/span><span class=\"o\">&gt;<\/span> <span class=\"n\">jsonSerde<\/span> <span class=\"o\">=<\/span>\r\n            <span class=\"nc\">Serdes<\/span><span class=\"o\">.<\/span><span class=\"na\">serdeFrom<\/span><span class=\"o\">(<\/span><span class=\"n\">jsonSerializer<\/span><span class=\"o\">,<\/span> <span class=\"n\">jsonDeserializer<\/span><span class=\"o\">);<\/span>\r\n\r\n        <span class=\"kd\">final<\/span> <span class=\"nc\">Serializer<\/span><span class=\"o\">&lt;<\/span><span class=\"nc\">SensorSum<\/span><span class=\"o\">&gt;<\/span> <span class=\"n\">sensorSumSerializer<\/span> <span class=\"o\">=<\/span>\r\n            <span class=\"k\">new<\/span> <span class=\"nf\">SensorSumSerializer<\/span><span class=\"o\">();<\/span>\r\n        <span class=\"kd\">final<\/span> <span class=\"nc\">Deserializer<\/span><span class=\"o\">&lt;<\/span><span class=\"nc\">SensorSum<\/span><span class=\"o\">&gt;<\/span> <span class=\"n\">sensorSumDeserializer<\/span> <span class=\"o\">=<\/span>\r\n            <span class=\"k\">new<\/span> <span class=\"nf\">SensorSumDeserializer<\/span><span class=\"o\">();<\/span>\r\n        <span class=\"kd\">final<\/span> <span class=\"nc\">Serde<\/span><span class=\"o\">&lt;<\/span><span class=\"nc\">SensorSum<\/span><span class=\"o\">&gt;<\/span> <span class=\"n\">sensorSumSerde<\/span> <span class=\"o\">=<\/span>\r\n            <span class=\"nc\">Serdes<\/span><span class=\"o\">.<\/span><span class=\"na\">serdeFrom<\/span><span class=\"o\">(<\/span><span class=\"n\">sensorSumSerializer<\/span><span class=\"o\">,<\/span>\r\n                             <span class=\"n\">sensorSumDeserializer<\/span><span class=\"o\">);<\/span>\r\n\r\n        <span class=\"kd\">final<\/span> <span class=\"nc\">Serde<\/span><span class=\"o\">&lt;<\/span><span class=\"nc\">String<\/span><span class=\"o\">&gt;<\/span> <span class=\"n\">stringSerde<\/span> <span class=\"o\">=<\/span> <span class=\"nc\">Serdes<\/span><span class=\"o\">.<\/span><span class=\"na\">String<\/span><span class=\"o\">();<\/span>\r\n        <span class=\"kd\">final<\/span> <span class=\"nc\">Serde<\/span><span class=\"o\">&lt;<\/span><span class=\"nc\">Double<\/span><span class=\"o\">&gt;<\/span> <span class=\"n\">doubleSerde<\/span> <span class=\"o\">=<\/span> <span class=\"nc\">Serdes<\/span><span class=\"o\">.<\/span><span class=\"na\">Double<\/span><span class=\"o\">();<\/span>\r\n<\/code><\/pre>\n<h3>\u521b\u5efaKStream<\/h3>\n<p>\u9996\u5148\u8c03\u7528KStreamBuilder\u7684stream()\u51fd\u6570\u6765\u521b\u5efaKStream\u3002\u5176\u4e2d\uff0ctopic\u7684\u952e\u662f\u5b57\u7b26\u4e32\uff0c\u5e76\u6307\u5b9a\u4e86\u503c\u4e3aJsonNode\u7684SerDe\u3002<\/p>\n<pre class=\"post-pre\"><code>        <span class=\"kd\">final<\/span> <span class=\"nc\">KStreamBuilder<\/span> <span class=\"n\">builder<\/span> <span class=\"o\">=<\/span> <span class=\"k\">new<\/span> <span class=\"nc\">KStreamBuilder<\/span><span class=\"o\">();<\/span>\r\n\r\n        <span class=\"no\">LOG<\/span><span class=\"o\">.<\/span><span class=\"na\">info<\/span><span class=\"o\">(<\/span><span class=\"s\">\"Starting Sorting Job\"<\/span><span class=\"o\">);<\/span>\r\n\r\n        <span class=\"kd\">final<\/span> <span class=\"nc\">KStream<\/span><span class=\"o\">&lt;<\/span><span class=\"nc\">String<\/span><span class=\"o\">,<\/span> <span class=\"nc\">JsonNode<\/span><span class=\"o\">&gt;<\/span> <span class=\"n\">source<\/span> <span class=\"o\">=<\/span>\r\n            <span class=\"n\">builder<\/span><span class=\"o\">.<\/span><span class=\"na\">stream<\/span><span class=\"o\">(<\/span><span class=\"n\">stringSerde<\/span><span class=\"o\">,<\/span> <span class=\"n\">jsonSerde<\/span><span class=\"o\">,<\/span> <span class=\"no\">SOURCE_TOPIC<\/span><span class=\"o\">);<\/span>\r\n<\/code><\/pre>\n<h3>\u521b\u5efaKGroupedStream<\/h3>\n<p>SensorTag\u7684\u6570\u636e\u4ee5JSON\u5b57\u7b26\u4e32\u7684\u5f62\u5f0f\u4ece\u6811\u8393\u6d3e3\u53d1\u9001\u5230Kafka\u4e3b\u9898\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"p\">{<\/span><span class=\"dl\">'<\/span><span class=\"s1\">bid<\/span><span class=\"dl\">'<\/span><span class=\"p\">:<\/span> <span class=\"dl\">'<\/span><span class=\"s1\">B0:B4:48:BE:5E:00<\/span><span class=\"dl\">'<\/span><span class=\"p\">,<\/span> <span class=\"dl\">'<\/span><span class=\"s1\">time<\/span><span class=\"dl\">'<\/span><span class=\"p\">:<\/span> <span class=\"mi\">1502152524<\/span><span class=\"p\">,<\/span> <span class=\"dl\">'<\/span><span class=\"s1\">humidity<\/span><span class=\"dl\">'<\/span><span class=\"p\">:<\/span> <span class=\"mf\">27.26287841796875<\/span><span class=\"p\">,<\/span> <span class=\"dl\">'<\/span><span class=\"s1\">objecttemp<\/span><span class=\"dl\">'<\/span><span class=\"p\">:<\/span> <span class=\"mf\">21.1875<\/span><span class=\"p\">,<\/span> <span class=\"dl\">'<\/span><span class=\"s1\">ambient<\/span><span class=\"dl\">'<\/span><span class=\"p\">:<\/span> <span class=\"mf\">27.03125<\/span><span class=\"p\">,<\/span> <span class=\"dl\">'<\/span><span class=\"s1\">rh<\/span><span class=\"dl\">'<\/span><span class=\"p\">:<\/span> <span class=\"mf\">75.311279296875<\/span><span class=\"p\">}<\/span>\r\n<\/code><\/pre>\n<p>KStream\u7684\u8bb0\u5f55\u662f\u5177\u6709\u952e\u548c\u503c\u7684KeyValue\u5bf9\u8c61\u3002\u5728\u793a\u4f8b\u4e2d\uff0c\u4e3a\u4e86\u4ec5\u5bf9\u73af\u5883\u6e29\u5ea6\u7684\u5e73\u5747\u503c\u8fdb\u884c\u7a97\u53e3\u805a\u5408\uff0c\u6211\u4eec\u8c03\u7528map()\u751f\u6210\u4e00\u4e2a\u65b0\u7684KStream\uff0c\u5b83\u53ea\u5305\u542b\u952e\u548c\u73af\u5883\u6e29\u5ea6\u7684\u5bf9\u3002<\/p>\n<p>\u7136\u540e\u8c03\u7528groupByKey()\u65b9\u6cd5\uff0c\u6839\u636e\u952e\u8fdb\u884c\u5206\u7ec4\u5e76\u521b\u5efa\u4e00\u4e2aKGroupedStream\u3002\u8bb0\u5f55\u7684\u952e\u662f\u5b57\u7b26\u4e32\uff0c\u503c\u662f\u5468\u56f4\u6e29\u5ea6\u7684double\uff0c\u56e0\u6b64\u9700\u8981\u6307\u5b9a\u5404\u81ea\u7684SerDe\u3002<\/p>\n<pre class=\"post-pre\"><code>        <span class=\"kd\">final<\/span> <span class=\"nc\">KGroupedStream<\/span><span class=\"o\">&lt;<\/span><span class=\"nc\">String<\/span><span class=\"o\">,<\/span> <span class=\"nc\">Double<\/span><span class=\"o\">&gt;<\/span> <span class=\"n\">sensors<\/span> <span class=\"o\">=<\/span>\r\n            <span class=\"n\">source<\/span>\r\n            <span class=\"o\">.<\/span><span class=\"na\">map<\/span><span class=\"o\">((<\/span><span class=\"n\">k<\/span><span class=\"o\">,<\/span> <span class=\"n\">v<\/span><span class=\"o\">)<\/span> <span class=\"o\">-&gt;<\/span> <span class=\"o\">{<\/span>\r\n                    <span class=\"kt\">double<\/span> <span class=\"n\">ambient<\/span> <span class=\"o\">=<\/span> <span class=\"n\">v<\/span><span class=\"o\">.<\/span><span class=\"na\">get<\/span><span class=\"o\">(<\/span><span class=\"s\">\"ambient\"<\/span><span class=\"o\">).<\/span><span class=\"na\">asDouble<\/span><span class=\"o\">();<\/span>\r\n                    <span class=\"k\">return<\/span> <span class=\"nc\">KeyValue<\/span><span class=\"o\">.<\/span><span class=\"na\">pair<\/span><span class=\"o\">(<\/span><span class=\"n\">k<\/span><span class=\"o\">,<\/span> <span class=\"n\">ambient<\/span><span class=\"o\">);<\/span>\r\n                <span class=\"o\">})<\/span>\r\n            <span class=\"o\">.<\/span><span class=\"na\">groupByKey<\/span><span class=\"o\">(<\/span><span class=\"n\">stringSerde<\/span><span class=\"o\">,<\/span> <span class=\"n\">doubleSerde<\/span><span class=\"o\">);<\/span>\r\n<\/code><\/pre>\n<h3>\u4f7f\u7528KStram\u521b\u5efaKTable<\/h3>\n<p>\u8c03\u7528KGroupedStream\u7684aggregate()\u65b9\u6cd5\u5c06\u521b\u5efa\u4e00\u4e2aKTable\u3002KTable\u5c06\u6309\u952e\u548c\u6307\u5b9a\u7684\u7a97\u53e3\u95f4\u9694\u4fdd\u6301\u8bb0\u5f55\u7684\u603b\u548c\u548c\u8bb0\u5f55\u6570\u7684\u72b6\u6001\u3002<\/p>\n<p>\u5728`aggregate()`\u4e2d\uff0c\u7b2c\u4e00\u4e2a\u53c2\u6570`Initializer`\u88ab\u7528\u4e8e\u521d\u59cb\u5316\u5728\u6d41\u805a\u5408\u4e2d\u4f7f\u7528\u7684\u805a\u5408\u5668\u3002\u5728\u8fd9\u91cc\uff0c\u6211\u4eec\u521d\u59cb\u5316\u4e86\u7528\u4e8e\u7a97\u53e3\u805a\u5408\u72b6\u6001\u7684`SensorSum`\u3002\u7b2c\u4e8c\u4e2a\u53c2\u6570\u5b9e\u73b0\u4e86\u805a\u5408\u5668\u3002\u5b83\u63a5\u6536\u5f53\u524d\u8bb0\u5f55\u7684\u952e\u503c\u3001\u4e0a\u4e00\u6b21\u8bb0\u5f55\u5904\u7406\u4e2d\u521b\u5efa\u7684`SensorSum`\u3002\u6bcf\u6b21\u6570\u636e\u5230\u8fbe\u65f6\uff0c\u5b83\u4f1a\u5c06\u603b\u548c\u548c\u8bb0\u5f55\u6570\u76f8\u52a0\uff0c\u5e76\u8fd4\u56de\u4e00\u4e2a\u65b0\u7684`SensorSum`\u3002\u7b2c\u4e09\u4e2a\u53c2\u6570\u5b9a\u4e49\u4e86\u4e00\u4e2a2\u5206\u949f\u7684`TimeWindows`\u3002\u7b2c\u56db\u4e2a\u53c2\u6570\u662f`SensorSum`\u7684\u5e8f\u5217\u5316\u548c\u53cd\u5e8f\u5217\u5316\u5668\uff0c\u7b2c\u4e94\u4e2a\u53c2\u6570\u662f\u7528\u4e8e\u4fdd\u6301\u72b6\u6001\u7684\u4e3b\u9898\u540d\u79f0\u3002<\/p>\n<pre class=\"post-pre\"><code>        <span class=\"kd\">final<\/span> <span class=\"nc\">KTable<\/span><span class=\"o\">&lt;<\/span><span class=\"nc\">Windowed<\/span><span class=\"o\">&lt;<\/span><span class=\"nc\">String<\/span><span class=\"o\">&gt;,<\/span> <span class=\"nc\">SensorSum<\/span><span class=\"o\">&gt;<\/span> <span class=\"n\">sensorAgg<\/span> <span class=\"o\">=<\/span>\r\n            <span class=\"n\">sensors<\/span>\r\n            <span class=\"o\">.<\/span><span class=\"na\">aggregate<\/span><span class=\"o\">(()<\/span> <span class=\"o\">-&gt;<\/span> <span class=\"k\">new<\/span> <span class=\"nc\">SensorSum<\/span><span class=\"o\">(<\/span><span class=\"mi\">0<\/span><span class=\"no\">D<\/span><span class=\"o\">,<\/span> <span class=\"mi\">0<\/span><span class=\"o\">)<\/span>\r\n                       <span class=\"o\">,<\/span> <span class=\"o\">(<\/span><span class=\"n\">aggKey<\/span><span class=\"o\">,<\/span> <span class=\"n\">value<\/span><span class=\"o\">,<\/span> <span class=\"n\">agg<\/span><span class=\"o\">)<\/span> <span class=\"o\">-&gt;<\/span> <span class=\"k\">new<\/span> <span class=\"nc\">SensorSum<\/span><span class=\"o\">(<\/span><span class=\"n\">agg<\/span><span class=\"o\">.<\/span><span class=\"na\">sum<\/span> <span class=\"o\">+<\/span> <span class=\"n\">value<\/span><span class=\"o\">,<\/span> <span class=\"n\">agg<\/span><span class=\"o\">.<\/span><span class=\"na\">count<\/span> <span class=\"o\">+<\/span> <span class=\"mi\">1<\/span><span class=\"o\">)<\/span>\r\n                       <span class=\"o\">,<\/span> <span class=\"nc\">TimeWindows<\/span><span class=\"o\">.<\/span><span class=\"na\">of<\/span><span class=\"o\">(<\/span><span class=\"nc\">TimeUnit<\/span><span class=\"o\">.<\/span><span class=\"na\">MINUTES<\/span><span class=\"o\">.<\/span><span class=\"na\">toMillis<\/span><span class=\"o\">(<\/span><span class=\"no\">WINDOWS_MINUTES<\/span><span class=\"o\">))<\/span>\r\n                       <span class=\"o\">,<\/span> <span class=\"n\">sensorSumSerde<\/span><span class=\"o\">,<\/span>\r\n                       <span class=\"s\">\"sensorSum\"<\/span><span class=\"o\">);<\/span>\r\n<\/code><\/pre>\n<h3>\u4eceKTable\u521b\u5efaKStream\u3002<\/h3>\n<p>\u4f7f\u7528KTable\u7684mapValues()\u51fd\u6570\u8ba1\u7b97\u5e73\u5747\u503c\u3002\u5c06\u603b\u548c\u9664\u4ee5\u8bb0\u5f55\u6570\u5f97\u5230\u7684\u5e73\u5747\u503c\u662f\u4e00\u4e2a\u65b0\u7684Double\u7c7b\u578b\u8bb0\u5f55\u7684KTable\u3002\u7136\u540e\u8c03\u7528toStream()\u51fd\u6570\u521b\u5efaKStream\u3002\u5c06\u8bb0\u5f55\u683c\u5f0f\u5316\u4e3a\u65f6\u95f4\u6233\u3001\u952e\u3001\u5e73\u5747\u503c\u7684JSON\u5b57\u7b26\u4e32\u5e76\u8f93\u51fa\u5230\u6d41\u4e2d\u3002\u65f6\u95f4\u6233\u91c7\u7528ISO 8601\u683c\u5f0f\uff0c\u4ee5\u4fbf\u5728\u4e0d\u540c\u7cfb\u7edf\u4e4b\u95f4\u65b9\u4fbf\u8fdb\u884c\u6570\u636e\u4ea4\u6362\u3002\u6700\u540e\u5c06\u8bb0\u5f55\u4fdd\u5b58\u5230\u6307\u5b9a\u7684\u4e3b\u9898\uff0c\u5e76\u7ed3\u675f\u3002<\/p>\n<pre class=\"post-pre\"><code>        <span class=\"kd\">final<\/span> <span class=\"nc\">DateTimeFormatter<\/span> <span class=\"n\">fmt<\/span> <span class=\"o\">=<\/span>\r\n            <span class=\"nc\">DateTimeFormatter<\/span><span class=\"o\">.<\/span><span class=\"na\">ISO_OFFSET_DATE_TIME<\/span><span class=\"o\">;<\/span>\r\n\r\n        <span class=\"n\">sensorAgg<\/span>\r\n            <span class=\"o\">.&lt;<\/span><span class=\"nc\">Double<\/span><span class=\"o\">&gt;<\/span><span class=\"n\">mapValues<\/span><span class=\"o\">((<\/span><span class=\"n\">v<\/span><span class=\"o\">)<\/span> <span class=\"o\">-&gt;<\/span> <span class=\"o\">((<\/span><span class=\"kt\">double<\/span><span class=\"o\">)<\/span> <span class=\"n\">v<\/span><span class=\"o\">.<\/span><span class=\"na\">sum<\/span> <span class=\"o\">\/<\/span> <span class=\"n\">v<\/span><span class=\"o\">.<\/span><span class=\"na\">count<\/span><span class=\"o\">))<\/span>\r\n            <span class=\"o\">.<\/span><span class=\"na\">toStream<\/span><span class=\"o\">()<\/span>\r\n            <span class=\"o\">.<\/span><span class=\"na\">map<\/span><span class=\"o\">((<\/span><span class=\"n\">key<\/span><span class=\"o\">,<\/span> <span class=\"n\">avg<\/span><span class=\"o\">)<\/span> <span class=\"o\">-&gt;<\/span> <span class=\"o\">{<\/span>\r\n                    <span class=\"kt\">long<\/span> <span class=\"n\">end<\/span> <span class=\"o\">=<\/span> <span class=\"n\">key<\/span><span class=\"o\">.<\/span><span class=\"na\">window<\/span><span class=\"o\">().<\/span><span class=\"na\">end<\/span><span class=\"o\">();<\/span>\r\n                    <span class=\"nc\">ZonedDateTime<\/span> <span class=\"n\">zdt<\/span> <span class=\"o\">=<\/span>\r\n                        <span class=\"k\">new<\/span> <span class=\"nf\">Date<\/span><span class=\"o\">(<\/span><span class=\"n\">end<\/span><span class=\"o\">).<\/span><span class=\"na\">toInstant<\/span><span class=\"o\">()<\/span>\r\n                        <span class=\"o\">.<\/span><span class=\"na\">atZone<\/span><span class=\"o\">(<\/span><span class=\"nc\">ZoneId<\/span><span class=\"o\">.<\/span><span class=\"na\">systemDefault<\/span><span class=\"o\">());<\/span>\r\n                    <span class=\"nc\">String<\/span> <span class=\"n\">time<\/span> <span class=\"o\">=<\/span> <span class=\"n\">fmt<\/span><span class=\"o\">.<\/span><span class=\"na\">format<\/span><span class=\"o\">(<\/span><span class=\"n\">zdt<\/span><span class=\"o\">);<\/span>\r\n                    <span class=\"nc\">String<\/span> <span class=\"n\">bid<\/span> <span class=\"o\">=<\/span> <span class=\"n\">key<\/span><span class=\"o\">.<\/span><span class=\"na\">key<\/span><span class=\"o\">();<\/span>\r\n                    <span class=\"nc\">String<\/span> <span class=\"n\">retval<\/span> <span class=\"o\">=<\/span>\r\n                        <span class=\"nc\">String<\/span><span class=\"o\">.<\/span><span class=\"na\">format<\/span><span class=\"o\">(<\/span><span class=\"s\">\"{\\\"time\\\": \\\"%s\\\", \\\"bid\\\": \\\"%s\\\", \\\"ambient\\\": %f}\"<\/span><span class=\"o\">,<\/span>\r\n                                      <span class=\"n\">time<\/span><span class=\"o\">,<\/span> <span class=\"n\">bid<\/span><span class=\"o\">,<\/span> <span class=\"n\">avg<\/span><span class=\"o\">);<\/span>\r\n                    <span class=\"no\">LOG<\/span><span class=\"o\">.<\/span><span class=\"na\">info<\/span><span class=\"o\">(<\/span><span class=\"n\">retval<\/span><span class=\"o\">);<\/span>\r\n                    <span class=\"k\">return<\/span> <span class=\"k\">new<\/span> <span class=\"nc\">KeyValue<\/span><span class=\"o\">&lt;<\/span><span class=\"nc\">String<\/span><span class=\"o\">,<\/span><span class=\"nc\">String<\/span><span class=\"o\">&gt;(<\/span><span class=\"n\">bid<\/span><span class=\"o\">,<\/span> <span class=\"n\">retval<\/span><span class=\"o\">);<\/span>\r\n             <span class=\"o\">})<\/span>\r\n            <span class=\"o\">.<\/span><span class=\"na\">to<\/span><span class=\"o\">(<\/span><span class=\"no\">SINK_TOPIC<\/span><span class=\"o\">);<\/span>\r\n<\/code><\/pre>\n<h3>\u5f00\u59cbKafka Streams<\/h3>\n<p>\u4f7f\u7528\u8bbe\u7f6e\u5bf9\u8c61\u548c\u6784\u5efa\u5668\u521b\u5efaKafkaStreams\uff0c\u5e76\u542f\u52a8Kafka Streams\u5e94\u7528\u7a0b\u5e8f\u3002\u8fd8\u9700\u8981\u5728SIGTERM\u65f6\u5c06Kafka Stream\u6ce8\u518c\u5230\u5173\u95ed\u6302\u94a9\u4e2d\u4ee5\u505c\u6b62\u5b83\u3002<\/p>\n<pre class=\"post-pre\"><code>        <span class=\"kd\">final<\/span> <span class=\"nc\">StreamsConfig<\/span> <span class=\"n\">config<\/span> <span class=\"o\">=<\/span> <span class=\"k\">new<\/span> <span class=\"nc\">StreamsConfig<\/span><span class=\"o\">(<\/span><span class=\"n\">getProperties<\/span><span class=\"o\">());<\/span>\r\n        <span class=\"kd\">final<\/span> <span class=\"nc\">KafkaStreams<\/span> <span class=\"n\">streams<\/span> <span class=\"o\">=<\/span> <span class=\"k\">new<\/span> <span class=\"nc\">KafkaStreams<\/span><span class=\"o\">(<\/span><span class=\"n\">builder<\/span><span class=\"o\">,<\/span> <span class=\"n\">config<\/span><span class=\"o\">);<\/span>\r\n        <span class=\"n\">streams<\/span><span class=\"o\">.<\/span><span class=\"na\">start<\/span><span class=\"o\">();<\/span>\r\n\r\n        <span class=\"nc\">Runtime<\/span><span class=\"o\">.<\/span><span class=\"na\">getRuntime<\/span><span class=\"o\">().<\/span><span class=\"na\">addShutdownHook<\/span><span class=\"o\">(<\/span><span class=\"k\">new<\/span> <span class=\"nc\">Thread<\/span><span class=\"o\">(<\/span><span class=\"nl\">streams:<\/span><span class=\"o\">:<\/span><span class=\"n\">close<\/span><span class=\"o\">));<\/span>\r\n<\/code><\/pre>\n<h3>\u5173\u4e8eKafka Streams\u7684\u8bbe\u7f6e\u548c\u8d85\u65f6\u95ee\u9898<\/h3>\n<p>\u4ece\u73af\u5883\u53d8\u91cf\u7b49\u521b\u5efa\u7528\u4e8eKafka Streams\u8bbe\u7f6e\u7684\u5c5e\u6027\u3002<\/p>\n<pre class=\"post-pre\"><code>    <span class=\"kd\">private<\/span> <span class=\"kd\">static<\/span> <span class=\"nc\">Properties<\/span> <span class=\"nf\">getProperties<\/span><span class=\"o\">()<\/span> <span class=\"o\">{<\/span>\r\n        <span class=\"nc\">Properties<\/span> <span class=\"n\">props<\/span> <span class=\"o\">=<\/span> <span class=\"k\">new<\/span> <span class=\"nc\">Properties<\/span><span class=\"o\">();<\/span>\r\n        <span class=\"n\">props<\/span><span class=\"o\">.<\/span><span class=\"na\">put<\/span><span class=\"o\">(<\/span><span class=\"nc\">StreamsConfig<\/span><span class=\"o\">.<\/span><span class=\"na\">APPLICATION_ID_CONFIG<\/span><span class=\"o\">,<\/span>\r\n                  <span class=\"nc\">System<\/span><span class=\"o\">.<\/span><span class=\"na\">getenv<\/span><span class=\"o\">(<\/span><span class=\"s\">\"APPLICATION_ID_CONFIG\"<\/span><span class=\"o\">));<\/span>\r\n        <span class=\"n\">props<\/span><span class=\"o\">.<\/span><span class=\"na\">put<\/span><span class=\"o\">(<\/span><span class=\"nc\">StreamsConfig<\/span><span class=\"o\">.<\/span><span class=\"na\">BOOTSTRAP_SERVERS_CONFIG<\/span><span class=\"o\">,<\/span>\r\n                  <span class=\"nc\">System<\/span><span class=\"o\">.<\/span><span class=\"na\">getenv<\/span><span class=\"o\">(<\/span><span class=\"s\">\"BOOTSTRAP_SERVERS_CONFIG\"<\/span><span class=\"o\">));<\/span>\r\n        <span class=\"n\">props<\/span><span class=\"o\">.<\/span><span class=\"na\">put<\/span><span class=\"o\">(<\/span><span class=\"nc\">StreamsConfig<\/span><span class=\"o\">.<\/span><span class=\"na\">KEY_SERDE_CLASS_CONFIG<\/span><span class=\"o\">,<\/span>\r\n                  <span class=\"nc\">Serdes<\/span><span class=\"o\">.<\/span><span class=\"na\">String<\/span><span class=\"o\">().<\/span><span class=\"na\">getClass<\/span><span class=\"o\">().<\/span><span class=\"na\">getName<\/span><span class=\"o\">());<\/span>\r\n        <span class=\"n\">props<\/span><span class=\"o\">.<\/span><span class=\"na\">put<\/span><span class=\"o\">(<\/span><span class=\"nc\">StreamsConfig<\/span><span class=\"o\">.<\/span><span class=\"na\">VALUE_SERDE_CLASS_CONFIG<\/span><span class=\"o\">,<\/span>\r\n                  <span class=\"nc\">Serdes<\/span><span class=\"o\">.<\/span><span class=\"na\">String<\/span><span class=\"o\">().<\/span><span class=\"na\">getClass<\/span><span class=\"o\">().<\/span><span class=\"na\">getName<\/span><span class=\"o\">());<\/span>\r\n        <span class=\"n\">props<\/span><span class=\"o\">.<\/span><span class=\"na\">put<\/span><span class=\"o\">(<\/span><span class=\"nc\">StreamsConfig<\/span><span class=\"o\">.<\/span><span class=\"na\">TIMESTAMP_EXTRACTOR_CLASS_CONFIG<\/span><span class=\"o\">,<\/span>\r\n                  <span class=\"nc\">WallclockTimestampExtractor<\/span><span class=\"o\">.<\/span><span class=\"na\">class<\/span><span class=\"o\">.<\/span><span class=\"na\">getName<\/span><span class=\"o\">());<\/span>\r\n        <span class=\"n\">props<\/span><span class=\"o\">.<\/span><span class=\"na\">put<\/span><span class=\"o\">(<\/span><span class=\"nc\">StreamsConfig<\/span><span class=\"o\">.<\/span><span class=\"na\">COMMIT_INTERVAL_MS_CONFIG<\/span><span class=\"o\">,<\/span>\r\n                  <span class=\"nc\">TimeUnit<\/span><span class=\"o\">.<\/span><span class=\"na\">MINUTES<\/span><span class=\"o\">.<\/span><span class=\"na\">toMillis<\/span><span class=\"o\">(<\/span><span class=\"no\">COMMIT_MINUTES<\/span><span class=\"o\">));<\/span>\r\n\r\n        <span class=\"k\">return<\/span> <span class=\"n\">props<\/span><span class=\"o\">;<\/span>\r\n    <span class=\"o\">}<\/span>\r\n<\/code><\/pre>\n<h3>COMMIT_INTERVAL_MS_CONFIG\u7684\u4e2d\u6587\u91ca\u4e49\u4e3a\u201c\u63d0\u4ea4\u95f4\u9694\u6beb\u79d2\u914d\u7f6e\u201d\u3002<\/h3>\n<p>\u5728\u6700\u521d\u7684\u65f6\u5019\uff0cStreamsConfig.COMMIT_INTERVAL_MS_CONFIG\u6ca1\u6709\u8fdb\u884c\u66f4\u6539\u3002\u5728\u5c06\u8bb0\u5f55\u4fdd\u5b58\u5230\u4e3b\u9898\u4e4b\u524d\uff0c\u6211\u4eec\u5728KStream\u7684map()\u4e2d\u8f93\u51fa\u4e86\u65e5\u5fd7\u3002\u6211\u60f3\u8981\u57282\u5206\u949f\u7684\u7a97\u53e3\u95f4\u9694\u5185\u53ea\u8f93\u51fa\u4e00\u6b21\u6700\u540e\u7684\u805a\u5408\u7ed3\u679c\uff0c\u4f46\u7ed3\u679c\u5374\u91cd\u590d\u51fa\u73b0\u4e864-5\u6b21\uff0c\u4e14\u6b21\u6570\u4e0d\u786e\u5b9a\u3002<\/p>\n<pre class=\"post-pre\"><code>{\"time\": \"2017-08-08T10:34:00+09:00\", \"bid\": \"B0:B4:48:BE:5E:00\", \"ambient\": 27.343750}\r\n{\"time\": \"2017-08-08T10:34:00+09:00\", \"bid\": \"B0:B4:48:BE:5E:00\", \"ambient\": 27.385417}\r\n{\"time\": \"2017-08-08T10:34:00+09:00\", \"bid\": \"B0:B4:48:BE:5E:00\", \"ambient\": 27.410156}\r\n{\"time\": \"2017-08-08T10:34:00+09:00\", \"bid\": \"B0:B4:48:BE:5E:00\", \"ambient\": 27.440341}\r\n{\"time\": \"2017-08-08T10:34:00+09:00\", \"bid\": \"B0:B4:48:BE:5E:00\", \"ambient\": 27.450521}\r\n{\"time\": \"2017-08-08T10:36:00+09:00\", \"bid\": \"B0:B4:48:BE:5E:00\", \"ambient\": 27.562500}\r\n{\"time\": \"2017-08-08T10:36:00+09:00\", \"bid\": \"B0:B4:48:BE:5E:00\", \"ambient\": 27.562500}\r\n<\/code><\/pre>\n<p>\u6839\u636e\u4ee5\u4e0b\u53c2\u8003\u6587\u7ae0\uff0c\u8fd9\u4f3c\u4e4e\u662f\u671f\u671b\u4eceKTable\u7684\u53d8\u66f4\u5c65\u5386\uff08changelog stream\uff09\u8fd9\u4e2a\u7279\u6027\u4e2d\u83b7\u5f97\u7684\u884c\u4e3a\u3002KTable\u6ca1\u6709\u7a97\u53e3\u805a\u5408\u7684\u6700\u7ec8\u7ed3\u679c\u72b6\u6001\uff0c\u66f4\u65b0\u540e\u7684\u503c\u4f1a\u6309\u4e00\u5b9a\u65f6\u95f4\u95f4\u9694\u8fdb\u884c\u63d0\u4ea4\u66f4\u65b0\u5230\u7f13\u5b58\u4e2d\u3002\u5728\u5c06KTable\u8f6c\u6362\u4e3aKStream\u540e\uff0c\u9700\u8981\u81ea\u5df1\u5b9e\u73b0\u4f7f\u7528transform()\u6216process()\u6765\u53bb\u9664\u8bb0\u5f55\u91cd\u590d\u7684\u4ee3\u7801\u3002<\/p>\n<p>\u867d\u7136\u65e0\u6cd5\u5b8c\u5168\u6d88\u9664\u91cd\u590d\u7684\u8bb0\u5f55\uff0c\u4f46\u53ef\u4ee5\u901a\u8fc7\u589e\u52a0StreamsConfig.COMMIT_INTERVAL_MS_CONFIG\u7684\u503c\u6765\u51cf\u5c11\u7f13\u5b58\u63d0\u4ea4\u7684\u6b21\u6570\u3002\u9ed8\u8ba4\u503c\u4e3a30\u79d2\u3002<\/p>\n<ul class=\"post-ul\">\n<li style=\"list-style-type: none;\">\n<ul class=\"post-ul\">How to send final kafka-streams aggregation result of a time windowed KTable?<\/ul>\n<\/li>\n<\/ul>\n<p>&nbsp;<\/p>\n<ul class=\"post-ul\">\n<li style=\"list-style-type: none;\">\n<ul class=\"post-ul\">Immutable Record with Kafka Stream<\/ul>\n<\/li>\n<\/ul>\n<p>&nbsp;<\/p>\n<ul class=\"post-ul\">\n<li style=\"list-style-type: none;\">\n<ul class=\"post-ul\">Kafka KStreams &#8211; processing timeouts<\/ul>\n<\/li>\n<\/ul>\n<p>&nbsp;<\/p>\n<ul class=\"post-ul\">\n<li style=\"list-style-type: none;\">\n<ul class=\"post-ul\">Kafka Streams for Stream processing A few words about how Kafka works.<\/ul>\n<\/li>\n<\/ul>\n<p>&nbsp;<\/p>\n<ul class=\"post-ul\">Memory management<\/ul>\n<h2>\u5176\u4ed6\u7684\u73ed\u7ea7<\/h2>\n<p>\u51c6\u5907\u6a21\u578b\uff08SensorSum.java\uff09\u3001\u5e8f\u5217\u5316\u5668\uff08SensorSumSerializer.java\uff09\u548c\u53cd\u5e8f\u5217\u5316\u5668\uff08SensorSumDeserializer.java\uff09\u7684\u7c7b\u3002\u5e8f\u5217\u5316\u5668\u5b9e\u73b0serialize()\u65b9\u6cd5\uff0c\u5c06SensorSum\u7684\u5c5e\u6027\u8f6c\u6362\u4e3a\u5b57\u8282\u6570\u7ec4\u3002\u5b83\u5c06\u5206\u914d\u4e00\u4e2a8\u5b57\u8282\u7684Double\u7c7b\u578b\u7684\u5b57\u8282\u7f13\u51b2\u533a\u6765\u5b58\u50a8\u5468\u56f4\u6e29\u5ea6\u603b\u548c\uff0c\u5e76\u4f7f\u75284\u5b57\u8282\u7684Integer\u7c7b\u578b\u6765\u5b58\u50a8\u8bb0\u5f55\u6570\u3002<\/p>\n<pre class=\"post-pre\"><code>    <span class=\"kd\">public<\/span> <span class=\"kt\">byte<\/span><span class=\"o\">[]<\/span> <span class=\"nf\">serialize<\/span><span class=\"o\">(<\/span><span class=\"nc\">String<\/span> <span class=\"n\">topic<\/span><span class=\"o\">,<\/span> <span class=\"nc\">SensorSum<\/span> <span class=\"n\">data<\/span><span class=\"o\">)<\/span> <span class=\"o\">{<\/span>\r\n        <span class=\"nc\">ByteBuffer<\/span> <span class=\"n\">buffer<\/span> <span class=\"o\">=<\/span> <span class=\"nc\">ByteBuffer<\/span><span class=\"o\">.<\/span><span class=\"na\">allocate<\/span><span class=\"o\">(<\/span><span class=\"mi\">8<\/span> <span class=\"o\">+<\/span> <span class=\"mi\">4<\/span><span class=\"o\">);<\/span>\r\n        <span class=\"n\">buffer<\/span><span class=\"o\">.<\/span><span class=\"na\">putDouble<\/span><span class=\"o\">(<\/span><span class=\"n\">data<\/span><span class=\"o\">.<\/span><span class=\"na\">sum<\/span><span class=\"o\">);<\/span>\r\n        <span class=\"n\">buffer<\/span><span class=\"o\">.<\/span><span class=\"na\">putInt<\/span><span class=\"o\">(<\/span><span class=\"n\">data<\/span><span class=\"o\">.<\/span><span class=\"na\">count<\/span><span class=\"o\">);<\/span>\r\n\r\n        <span class=\"k\">return<\/span> <span class=\"n\">buffer<\/span><span class=\"o\">.<\/span><span class=\"na\">array<\/span><span class=\"o\">();<\/span>\r\n    <span class=\"o\">}<\/span>\r\n<\/code><\/pre>\n<h2>\u6267\u884c<\/h2>\n<p>\u7528Exec Maven Plugin\u6765\u6267\u884cKafka Streams\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"nv\">$ <\/span>mvn clean <span class=\"nb\">install exec<\/span>:exec@json\r\n<\/code><\/pre>\n<p>\u6211\u5c1d\u8bd5\u5c06\u7a97\u53e3\u95f4\u9694\u8bbe\u7f6e\u4e3a2\u5206\u949f\uff0c\u5e76\u5c06\u7f13\u5b58\u63d0\u4ea4\u95f4\u9694\u6307\u5b9a\u4e3a3\u5206\u949f\u3002\u867d\u7136\u4f9d\u7136\u51fa\u73b0\u4e86\u51e0\u6b21\u91cd\u590d\u7684\u8f93\u51fa\uff0c\u4f46\u6210\u529f\u51cf\u5c11\u4e86\u91cd\u590d\u8f93\u51fa\u7684\u6b21\u6570\u3002<\/p>\n<pre class=\"post-pre\"><code>{\"time\": \"2017-08-08T11:32:00+09:00\", \"bid\": \"B0:B4:48:BE:5E:00\", \"ambient\": 27.414773}\r\n{\"time\": \"2017-08-08T11:34:00+09:00\", \"bid\": \"B0:B4:48:BE:5E:00\", \"ambient\": 27.414063}\r\n{\"time\": \"2017-08-08T11:36:00+09:00\", \"bid\": \"B0:B4:48:BE:5E:00\", \"ambient\": 27.453125}\r\n{\"time\": \"2017-08-08T11:36:00+09:00\", \"bid\": \"B0:B4:48:BE:5E:00\", \"ambient\": 27.476563}\r\n{\"time\": \"2017-08-08T11:38:00+09:00\", \"bid\": \"B0:B4:48:BE:5E:00\", \"ambient\": 27.546875}\r\n<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p>\u6211\u5011\u5728Jupyter\u7684\u904b\u884c\u74b0\u5883\u4e2d\u4f7f\u7528PySpark Streaming\u4f86\u8a66\u9a57SensorTag\u7684\u6578\u64da\u9032\u884c\u7a97\u53e3\u805a [&hellip;]<\/p>\n","protected":false},"author":7,"featured_media":0,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-46945","post","type-post","status-publish","format-standard","hentry","category-uncategorized"],"yoast_head":"<!-- This site is optimized with the Yoast SEO Premium plugin v21.5 (Yoast SEO v21.5) - https:\/\/yoast.com\/wordpress\/plugins\/seo\/ -->\n<title>\u4f7f\u7528Kafka Streams\u5bf9SensorTag\u8fdb\u884c\u7a97\u53e3\u805a\u5408 - Blog - Silicon Cloud<\/title>\n<meta name=\"robots\" content=\"index, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<link rel=\"canonical\" href=\"https:\/\/www.silicloud.com\/zh\/blog\/\u4f7f\u7528kafka-streams\u5bf9sensortag\u8fdb\u884c\u7a97\u53e3\u805a\u5408\u3002\/\" \/>\n<meta property=\"og:locale\" content=\"zh_CN\" \/>\n<meta property=\"og:type\" content=\"article\" \/>\n<meta property=\"og:title\" content=\"\u4f7f\u7528Kafka Streams\u5bf9SensorTag\u8fdb\u884c\u7a97\u53e3\u805a\u5408\" \/>\n<meta property=\"og:description\" content=\"\u6211\u5011\u5728Jupyter\u7684\u904b\u884c\u74b0\u5883\u4e2d\u4f7f\u7528PySpark Streaming\u4f86\u8a66\u9a57SensorTag\u7684\u6578\u64da\u9032\u884c\u7a97\u53e3\u805a [&hellip;]\" \/>\n<meta property=\"og:url\" content=\"https:\/\/www.silicloud.com\/zh\/blog\/\u4f7f\u7528kafka-streams\u5bf9sensortag\u8fdb\u884c\u7a97\u53e3\u805a\u5408\u3002\/\" \/>\n<meta property=\"og:site_name\" content=\"Blog - Silicon Cloud\" \/>\n<meta property=\"article:published_time\" content=\"2023-02-02T04:04:31+00:00\" \/>\n<meta property=\"article:modified_time\" content=\"2024-05-03T18:03:36+00:00\" \/>\n<meta name=\"author\" content=\"\u79d1, \u9896\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:label1\" content=\"\u4f5c\u8005\" \/>\n\t<meta name=\"twitter:data1\" content=\"\u79d1, \u9896\" \/>\n\t<meta name=\"twitter:label2\" content=\"\u9884\u8ba1\u9605\u8bfb\u65f6\u95f4\" \/>\n\t<meta name=\"twitter:data2\" content=\"4 \u5206\" \/>\n<script type=\"application\/ld+json\" class=\"yoast-schema-graph\">{\"@context\":\"https:\/\/schema.org\",\"@graph\":[{\"@type\":\"WebPage\",\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/%e4%bd%bf%e7%94%a8kafka-streams%e5%af%b9sensortag%e8%bf%9b%e8%a1%8c%e7%aa%97%e5%8f%a3%e8%81%9a%e5%90%88%e3%80%82\/\",\"url\":\"https:\/\/www.silicloud.com\/zh\/blog\/%e4%bd%bf%e7%94%a8kafka-streams%e5%af%b9sensortag%e8%bf%9b%e8%a1%8c%e7%aa%97%e5%8f%a3%e8%81%9a%e5%90%88%e3%80%82\/\",\"name\":\"\u4f7f\u7528Kafka Streams\u5bf9SensorTag\u8fdb\u884c\u7a97\u53e3\u805a\u5408 - Blog - Silicon Cloud\",\"isPartOf\":{\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/#website\"},\"datePublished\":\"2023-02-02T04:04:31+00:00\",\"dateModified\":\"2024-05-03T18:03:36+00:00\",\"author\":{\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/#\/schema\/person\/8ca01ba7f7362ad4edb7da206a12f29e\"},\"breadcrumb\":{\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/%e4%bd%bf%e7%94%a8kafka-streams%e5%af%b9sensortag%e8%bf%9b%e8%a1%8c%e7%aa%97%e5%8f%a3%e8%81%9a%e5%90%88%e3%80%82\/#breadcrumb\"},\"inLanguage\":\"zh-Hans\",\"potentialAction\":[{\"@type\":\"ReadAction\",\"target\":[\"https:\/\/www.silicloud.com\/zh\/blog\/%e4%bd%bf%e7%94%a8kafka-streams%e5%af%b9sensortag%e8%bf%9b%e8%a1%8c%e7%aa%97%e5%8f%a3%e8%81%9a%e5%90%88%e3%80%82\/\"]}]},{\"@type\":\"BreadcrumbList\",\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/%e4%bd%bf%e7%94%a8kafka-streams%e5%af%b9sensortag%e8%bf%9b%e8%a1%8c%e7%aa%97%e5%8f%a3%e8%81%9a%e5%90%88%e3%80%82\/#breadcrumb\",\"itemListElement\":[{\"@type\":\"ListItem\",\"position\":1,\"name\":\"\u9996\u9875\",\"item\":\"https:\/\/www.silicloud.com\/zh\/blog\/\"},{\"@type\":\"ListItem\",\"position\":2,\"name\":\"\u4f7f\u7528Kafka Streams\u5bf9SensorTag\u8fdb\u884c\u7a97\u53e3\u805a\u5408\"}]},{\"@type\":\"WebSite\",\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/#website\",\"url\":\"https:\/\/www.silicloud.com\/zh\/blog\/\",\"name\":\"Blog - Silicon Cloud\",\"description\":\"\",\"inLanguage\":\"zh-Hans\"},{\"@type\":\"Person\",\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/#\/schema\/person\/8ca01ba7f7362ad4edb7da206a12f29e\",\"name\":\"\u79d1, \u9896\",\"image\":{\"@type\":\"ImageObject\",\"inLanguage\":\"zh-Hans\",\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/#\/schema\/person\/image\/\",\"url\":\"https:\/\/secure.gravatar.com\/avatar\/8a6fb3cc7ba2f69d2189ba532aec4633ea7ed75ac0af162ec367cb3abc0fb2af?s=96&d=mm&r=g\",\"contentUrl\":\"https:\/\/secure.gravatar.com\/avatar\/8a6fb3cc7ba2f69d2189ba532aec4633ea7ed75ac0af162ec367cb3abc0fb2af?s=96&d=mm&r=g\",\"caption\":\"\u79d1, \u9896\"},\"url\":\"https:\/\/www.silicloud.com\/zh\/blog\/author\/keying\/\"},{\"@type\":\"ImageObject\",\"inLanguage\":\"zh-Hans\",\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/%e4%bd%bf%e7%94%a8kafka-streams%e5%af%b9sensortag%e8%bf%9b%e8%a1%8c%e7%aa%97%e5%8f%a3%e8%81%9a%e5%90%88%e3%80%82\/#local-main-organization-logo\",\"url\":\"\",\"contentUrl\":\"\",\"caption\":\"Blog - Silicon Cloud\"}]}<\/script>\n<!-- \/ Yoast SEO Premium plugin. -->","yoast_head_json":{"title":"\u4f7f\u7528Kafka Streams\u5bf9SensorTag\u8fdb\u884c\u7a97\u53e3\u805a\u5408 - Blog - Silicon Cloud","robots":{"index":"index","follow":"follow","max-snippet":"max-snippet:-1","max-image-preview":"max-image-preview:large","max-video-preview":"max-video-preview:-1"},"canonical":"https:\/\/www.silicloud.com\/zh\/blog\/\u4f7f\u7528kafka-streams\u5bf9sensortag\u8fdb\u884c\u7a97\u53e3\u805a\u5408\u3002\/","og_locale":"zh_CN","og_type":"article","og_title":"\u4f7f\u7528Kafka Streams\u5bf9SensorTag\u8fdb\u884c\u7a97\u53e3\u805a\u5408","og_description":"\u6211\u5011\u5728Jupyter\u7684\u904b\u884c\u74b0\u5883\u4e2d\u4f7f\u7528PySpark Streaming\u4f86\u8a66\u9a57SensorTag\u7684\u6578\u64da\u9032\u884c\u7a97\u53e3\u805a [&hellip;]","og_url":"https:\/\/www.silicloud.com\/zh\/blog\/\u4f7f\u7528kafka-streams\u5bf9sensortag\u8fdb\u884c\u7a97\u53e3\u805a\u5408\u3002\/","og_site_name":"Blog - Silicon Cloud","article_published_time":"2023-02-02T04:04:31+00:00","article_modified_time":"2024-05-03T18:03:36+00:00","author":"\u79d1, \u9896","twitter_card":"summary_large_image","twitter_misc":{"\u4f5c\u8005":"\u79d1, \u9896","\u9884\u8ba1\u9605\u8bfb\u65f6\u95f4":"4 \u5206"},"schema":{"@context":"https:\/\/schema.org","@graph":[{"@type":"WebPage","@id":"https:\/\/www.silicloud.com\/zh\/blog\/%e4%bd%bf%e7%94%a8kafka-streams%e5%af%b9sensortag%e8%bf%9b%e8%a1%8c%e7%aa%97%e5%8f%a3%e8%81%9a%e5%90%88%e3%80%82\/","url":"https:\/\/www.silicloud.com\/zh\/blog\/%e4%bd%bf%e7%94%a8kafka-streams%e5%af%b9sensortag%e8%bf%9b%e8%a1%8c%e7%aa%97%e5%8f%a3%e8%81%9a%e5%90%88%e3%80%82\/","name":"\u4f7f\u7528Kafka Streams\u5bf9SensorTag\u8fdb\u884c\u7a97\u53e3\u805a\u5408 - Blog - Silicon Cloud","isPartOf":{"@id":"https:\/\/www.silicloud.com\/zh\/blog\/#website"},"datePublished":"2023-02-02T04:04:31+00:00","dateModified":"2024-05-03T18:03:36+00:00","author":{"@id":"https:\/\/www.silicloud.com\/zh\/blog\/#\/schema\/person\/8ca01ba7f7362ad4edb7da206a12f29e"},"breadcrumb":{"@id":"https:\/\/www.silicloud.com\/zh\/blog\/%e4%bd%bf%e7%94%a8kafka-streams%e5%af%b9sensortag%e8%bf%9b%e8%a1%8c%e7%aa%97%e5%8f%a3%e8%81%9a%e5%90%88%e3%80%82\/#breadcrumb"},"inLanguage":"zh-Hans","potentialAction":[{"@type":"ReadAction","target":["https:\/\/www.silicloud.com\/zh\/blog\/%e4%bd%bf%e7%94%a8kafka-streams%e5%af%b9sensortag%e8%bf%9b%e8%a1%8c%e7%aa%97%e5%8f%a3%e8%81%9a%e5%90%88%e3%80%82\/"]}]},{"@type":"BreadcrumbList","@id":"https:\/\/www.silicloud.com\/zh\/blog\/%e4%bd%bf%e7%94%a8kafka-streams%e5%af%b9sensortag%e8%bf%9b%e8%a1%8c%e7%aa%97%e5%8f%a3%e8%81%9a%e5%90%88%e3%80%82\/#breadcrumb","itemListElement":[{"@type":"ListItem","position":1,"name":"\u9996\u9875","item":"https:\/\/www.silicloud.com\/zh\/blog\/"},{"@type":"ListItem","position":2,"name":"\u4f7f\u7528Kafka Streams\u5bf9SensorTag\u8fdb\u884c\u7a97\u53e3\u805a\u5408"}]},{"@type":"WebSite","@id":"https:\/\/www.silicloud.com\/zh\/blog\/#website","url":"https:\/\/www.silicloud.com\/zh\/blog\/","name":"Blog - Silicon Cloud","description":"","inLanguage":"zh-Hans"},{"@type":"Person","@id":"https:\/\/www.silicloud.com\/zh\/blog\/#\/schema\/person\/8ca01ba7f7362ad4edb7da206a12f29e","name":"\u79d1, \u9896","image":{"@type":"ImageObject","inLanguage":"zh-Hans","@id":"https:\/\/www.silicloud.com\/zh\/blog\/#\/schema\/person\/image\/","url":"https:\/\/secure.gravatar.com\/avatar\/8a6fb3cc7ba2f69d2189ba532aec4633ea7ed75ac0af162ec367cb3abc0fb2af?s=96&d=mm&r=g","contentUrl":"https:\/\/secure.gravatar.com\/avatar\/8a6fb3cc7ba2f69d2189ba532aec4633ea7ed75ac0af162ec367cb3abc0fb2af?s=96&d=mm&r=g","caption":"\u79d1, \u9896"},"url":"https:\/\/www.silicloud.com\/zh\/blog\/author\/keying\/"},{"@type":"ImageObject","inLanguage":"zh-Hans","@id":"https:\/\/www.silicloud.com\/zh\/blog\/%e4%bd%bf%e7%94%a8kafka-streams%e5%af%b9sensortag%e8%bf%9b%e8%a1%8c%e7%aa%97%e5%8f%a3%e8%81%9a%e5%90%88%e3%80%82\/#local-main-organization-logo","url":"","contentUrl":"","caption":"Blog - Silicon Cloud"}]}},"_links":{"self":[{"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/posts\/46945","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/users\/7"}],"replies":[{"embeddable":true,"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/comments?post=46945"}],"version-history":[{"count":2,"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/posts\/46945\/revisions"}],"predecessor-version":[{"id":95448,"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/posts\/46945\/revisions\/95448"}],"wp:attachment":[{"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/media?parent=46945"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/categories?post=46945"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/tags?post=46945"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}