{"id":36493,"date":"2023-03-22T19:26:19","date_gmt":"2023-02-11T05:19:30","guid":{"rendered":"https:\/\/www.silicloud.com\/zh\/blog\/apache-beam%e9%80%9f%e6%9f%a5%e8%a1%a8%e3%80%90python%e3%80%91\/"},"modified":"2024-04-29T20:05:52","modified_gmt":"2024-04-29T12:05:52","slug":"apache-beam%e9%80%9f%e6%9f%a5%e8%a1%a8%e3%80%90python%e3%80%91","status":"publish","type":"post","link":"https:\/\/www.silicloud.com\/zh\/blog\/apache-beam%e9%80%9f%e6%9f%a5%e8%a1%a8%e3%80%90python%e3%80%91\/","title":{"rendered":"Apache Beam\u901f\u67e5\u8868\u3010Python\u3011"},"content":{"rendered":"<h1>\u9996\u5148<\/h1>\n<p>\u5728\u8fd9\u7bc7\u6587\u7ae0\u4e2d\uff0c\u603b\u7ed3\u4e86Apache Beam Python SDK\u63d0\u4f9b\u7684Transform\u3002\u901a\u8fc7\u4e86\u89e3\u53ef\u4ee5\u65b9\u4fbf\u8c03\u7528\u7684\u5404\u79cdTransform\uff0c\u53ef\u4ee5\u66f4\u5feb\u5730\u5236\u5b9a\u5b9e\u65bd\u7b56\u7565\u3002<\/p>\n<div><img decoding=\"async\" class=\"post-images\" title=\"\" src=\"https:\/\/cdn.silicloud.com\/blog-img\/blog\/img\/657d295737434c4406c4373c\/2-0.png\" alt=\"undefined\" \/><\/div>\n<h1>\u9010\u4e2a\u5143\u7d20\u7684\u5904\u7406<\/h1>\n<h2>\u6267\u884cParDo &#8211; DoFn<\/h2>\n<p>\u5bf9\u4e8ePCollection\u4e2d\u7684\u6bcf\u4e2a\u5143\u7d20\uff0c\u8bf7\u6267\u884c\u67d0\u4e9b\u5904\u7406\uff08DoFn\uff09\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span 
class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">ComputeWordLength<\/span><span class=\"p\">(<\/span><span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">DoFn<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">__init__<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"nb\">super<\/span><span class=\"p\">(<\/span><span class=\"n\">ComputeWordLength<\/span><span class=\"p\">,<\/span> <span class=\"bp\">self<\/span><span class=\"p\">).<\/span><span class=\"n\">__init__<\/span><span class=\"p\">()<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">process<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">,<\/span> <span class=\"n\">element<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"k\">yield<\/span> <span class=\"nb\">len<\/span><span class=\"p\">(<\/span><span class=\"n\">element<\/span><span class=\"p\">)<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestParDo<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_par_do<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mi\">5<\/span><span class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">,<\/span> <span class=\"mi\">7<\/span><span class=\"p\">,<\/span> <span class=\"mi\">7<\/span><span class=\"p\">,<\/span> <span class=\"mi\">5<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"s\">'Alice'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Bob'<\/span><span 
class=\"p\">,<\/span> <span class=\"s\">'Cameron'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Daniele'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Ellen'<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">ParDo<\/span><span class=\"p\">(<\/span><span class=\"n\">ComputeWordLength<\/span><span class=\"p\">()))<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h2>Filter &#8211; \u5bf9\u5143\u7d20\u8fdb\u884c\u7b5b\u9009<\/h2>\n<p>\u7b5b\u9009 PCollection \u7684\u5143\u7d20\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span 
class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestFilter<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_filter<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"s\">'A'<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"s\">'A'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'B'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'C'<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Filter<\/span><span class=\"p\">(<\/span><span class=\"k\">lambda<\/span> <span class=\"n\">element<\/span><span class=\"p\">:<\/span> <span class=\"n\">element<\/span><span class=\"p\">.<\/span><span class=\"n\">startswith<\/span><span class=\"p\">(<\/span><span class=\"s\">'A'<\/span><span class=\"p\">)))<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span 
class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h2>Map &#8211; \u5bf9\u5143\u7d20\u5e94\u7528\u51fd\u6570<\/h2>\n<p>\u5bf9 PCollection \u4e2d\u7684\u6bcf\u4e2a\u5143\u7d20\u5e94\u7528\u51fd\u6570\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestMap<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_map<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mi\">5<\/span><span class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">,<\/span> <span class=\"mi\">7<\/span><span class=\"p\">,<\/span> <span class=\"mi\">7<\/span><span class=\"p\">,<\/span> <span class=\"mi\">5<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"s\">'Alice'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Bob'<\/span><span class=\"p\">,<\/span> <span 
class=\"s\">'Cameron'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Daniele'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Ellen'<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Map<\/span><span class=\"p\">(<\/span><span class=\"k\">lambda<\/span> <span class=\"n\">element<\/span><span class=\"p\">:<\/span> <span class=\"nb\">len<\/span><span class=\"p\">(<\/span><span class=\"n\">element<\/span><span class=\"p\">)))<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h2>FlatMap &#8211; \u5bf9\u5143\u7d20\u5e94\u7528\u51fd\u6570\uff08\u53ef\u8fed\u4ee3\uff09<\/h2>\n<p>\u5bf9 PCollection \u7684\u6bcf\u4e2a\u5143\u7d20\u5e94\u7528\u51fd\u6570\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> 
<span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestFlatMap<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_flat_map<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mi\">5<\/span><span class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">,<\/span> <span class=\"mi\">7<\/span><span class=\"p\">,<\/span> <span class=\"mi\">7<\/span><span class=\"p\">,<\/span> <span class=\"mi\">5<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[[<\/span><span class=\"s\">'Alice'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Bob'<\/span><span class=\"p\">],<\/span> <span class=\"p\">[<\/span><span class=\"s\">'Cameron'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Daniele'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Ellen'<\/span><span class=\"p\">]]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span 
class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">FlatMap<\/span><span class=\"p\">(<\/span><span class=\"k\">lambda<\/span> <span class=\"n\">element<\/span><span class=\"p\">:<\/span> <span class=\"p\">[<\/span><span class=\"nb\">len<\/span><span class=\"p\">(<\/span><span class=\"n\">e<\/span><span class=\"p\">)<\/span> <span class=\"k\">for<\/span> <span class=\"n\">e<\/span> <span class=\"ow\">in<\/span> <span class=\"n\">element<\/span><span class=\"p\">]))<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h2>ToString &#8211; \u5c06\u8981\u7d20\u8f6c\u6362\u4e3a\u5b57\u7b26\u4e32<\/h2>\n<p>\u5c06PCollection\u4e2d\u7684\u6bcf\u4e2a\u5143\u7d20\u8f6c\u6362\u4e3a\u5b57\u7b26\u4e32\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestToString<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span 
class=\"nf\">test_to_string_kvs<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"s\">\"\"\"Key, Value \u3092 , \u533a\u5207\u308a\u306e\u6587\u5b57\u5217\u306b.\"\"\"<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"s\">'A,B'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'C,D'<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[(<\/span><span class=\"s\">'A'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'B'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"s\">'C'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'D'<\/span><span class=\"p\">)]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">ToString<\/span><span class=\"p\">.<\/span><span class=\"n\">Kvs<\/span><span class=\"p\">())<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_to_string_element<\/span><span 
class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"s\">\"\"\"\u5404\u8981\u7d20\u3092\u6587\u5b57\u5217\u306b.\"\"\"<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"s\">\"A\"<\/span><span class=\"p\">,<\/span> <span class=\"s\">\"['A', 'B']\"<\/span><span class=\"p\">,<\/span> <span class=\"s\">\"['C', 'D', 'E']\"<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"s\">'A'<\/span><span class=\"p\">,<\/span> <span class=\"p\">[<\/span><span class=\"s\">'A'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'B'<\/span><span class=\"p\">],<\/span> <span class=\"p\">[<\/span><span class=\"s\">'C'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'D'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'E'<\/span><span class=\"p\">]]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">ToString<\/span><span class=\"p\">.<\/span><span class=\"n\">Element<\/span><span class=\"p\">())<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span 
class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_to_string_iterables<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"s\">\"\"\"\u30a4\u30c6\u30e9\u30d6\u30eb\u306a\u30aa\u30d6\u30b8\u30a7\u30af\u30c8\u3092\u6587\u5b57\u5217\u306b.\"\"\"<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"s\">'A,B'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'C,D,E'<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[[<\/span><span class=\"s\">'A'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'B'<\/span><span class=\"p\">],<\/span> <span class=\"p\">[<\/span><span class=\"s\">'C'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'D'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'E'<\/span><span class=\"p\">]]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">ToString<\/span><span class=\"p\">.<\/span><span class=\"n\">Iterables<\/span><span class=\"p\">())<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span 
class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h2>Keys &#8211; \u4ece\u5143\u7d20\u4e2d\u63d0\u53d6\u952e<\/h2>\n<p>\u4ece PCollection \u7684\u6bcf\u4e2a\u5143\u7d20\uff08\u952e\u503c\u5bf9\uff09\u4e2d\u62bd\u53d6\u952e\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestKeys<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_keys<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mi\">0<\/span><span class=\"p\">,<\/span> <span class=\"mi\">1<\/span><span class=\"p\">,<\/span> <span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">,<\/span> <span class=\"mi\">4<\/span><span class=\"p\">,<\/span> <span class=\"mi\">5<\/span><span class=\"p\">,<\/span> <span class=\"mi\">6<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[(<\/span><span class=\"mi\">0<\/span><span class=\"p\">,<\/span> <span 
class=\"s\">'Sunday'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"mi\">1<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Monday'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Tuesday'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"mi\">3<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Wednesday'<\/span><span class=\"p\">),<\/span>\r\n                  <span class=\"p\">(<\/span><span class=\"mi\">4<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Thursday'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"mi\">5<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Friday'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"mi\">6<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Saturday'<\/span><span class=\"p\">)]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Keys<\/span><span class=\"p\">())<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span 
class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h2>Values &#8211; \u4ece\u5143\u7d20\u4e2d\u63d0\u53d6\u503c<\/h2>\n<p>\u4ece PCollection \u7684\u6bcf\u4e2a\u5143\u7d20\uff08\u952e\u503c\u5bf9\uff09\u4e2d\u63d0\u53d6\u503c\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestValues<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_values<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"s\">'Sunday'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Monday'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Tuesday'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Wednesday'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Thursday'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Friday'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Saturday'<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[(<\/span><span class=\"mi\">0<\/span><span class=\"p\">,<\/span> <span 
class=\"s\">'Sunday'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"mi\">1<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Monday'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Tuesday'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"mi\">3<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Wednesday'<\/span><span class=\"p\">),<\/span>\r\n                  <span class=\"p\">(<\/span><span class=\"mi\">4<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Thursday'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"mi\">5<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Friday'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"mi\">6<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Saturday'<\/span><span class=\"p\">)]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Values<\/span><span class=\"p\">())<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h2>KvSwap &#8211; 
\u4ea4\u6362\u5143\u7d20\u7684\u952e\u548c\u503c<\/h2>\n<p>\u5c06 PCollection \u4e2d\u7684\u6bcf\u4e2a\u5143\u7d20\uff08\u952e\u503c\u5bf9\uff09\u7684\u952e\u548c\u503c\u8fdb\u884c\u4e92\u6362\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestKvSwap<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_kv_swap<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[(<\/span><span class=\"s\">'Friday'<\/span><span class=\"p\">,<\/span> <span class=\"mi\">5<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"s\">'Monday'<\/span><span class=\"p\">,<\/span> <span class=\"mi\">1<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"s\">'Saturday'<\/span><span class=\"p\">,<\/span> <span class=\"mi\">6<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"s\">'Sunday'<\/span><span class=\"p\">,<\/span> <span class=\"mi\">0<\/span><span class=\"p\">),<\/span>\r\n                    <span class=\"p\">(<\/span><span class=\"s\">'Thursday'<\/span><span 
class=\"p\">,<\/span> <span class=\"mi\">4<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"s\">'Tuesday'<\/span><span class=\"p\">,<\/span> <span class=\"mi\">2<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"s\">'Wednesday'<\/span><span class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">)]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[(<\/span><span class=\"mi\">0<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Sunday'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"mi\">1<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Monday'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Tuesday'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"mi\">3<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Wednesday'<\/span><span class=\"p\">),<\/span>\r\n                  <span class=\"p\">(<\/span><span class=\"mi\">4<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Thursday'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"mi\">5<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Friday'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"mi\">6<\/span><span class=\"p\">,<\/span> <span class=\"s\">'Saturday'<\/span><span class=\"p\">)]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span 
class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">KvSwap<\/span><span class=\"p\">())<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h1>\u805a\u5408\u5904\u7406<\/h1>\n<h2>\u6309\u7167\u952e\uff08Key\uff09\u5bf9\u8981\u7d20\u8fdb\u884c\u5206\u7ec4\u805a\u5408 &#8211; GroupByKey<\/h2>\n<p>\u6309\u7167 Key \u5c06 PCollection \u4e2d\u7684\u5143\u7d20\uff08\u5305\u542b\u952e\u548c\u503c\u7684\u5bf9\uff09\u8fdb\u884c\u805a\u5408\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestGroupByKey<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_group_by_key<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[(<\/span><span 
class=\"s\">'cat'<\/span><span class=\"p\">,<\/span> <span class=\"p\">[<\/span><span class=\"s\">'tama'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'mike'<\/span><span class=\"p\">]),<\/span> <span class=\"p\">(<\/span><span class=\"s\">'dog'<\/span><span class=\"p\">,<\/span> <span class=\"p\">[<\/span><span class=\"s\">'pochi'<\/span><span class=\"p\">])]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[(<\/span><span class=\"s\">'cat'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'tama'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"s\">'cat'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'mike'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"s\">'dog'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'pochi'<\/span><span class=\"p\">)]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">GroupByKey<\/span><span class=\"p\">())<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h2>CoGroupByKey &#8211; \u4f7f\u7528 Key 
\u5c06\u8981\u7d20\u805a\u5408\u5728\u4e00\u8d77\uff08\u591a\u4e2a PCollection\uff09<\/h2>\n<p>\u6309\u7167\u952e\uff08Key\uff09\u7684\u65b9\u5f0f\u5bf9\u591a\u4e2a PCollection \u7684\u5143\u7d20\uff08\u952e\u503c\u5bf9\uff09\u8fdb\u884c\u805a\u5408\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestCoGroupByKey<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_co_group_by_key<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span>\r\n            <span class=\"p\">(<\/span><span class=\"s\">'amy'<\/span><span class=\"p\">,<\/span> <span class=\"p\">([<\/span><span class=\"s\">'amy@example.com'<\/span><span class=\"p\">],<\/span> <span class=\"p\">[<\/span><span class=\"s\">'111-222-3333'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'333-444-5555'<\/span><span class=\"p\">])),<\/span>\r\n            <span class=\"p\">(<\/span><span class=\"s\">'julia'<\/span><span class=\"p\">,<\/span> <span class=\"p\">([<\/span><span class=\"s\">'julia@example.com'<\/span><span class=\"p\">],<\/span> <span 
class=\"p\">[]))<\/span>\r\n        <span class=\"p\">]<\/span>\r\n\r\n        <span class=\"n\">inputs1<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[(<\/span><span class=\"s\">'amy'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'amy@example.com'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"s\">'julia'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'julia@example.com'<\/span><span class=\"p\">)]<\/span>\r\n        <span class=\"n\">inputs2<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[(<\/span><span class=\"s\">'amy'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'111-222-3333'<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"s\">'amy'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'333-444-5555'<\/span><span class=\"p\">)]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">pcol1<\/span> <span class=\"o\">=<\/span> <span class=\"n\">p<\/span> <span class=\"o\">|<\/span> <span class=\"s\">'create pcol1'<\/span> <span class=\"o\">&gt;&gt;<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs1<\/span><span class=\"p\">)<\/span>\r\n            <span class=\"n\">pcol2<\/span> <span class=\"o\">=<\/span> <span class=\"n\">p<\/span> <span class=\"o\">|<\/span> <span class=\"s\">'create pcol2'<\/span> <span class=\"o\">&gt;&gt;<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span 
class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs2<\/span><span class=\"p\">)<\/span>\r\n\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">((<\/span><span class=\"n\">pcol1<\/span><span class=\"p\">,<\/span> <span class=\"n\">pcol2<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">CoGroupByKey<\/span><span class=\"p\">())<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h2>CombineGlobally &#8211; \u5408\u5e76\u6240\u6709\u5143\u7d20<\/h2>\n<p>\u5c06 PCollection \u4e2d\u7684\u6240\u6709\u5143\u7d20\u5408\u5e76\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestCombineGlobally<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_combine_globally<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span 
class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mi\">55<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mi\">1<\/span><span class=\"p\">,<\/span> <span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">,<\/span> <span class=\"mi\">4<\/span><span class=\"p\">,<\/span> <span class=\"mi\">5<\/span><span class=\"p\">,<\/span> <span class=\"mi\">6<\/span><span class=\"p\">,<\/span> <span class=\"mi\">7<\/span><span class=\"p\">,<\/span> <span class=\"mi\">8<\/span><span class=\"p\">,<\/span> <span class=\"mi\">9<\/span><span class=\"p\">,<\/span> <span class=\"mi\">10<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">CombineGlobally<\/span><span class=\"p\">(<\/span><span class=\"nb\">sum<\/span><span class=\"p\">))<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span 
class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h2>\u5c06\u8981\u7d20\u5b58\u50a8\u5230\u4e00\u4e2a\u5217\u8868\u4e2d<\/h2>\n<p>\u5c06 PCollection \u7684\u6240\u6709\u5143\u7d20\u5b58\u50a8\u5728\u4e00\u4e2a\u5217\u8868\u4e2d\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestToList<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_to_list<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[[<\/span><span class=\"mi\">1<\/span><span class=\"p\">,<\/span> <span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">,<\/span> <span class=\"mi\">4<\/span><span class=\"p\">,<\/span> <span class=\"mi\">5<\/span><span class=\"p\">,<\/span> <span class=\"mi\">6<\/span><span class=\"p\">,<\/span> <span class=\"mi\">7<\/span><span class=\"p\">,<\/span> <span class=\"mi\">8<\/span><span class=\"p\">,<\/span> <span class=\"mi\">9<\/span><span class=\"p\">,<\/span> <span class=\"mi\">10<\/span><span class=\"p\">]]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span 
class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mi\">1<\/span><span class=\"p\">,<\/span> <span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">,<\/span> <span class=\"mi\">4<\/span><span class=\"p\">,<\/span> <span class=\"mi\">5<\/span><span class=\"p\">,<\/span> <span class=\"mi\">6<\/span><span class=\"p\">,<\/span> <span class=\"mi\">7<\/span><span class=\"p\">,<\/span> <span class=\"mi\">8<\/span><span class=\"p\">,<\/span> <span class=\"mi\">9<\/span><span class=\"p\">,<\/span> <span class=\"mi\">10<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">combiners<\/span><span class=\"p\">.<\/span><span class=\"n\">ToList<\/span><span class=\"p\">())<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h2>ToDict &#8211; \u5c06\u5143\u7d20\u5b58\u50a8\u4e3a\u4e00\u4e2a\u5b57\u5178\u7c7b\u578b<\/h2>\n<p>\u5c06 PCollection \u4e2d\u7684\u6240\u6709\u5143\u7d20\uff08\u952e\u548c\u503c\u5bf9\uff09\u5b58\u50a8\u5230\u4e00\u4e2a\u5b57\u5178\u7c7b\u578b\u4e2d\u3002<\/p>\n<pre 
class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestToDict<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_to_dict<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[{<\/span><span class=\"s\">'A'<\/span><span class=\"p\">:<\/span> <span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"s\">'B'<\/span><span class=\"p\">:<\/span> <span class=\"mi\">1<\/span><span class=\"p\">}]<\/span>  <span class=\"c1\"># Key \u304c\u88ab\u308b\u5834\u5408\u306f\u3069\u3061\u3089\u304b\u4e00\u65b9\u306e Value \u304c\u9078\u629e\u3055\u308c\u308b\r\n<\/span>\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[(<\/span><span class=\"s\">'A'<\/span><span class=\"p\">,<\/span> <span class=\"mi\">1<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"s\">'A'<\/span><span class=\"p\">,<\/span> <span class=\"mi\">2<\/span><span class=\"p\">),<\/span> <span class=\"p\">(<\/span><span class=\"s\">'B'<\/span><span class=\"p\">,<\/span> <span class=\"mi\">1<\/span><span 
class=\"p\">)]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">combiners<\/span><span class=\"p\">.<\/span><span class=\"n\">ToDict<\/span><span class=\"p\">())<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h2>Count &#8211; \u7edf\u8ba1\u5143\u7d20\u7684\u6570\u91cf<\/h2>\n<p>\u8ba1\u7b97 PCollection \u7684\u5143\u7d20\u6570\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span 
class=\"nc\">TestCount<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_count<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mi\">10<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mi\">1<\/span><span class=\"p\">,<\/span> <span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">,<\/span> <span class=\"mi\">4<\/span><span class=\"p\">,<\/span> <span class=\"mi\">5<\/span><span class=\"p\">,<\/span> <span class=\"mi\">6<\/span><span class=\"p\">,<\/span> <span class=\"mi\">7<\/span><span class=\"p\">,<\/span> <span class=\"mi\">8<\/span><span class=\"p\">,<\/span> <span class=\"mi\">9<\/span><span class=\"p\">,<\/span> <span class=\"mi\">10<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">combiners<\/span><span class=\"p\">.<\/span><span class=\"n\">Count<\/span><span class=\"p\">.<\/span><span class=\"n\">Globally<\/span><span class=\"p\">())<\/span>\r\n\r\n            <span 
class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h2>Distinct &#8211; \u53bb\u9664\u91cd\u590d\u5143\u7d20<\/h2>\n<p>\u4ece PCollection \u7684\u5143\u7d20\u4e2d\u6392\u9664\u91cd\u590d\u9879\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestDistinct<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_distinct<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mi\">1<\/span><span class=\"p\">,<\/span> <span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mi\">1<\/span><span class=\"p\">,<\/span> <span class=\"mi\">1<\/span><span class=\"p\">,<\/span> <span class=\"mi\">2<\/span><span 
class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Distinct<\/span><span class=\"p\">())<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h2>\u8ba1\u7b97\u8981\u7d20\u7684\u5e73\u5747\u503c<\/h2>\n<p>\u8ba1\u7b97PCollection\u4e2d\u6240\u6709\u5143\u7d20\u7684\u5e73\u5747\u503c\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span 
class=\"nc\">TestMean<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_mean<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mf\">5.5<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mi\">1<\/span><span class=\"p\">,<\/span> <span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">,<\/span> <span class=\"mi\">4<\/span><span class=\"p\">,<\/span> <span class=\"mi\">5<\/span><span class=\"p\">,<\/span> <span class=\"mi\">6<\/span><span class=\"p\">,<\/span> <span class=\"mi\">7<\/span><span class=\"p\">,<\/span> <span class=\"mi\">8<\/span><span class=\"p\">,<\/span> <span class=\"mi\">9<\/span><span class=\"p\">,<\/span> <span class=\"mi\">10<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">combiners<\/span><span class=\"p\">.<\/span><span class=\"n\">Mean<\/span><span class=\"p\">.<\/span><span class=\"n\">Globally<\/span><span class=\"p\">())<\/span>\r\n\r\n            <span 
class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h2>Sample &#8211; \u4ece\u5143\u7d20\u4e2d\u968f\u673a\u62bd\u53d6<\/h2>\n<p>\u4ece PCollection \u4e2d\u968f\u673a\u62bd\u53d6\u82e5\u5e72\u9879\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestSample<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_sample<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[[<\/span><span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"mi\">8<\/span><span class=\"p\">,<\/span> <span class=\"mi\">6<\/span><span class=\"p\">]]<\/span>  <span class=\"c1\"># \u671f\u5f85\u5024\u306f\u6bce\u56de\u30e9\u30f3\u30c0\u30e0\u306a\u5024\u306b\u306a\u308b\r\n<\/span>\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mi\">1<\/span><span 
class=\"p\">,<\/span> <span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">,<\/span> <span class=\"mi\">4<\/span><span class=\"p\">,<\/span> <span class=\"mi\">5<\/span><span class=\"p\">,<\/span> <span class=\"mi\">6<\/span><span class=\"p\">,<\/span> <span class=\"mi\">7<\/span><span class=\"p\">,<\/span> <span class=\"mi\">8<\/span><span class=\"p\">,<\/span> <span class=\"mi\">9<\/span><span class=\"p\">,<\/span> <span class=\"mi\">10<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">combiners<\/span><span class=\"p\">.<\/span><span class=\"n\">Sample<\/span><span class=\"p\">.<\/span><span class=\"n\">FixedSizeGlobally<\/span><span class=\"p\">(<\/span><span class=\"mi\">3<\/span><span class=\"p\">))<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span 
class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h2>Top &#8211; \u4ece\u5143\u7d20\u4e2d\u63d0\u53d6\u6700\u5927\uff08\u6216\u6700\u5c0f\uff09\u503c<\/h2>\n<p>\u4ecePCollection\u7684\u6240\u6709\u5143\u7d20\u4e2d\u63d0\u53d6\u51fa\u6700\u5927\uff08\u6216\u6700\u5c0f\uff09\u7684\u6570\u4e2a\u5143\u7d20\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestTop<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_top_largest<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[[<\/span><span class=\"mi\">10<\/span><span class=\"p\">,<\/span> <span class=\"mi\">9<\/span><span class=\"p\">,<\/span> <span class=\"mi\">8<\/span><span class=\"p\">]]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mi\">1<\/span><span class=\"p\">,<\/span> <span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">,<\/span> <span class=\"mi\">4<\/span><span class=\"p\">,<\/span> <span class=\"mi\">5<\/span><span 
class=\"p\">,<\/span> <span class=\"mi\">6<\/span><span class=\"p\">,<\/span> <span class=\"mi\">7<\/span><span class=\"p\">,<\/span> <span class=\"mi\">8<\/span><span class=\"p\">,<\/span> <span class=\"mi\">9<\/span><span class=\"p\">,<\/span> <span class=\"mi\">10<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">combiners<\/span><span class=\"p\">.<\/span><span class=\"n\">Top<\/span><span class=\"p\">.<\/span><span class=\"n\">Largest<\/span><span class=\"p\">(<\/span><span class=\"mi\">3<\/span><span class=\"p\">))<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_top_smallest<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[[<\/span><span class=\"mi\">1<\/span><span class=\"p\">,<\/span> <span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">]]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span 
class=\"p\">[<\/span><span class=\"mi\">1<\/span><span class=\"p\">,<\/span> <span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">,<\/span> <span class=\"mi\">4<\/span><span class=\"p\">,<\/span> <span class=\"mi\">5<\/span><span class=\"p\">,<\/span> <span class=\"mi\">6<\/span><span class=\"p\">,<\/span> <span class=\"mi\">7<\/span><span class=\"p\">,<\/span> <span class=\"mi\">8<\/span><span class=\"p\">,<\/span> <span class=\"mi\">9<\/span><span class=\"p\">,<\/span> <span class=\"mi\">10<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">combiners<\/span><span class=\"p\">.<\/span><span class=\"n\">Top<\/span><span class=\"p\">.<\/span><span class=\"n\">Smallest<\/span><span class=\"p\">(<\/span><span class=\"mi\">3<\/span><span class=\"p\">))<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h1>\u5176\u4ed6\u5904\u7406 | Others<\/h1>\n<h2>Flatten &#8211; PCollection \u7684\u5408\u5e76<\/h2>\n<p>\u5c06\u591a\u4e2a PCollection \u5408\u5e76\u6210\u4e00\u4e2a PCollection\u3002<\/p>\n<pre 
class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestFlatten<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_flatten<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mi\">1<\/span><span class=\"p\">,<\/span> <span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">,<\/span> <span class=\"mi\">4<\/span><span class=\"p\">,<\/span> <span class=\"mi\">5<\/span><span class=\"p\">,<\/span> <span class=\"mi\">6<\/span><span class=\"p\">,<\/span> <span class=\"mi\">7<\/span><span class=\"p\">,<\/span> <span class=\"mi\">8<\/span><span class=\"p\">,<\/span> <span class=\"mi\">9<\/span><span class=\"p\">,<\/span> <span class=\"mi\">10<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"n\">inputs1<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mi\">1<\/span><span class=\"p\">,<\/span> <span class=\"mi\">2<\/span><span class=\"p\">,<\/span> <span class=\"mi\">3<\/span><span class=\"p\">,<\/span> <span class=\"mi\">4<\/span><span 
class=\"p\">,<\/span> <span class=\"mi\">5<\/span><span class=\"p\">]<\/span>\r\n        <span class=\"n\">inputs2<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"mi\">6<\/span><span class=\"p\">,<\/span> <span class=\"mi\">7<\/span><span class=\"p\">,<\/span> <span class=\"mi\">8<\/span><span class=\"p\">,<\/span> <span class=\"mi\">9<\/span><span class=\"p\">,<\/span> <span class=\"mi\">10<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">pcol1<\/span> <span class=\"o\">=<\/span> <span class=\"n\">p<\/span> <span class=\"o\">|<\/span> <span class=\"s\">'create pcol1'<\/span> <span class=\"o\">&gt;&gt;<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs1<\/span><span class=\"p\">)<\/span>\r\n            <span class=\"n\">pcol2<\/span> <span class=\"o\">=<\/span> <span class=\"n\">p<\/span> <span class=\"o\">|<\/span> <span class=\"s\">'create pcol2'<\/span> <span class=\"o\">&gt;&gt;<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs2<\/span><span class=\"p\">)<\/span>\r\n\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">pcol1<\/span><span class=\"p\">,<\/span> <span class=\"n\">pcol2<\/span><span class=\"p\">)<\/span> <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Flatten<\/span><span class=\"p\">()<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span 
class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h2>Reshuffle &#8211; \u5143\u7d20\u7684\u91cd\u65b0\u5206\u914d<\/h2>\n<p>\u5728\u5de5\u4f5c\u673a\u4e4b\u95f4\u91cd\u65b0\u5206\u914d PCollection \u7684\u5143\u7d20\u3002<\/p>\n<pre class=\"post-pre\"><code><span class=\"kn\">from<\/span> <span class=\"nn\">unittest<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestCase<\/span>\r\n\r\n<span class=\"kn\">import<\/span> <span class=\"nn\">apache_beam<\/span> <span class=\"k\">as<\/span> <span class=\"n\">beam<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.test_pipeline<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">TestPipeline<\/span>\r\n<span class=\"kn\">from<\/span> <span class=\"nn\">apache_beam.testing.util<\/span> <span class=\"kn\">import<\/span> <span class=\"n\">assert_that<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span>\r\n\r\n\r\n<span class=\"k\">class<\/span> <span class=\"nc\">TestReshuffle<\/span><span class=\"p\">(<\/span><span class=\"n\">TestCase<\/span><span class=\"p\">):<\/span>\r\n\r\n    <span class=\"k\">def<\/span> <span class=\"nf\">test_reshuffle<\/span><span class=\"p\">(<\/span><span class=\"bp\">self<\/span><span class=\"p\">):<\/span>\r\n        <span class=\"n\">expected<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"s\">'A'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'B'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'C'<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"n\">inputs<\/span> <span class=\"o\">=<\/span> <span class=\"p\">[<\/span><span class=\"s\">'A'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'B'<\/span><span class=\"p\">,<\/span> <span class=\"s\">'C'<\/span><span class=\"p\">]<\/span>\r\n\r\n        <span class=\"k\">with<\/span> <span class=\"n\">TestPipeline<\/span><span class=\"p\">()<\/span> <span 
class=\"k\">as<\/span> <span class=\"n\">p<\/span><span class=\"p\">:<\/span>\r\n            <span class=\"n\">actual<\/span> <span class=\"o\">=<\/span> <span class=\"p\">(<\/span><span class=\"n\">p<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Create<\/span><span class=\"p\">(<\/span><span class=\"n\">inputs<\/span><span class=\"p\">)<\/span>\r\n                      <span class=\"o\">|<\/span> <span class=\"n\">beam<\/span><span class=\"p\">.<\/span><span class=\"n\">Reshuffle<\/span><span class=\"p\">())<\/span>\r\n\r\n            <span class=\"n\">assert_that<\/span><span class=\"p\">(<\/span><span class=\"n\">actual<\/span><span class=\"p\">,<\/span> <span class=\"n\">equal_to<\/span><span class=\"p\">(<\/span><span class=\"n\">expected<\/span><span class=\"p\">))<\/span>\r\n<\/code><\/pre>\n<h1>\u603b\u7ed3<\/h1>\n<p>\u5728Apache Beam\u7684Python SDK\u4e2d\uff0c\u63d0\u4f9b\u4e86\u4e30\u5bcc\u7684Transform\u9009\u9879\uff08\u76f8\u8f83\u4e8eJava\u6765\u8bf4\u8f83\u5c11\uff09\u3002\u6211\u4eec\u5c06\u968f\u65f6\u66f4\u65b0\u4ee5\u63d0\u4f9b\u65b0\u7684\u529f\u80fd\u3002<\/p>\n<p>\u5e0c\u671b\u4f60\u4eec\u80fd\u5728\u9700\u8981\u5feb\u901f\u56de\u60f3\u8d77 Apache Beam \u7684 Transform \u65f6\u53c2\u8003\u8fd9\u4e2a\u4fe1\u606f\uff01<\/p>\n<h1>\u8bf7\u53c2\u8003\u4ee5\u4e0b\u7f51\u5740<\/h1>\n<ul class=\"post-ul\"><li><a href=\"https:\/\/beam.apache.org\/documentation\/transforms\/python\/overview\/\">Python transform catalog<\/a><\/li><\/ul>\n","protected":false},"excerpt":{"rendered":"<p>\u9996\u5148 \u5728\u8fd9\u7bc7\u6587\u7ae0\u4e2d\uff0c\u603b\u7ed3\u4e86Apache Beam Python SDK\u63d0\u4f9b\u7684Transform\u3002\u901a\u8fc7\u4e86\u89e3\u53ef\u4ee5\u65b9 
[&hellip;]<\/p>\n","protected":false},"author":5,"featured_media":0,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-36493","post","type-post","status-publish","format-standard","hentry","category-uncategorized"],"yoast_head":"<!-- This site is optimized with the Yoast SEO Premium plugin v21.5 (Yoast SEO v21.5) - https:\/\/yoast.com\/wordpress\/plugins\/seo\/ -->\n<title>Apache Beam\u901f\u67e5\u8868\u3010Python\u3011 - Blog - Silicon Cloud<\/title>\n<meta name=\"robots\" content=\"index, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<link rel=\"canonical\" href=\"https:\/\/www.silicloud.com\/zh\/blog\/apache-beam\u901f\u67e5\u8868\u3010python\u3011\/\" \/>\n<meta property=\"og:locale\" content=\"zh_CN\" \/>\n<meta property=\"og:type\" content=\"article\" \/>\n<meta property=\"og:title\" content=\"Apache Beam\u901f\u67e5\u8868\u3010Python\u3011\" \/>\n<meta property=\"og:description\" content=\"\u9996\u5148 \u5728\u8fd9\u7bc7\u6587\u7ae0\u4e2d\uff0c\u603b\u7ed3\u4e86Apache Beam Python SDK\u63d0\u4f9b\u7684Transform\u3002\u901a\u8fc7\u4e86\u89e3\u53ef\u4ee5\u65b9 [&hellip;]\" \/>\n<meta property=\"og:url\" content=\"https:\/\/www.silicloud.com\/zh\/blog\/apache-beam\u901f\u67e5\u8868\u3010python\u3011\/\" \/>\n<meta property=\"og:site_name\" content=\"Blog - Silicon Cloud\" \/>\n<meta property=\"article:published_time\" content=\"2023-02-11T05:19:30+00:00\" \/>\n<meta property=\"article:modified_time\" content=\"2024-04-29T12:05:52+00:00\" \/>\n<meta property=\"og:image\" content=\"https:\/\/cdn.silicloud.com\/blog-img\/blog\/img\/657d295737434c4406c4373c\/2-0.png\" \/>\n<meta name=\"author\" content=\"\u6e05, \u5b87\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:label1\" content=\"\u4f5c\u8005\" \/>\n\t<meta name=\"twitter:data1\" content=\"\u6e05, \u5b87\" \/>\n\t<meta 
name=\"twitter:label2\" content=\"\u9884\u8ba1\u9605\u8bfb\u65f6\u95f4\" \/>\n\t<meta name=\"twitter:data2\" content=\"7 \u5206\" \/>\n<script type=\"application\/ld+json\" class=\"yoast-schema-graph\">{\"@context\":\"https:\/\/schema.org\",\"@graph\":[{\"@type\":\"WebPage\",\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/apache-beam%e9%80%9f%e6%9f%a5%e8%a1%a8%e3%80%90python%e3%80%91\/\",\"url\":\"https:\/\/www.silicloud.com\/zh\/blog\/apache-beam%e9%80%9f%e6%9f%a5%e8%a1%a8%e3%80%90python%e3%80%91\/\",\"name\":\"Apache Beam\u901f\u67e5\u8868\u3010Python\u3011 - Blog - Silicon Cloud\",\"isPartOf\":{\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/#website\"},\"datePublished\":\"2023-02-11T05:19:30+00:00\",\"dateModified\":\"2024-04-29T12:05:52+00:00\",\"author\":{\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/#\/schema\/person\/1a6ecd3d914d22a5ac32791ffc1fbd8e\"},\"breadcrumb\":{\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/apache-beam%e9%80%9f%e6%9f%a5%e8%a1%a8%e3%80%90python%e3%80%91\/#breadcrumb\"},\"inLanguage\":\"zh-Hans\",\"potentialAction\":[{\"@type\":\"ReadAction\",\"target\":[\"https:\/\/www.silicloud.com\/zh\/blog\/apache-beam%e9%80%9f%e6%9f%a5%e8%a1%a8%e3%80%90python%e3%80%91\/\"]}]},{\"@type\":\"BreadcrumbList\",\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/apache-beam%e9%80%9f%e6%9f%a5%e8%a1%a8%e3%80%90python%e3%80%91\/#breadcrumb\",\"itemListElement\":[{\"@type\":\"ListItem\",\"position\":1,\"name\":\"\u9996\u9875\",\"item\":\"https:\/\/www.silicloud.com\/zh\/blog\/\"},{\"@type\":\"ListItem\",\"position\":2,\"name\":\"Apache Beam\u901f\u67e5\u8868\u3010Python\u3011\"}]},{\"@type\":\"WebSite\",\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/#website\",\"url\":\"https:\/\/www.silicloud.com\/zh\/blog\/\",\"name\":\"Blog - Silicon Cloud\",\"description\":\"\",\"inLanguage\":\"zh-Hans\"},{\"@type\":\"Person\",\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/#\/schema\/person\/1a6ecd3d914d22a5ac32791ffc1fbd8e\",\"name\":\"\u6e05, 
\u5b87\",\"image\":{\"@type\":\"ImageObject\",\"inLanguage\":\"zh-Hans\",\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/#\/schema\/person\/image\/\",\"url\":\"https:\/\/secure.gravatar.com\/avatar\/4b2016c18459a605fc469c7566608f5686491baa112d0871ee613f61b7210565?s=96&d=mm&r=g\",\"contentUrl\":\"https:\/\/secure.gravatar.com\/avatar\/4b2016c18459a605fc469c7566608f5686491baa112d0871ee613f61b7210565?s=96&d=mm&r=g\",\"caption\":\"\u6e05, \u5b87\"},\"url\":\"https:\/\/www.silicloud.com\/zh\/blog\/author\/qingyu\/\"},{\"@type\":\"ImageObject\",\"inLanguage\":\"zh-Hans\",\"@id\":\"https:\/\/www.silicloud.com\/zh\/blog\/apache-beam%e9%80%9f%e6%9f%a5%e8%a1%a8%e3%80%90python%e3%80%91\/#local-main-organization-logo\",\"url\":\"\",\"contentUrl\":\"\",\"caption\":\"Blog - Silicon Cloud\"}]}<\/script>\n<!-- \/ Yoast SEO Premium plugin. -->","yoast_head_json":{"title":"Apache Beam\u901f\u67e5\u8868\u3010Python\u3011 - Blog - Silicon Cloud","robots":{"index":"index","follow":"follow","max-snippet":"max-snippet:-1","max-image-preview":"max-image-preview:large","max-video-preview":"max-video-preview:-1"},"canonical":"https:\/\/www.silicloud.com\/zh\/blog\/apache-beam\u901f\u67e5\u8868\u3010python\u3011\/","og_locale":"zh_CN","og_type":"article","og_title":"Apache Beam\u901f\u67e5\u8868\u3010Python\u3011","og_description":"\u9996\u5148 \u5728\u8fd9\u7bc7\u6587\u7ae0\u4e2d\uff0c\u603b\u7ed3\u4e86Apache Beam Python SDK\u63d0\u4f9b\u7684Transform\u3002\u901a\u8fc7\u4e86\u89e3\u53ef\u4ee5\u65b9 [&hellip;]","og_url":"https:\/\/www.silicloud.com\/zh\/blog\/apache-beam\u901f\u67e5\u8868\u3010python\u3011\/","og_site_name":"Blog - Silicon Cloud","article_published_time":"2023-02-11T05:19:30+00:00","article_modified_time":"2024-04-29T12:05:52+00:00","og_image":[{"url":"https:\/\/cdn.silicloud.com\/blog-img\/blog\/img\/657d295737434c4406c4373c\/2-0.png"}],"author":"\u6e05, \u5b87","twitter_card":"summary_large_image","twitter_misc":{"\u4f5c\u8005":"\u6e05, 
\u5b87","\u9884\u8ba1\u9605\u8bfb\u65f6\u95f4":"7 \u5206"},"schema":{"@context":"https:\/\/schema.org","@graph":[{"@type":"WebPage","@id":"https:\/\/www.silicloud.com\/zh\/blog\/apache-beam%e9%80%9f%e6%9f%a5%e8%a1%a8%e3%80%90python%e3%80%91\/","url":"https:\/\/www.silicloud.com\/zh\/blog\/apache-beam%e9%80%9f%e6%9f%a5%e8%a1%a8%e3%80%90python%e3%80%91\/","name":"Apache Beam\u901f\u67e5\u8868\u3010Python\u3011 - Blog - Silicon Cloud","isPartOf":{"@id":"https:\/\/www.silicloud.com\/zh\/blog\/#website"},"datePublished":"2023-02-11T05:19:30+00:00","dateModified":"2024-04-29T12:05:52+00:00","author":{"@id":"https:\/\/www.silicloud.com\/zh\/blog\/#\/schema\/person\/1a6ecd3d914d22a5ac32791ffc1fbd8e"},"breadcrumb":{"@id":"https:\/\/www.silicloud.com\/zh\/blog\/apache-beam%e9%80%9f%e6%9f%a5%e8%a1%a8%e3%80%90python%e3%80%91\/#breadcrumb"},"inLanguage":"zh-Hans","potentialAction":[{"@type":"ReadAction","target":["https:\/\/www.silicloud.com\/zh\/blog\/apache-beam%e9%80%9f%e6%9f%a5%e8%a1%a8%e3%80%90python%e3%80%91\/"]}]},{"@type":"BreadcrumbList","@id":"https:\/\/www.silicloud.com\/zh\/blog\/apache-beam%e9%80%9f%e6%9f%a5%e8%a1%a8%e3%80%90python%e3%80%91\/#breadcrumb","itemListElement":[{"@type":"ListItem","position":1,"name":"\u9996\u9875","item":"https:\/\/www.silicloud.com\/zh\/blog\/"},{"@type":"ListItem","position":2,"name":"Apache Beam\u901f\u67e5\u8868\u3010Python\u3011"}]},{"@type":"WebSite","@id":"https:\/\/www.silicloud.com\/zh\/blog\/#website","url":"https:\/\/www.silicloud.com\/zh\/blog\/","name":"Blog - Silicon Cloud","description":"","inLanguage":"zh-Hans"},{"@type":"Person","@id":"https:\/\/www.silicloud.com\/zh\/blog\/#\/schema\/person\/1a6ecd3d914d22a5ac32791ffc1fbd8e","name":"\u6e05, 
\u5b87","image":{"@type":"ImageObject","inLanguage":"zh-Hans","@id":"https:\/\/www.silicloud.com\/zh\/blog\/#\/schema\/person\/image\/","url":"https:\/\/secure.gravatar.com\/avatar\/4b2016c18459a605fc469c7566608f5686491baa112d0871ee613f61b7210565?s=96&d=mm&r=g","contentUrl":"https:\/\/secure.gravatar.com\/avatar\/4b2016c18459a605fc469c7566608f5686491baa112d0871ee613f61b7210565?s=96&d=mm&r=g","caption":"\u6e05, \u5b87"},"url":"https:\/\/www.silicloud.com\/zh\/blog\/author\/qingyu\/"},{"@type":"ImageObject","inLanguage":"zh-Hans","@id":"https:\/\/www.silicloud.com\/zh\/blog\/apache-beam%e9%80%9f%e6%9f%a5%e8%a1%a8%e3%80%90python%e3%80%91\/#local-main-organization-logo","url":"","contentUrl":"","caption":"Blog - Silicon Cloud"}]}},"_links":{"self":[{"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/posts\/36493","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/users\/5"}],"replies":[{"embeddable":true,"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/comments?post=36493"}],"version-history":[{"count":2,"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/posts\/36493\/revisions"}],"predecessor-version":[{"id":87364,"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/posts\/36493\/revisions\/87364"}],"wp:attachment":[{"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/media?parent=36493"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/categories?post=36493"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.silicloud.com\/zh\/blog\/wp-json\/wp\/v2\/tags?post=36493"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}