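Apache Beam Cheat Sheet [Python]
Introduction
This article summarizes the Transforms provided by the Apache Beam Python SDK. Knowing which Transforms are readily available makes it quicker to settle on an implementation approach.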

Element-wise processing
ParDo – execute a DoFn
Runs some processing (a DoFn) on each element of a PCollection.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class ComputeWordLength(beam.DoFn):
    def __init__(self):
        super(ComputeWordLength, self).__init__()

    def process(self, element):
        yield len(element)


class TestParDo(TestCase):
    def test_par_do(self):
        expected = [5, 3, 7, 7, 5]
        inputs = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ParDo(ComputeWordLength()))
            assert_that(actual, equal_to(expected))
```
Filter – filter elements
Keeps only the elements of a PCollection that satisfy a predicate.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFilter(TestCase):
    def test_filter(self):
        expected = ['A']
        inputs = ['A', 'B', 'C']
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Filter(lambda element: element.startswith('A')))
            assert_that(actual, equal_to(expected))
```
Map – apply a function to elements
Applies a function to each element of a PCollection and emits exactly one output per input.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestMap(TestCase):
    def test_map(self):
        expected = [5, 3, 7, 7, 5]
        inputs = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Map(lambda element: len(element)))
            assert_that(actual, equal_to(expected))
```
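FlatMap – apply a function to elements (iterable)
Applies a function that returns an iterable to each element of a PCollection; every item of each returned iterable becomes a separate output element.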
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFlatMap(TestCase):
    def test_flat_map(self):
        expected = [5, 3, 7, 7, 5]
        inputs = [['Alice', 'Bob'], ['Cameron', 'Daniele', 'Ellen']]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.FlatMap(lambda element: [len(e) for e in element]))
            assert_that(actual, equal_to(expected))
```
ToString – convert elements to strings
Converts each element of a PCollection to a string.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToString(TestCase):
    def test_to_string_kvs(self):
        """Join each key and value into a comma-separated string."""
        expected = ['A,B', 'C,D']
        inputs = [('A', 'B'), ('C', 'D')]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ToString.Kvs())
            assert_that(actual, equal_to(expected))

    def test_to_string_element(self):
        """Convert each element to a string."""
        expected = ["A", "['A', 'B']", "['C', 'D', 'E']"]
        inputs = ['A', ['A', 'B'], ['C', 'D', 'E']]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ToString.Element())
            assert_that(actual, equal_to(expected))

    def test_to_string_iterables(self):
        """Join the items of each iterable into a comma-separated string."""
        expected = ['A,B', 'C,D,E']
        inputs = [['A', 'B'], ['C', 'D', 'E']]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ToString.Iterables())
            assert_that(actual, equal_to(expected))
```
Keys – extract keys from elements
Extracts the key from each element (key-value pair) of a PCollection.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestKeys(TestCase):
    def test_keys(self):
        expected = [0, 1, 2, 3, 4, 5, 6]
        inputs = [(0, 'Sunday'), (1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'),
                  (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday')]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Keys())
            assert_that(actual, equal_to(expected))
```
Values – extract values from elements
Extracts the value from each element (key-value pair) of a PCollection.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestValues(TestCase):
    def test_values(self):
        expected = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']
        inputs = [(0, 'Sunday'), (1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'),
                  (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday')]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Values())
            assert_that(actual, equal_to(expected))
```
KvSwap – swap the keys and values of elements
Swaps the key and the value of each element (key-value pair) of a PCollection.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestKvSwap(TestCase):
    def test_kv_swap(self):
        expected = [('Friday', 5), ('Monday', 1), ('Saturday', 6), ('Sunday', 0),
                    ('Thursday', 4), ('Tuesday', 2), ('Wednesday', 3)]
        inputs = [(0, 'Sunday'), (1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'),
                  (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday')]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.KvSwap())
            assert_that(actual, equal_to(expected))
```
Aggregation
GroupByKey – group elements by key
Groups the elements (key-value pairs) of a PCollection by key.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestGroupByKey(TestCase):
    def test_group_by_key(self):
        # The order of values within a group is not guaranteed,
        # so the grouped values are sorted before comparison.
        expected = [('cat', ['mike', 'tama']), ('dog', ['pochi'])]
        inputs = [('cat', 'tama'), ('cat', 'mike'), ('dog', 'pochi')]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.GroupByKey()
                      | beam.MapTuple(lambda key, values: (key, sorted(values))))
            assert_that(actual, equal_to(expected))
```
CoGroupByKey – group elements by key (multiple PCollections)
Groups the elements (key-value pairs) of multiple PCollections by key.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCoGroupByKey(TestCase):
    def test_co_group_by_key(self):
        expected = [
            ('amy', (['amy@example.com'], ['111-222-3333', '333-444-5555'])),
            ('julia', (['julia@example.com'], []))
        ]
        inputs1 = [('amy', 'amy@example.com'), ('julia', 'julia@example.com')]
        inputs2 = [('amy', '111-222-3333'), ('amy', '333-444-5555')]
        with TestPipeline() as p:
            pcol1 = p | 'create pcol1' >> beam.Create(inputs1)
            pcol2 = p | 'create pcol2' >> beam.Create(inputs2)
            actual = ((pcol1, pcol2)
                      | beam.CoGroupByKey()
                      # Values within each group come back in no guaranteed order,
                      # so sort each list to make the comparison deterministic.
                      | beam.MapTuple(lambda key, groups: (key, tuple(sorted(g) for g in groups))))
            assert_that(actual, equal_to(expected))
```
CombineGlobally – combine elements
Combines all elements of a PCollection into a single value.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCombineGlobally(TestCase):
    def test_combine_globally(self):
        expected = [55]
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.CombineGlobally(sum))
            assert_that(actual, equal_to(expected))
```
ToList – store elements in a single list
Stores all elements of a PCollection in a single list.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToList(TestCase):
    def test_to_list(self):
        expected = [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToList()
                      # The order inside the list is not guaranteed; sort it
                      # so the comparison does not depend on bundle order.
                      | beam.Map(sorted))
            assert_that(actual, equal_to(expected))
```
ToDict – store elements in a dictionary
Stores all elements (key-value pairs) of a PCollection in a single dictionary.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToDict(TestCase):
    def test_to_dict(self):
        # When keys collide, only one of the values is kept;
        # which one is not guaranteed.
        expected = [{'A': 2, 'B': 1}]
        inputs = [('A', 1), ('A', 2), ('B', 1)]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToDict())
            assert_that(actual, equal_to(expected))
```
Count – count elements
Counts the number of elements in a PCollection.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCount(TestCase):
    def test_count(self):
        expected = [10]
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Count.Globally())
            assert_that(actual, equal_to(expected))
```
Distinct – remove duplicate elements
Removes duplicate elements from a PCollection.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestDistinct(TestCase):
    def test_distinct(self):
        expected = [1, 2, 3]
        inputs = [1, 1, 2, 3]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Distinct())
            assert_that(actual, equal_to(expected))
```
Mean – compute the average of elements
Computes the mean of all elements of a PCollection.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestMean(TestCase):
    def test_mean(self):
        expected = [5.5]
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Mean.Globally())
            assert_that(actual, equal_to(expected))
```
Sample – randomly sample elements
Randomly picks a fixed number of elements from a PCollection.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that


class TestSample(TestCase):
    def test_sample(self):
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        def check_sample(actual):
            # The sampled values differ on every run (e.g. [[2, 8, 6]]),
            # so check the shape of the result rather than exact values.
            assert len(actual) == 1
            sample = actual[0]
            assert len(sample) == 3
            assert set(sample) <= set(inputs)

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Sample.FixedSizeGlobally(3))
            assert_that(actual, check_sample)
```
Top – extract the largest (or smallest) elements
Extracts the N largest (or smallest) elements from a PCollection.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestTop(TestCase):
    def test_top_largest(self):
        expected = [[10, 9, 8]]
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Top.Largest(3))
            assert_that(actual, equal_to(expected))

    def test_top_smallest(self):
        expected = [[1, 2, 3]]
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Top.Smallest(3))
            assert_that(actual, equal_to(expected))
```
Other processing
Flatten – merge PCollections
Merges multiple PCollections into a single PCollection.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFlatten(TestCase):
    def test_flatten(self):
        expected = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        inputs1 = [1, 2, 3, 4, 5]
        inputs2 = [6, 7, 8, 9, 10]
        with TestPipeline() as p:
            pcol1 = p | 'create pcol1' >> beam.Create(inputs1)
            pcol2 = p | 'create pcol2' >> beam.Create(inputs2)
            actual = (pcol1, pcol2) | beam.Flatten()
            assert_that(actual, equal_to(expected))
```
Reshuffle – redistribute elements
Redistributes the elements of a PCollection across workers.
```python
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestReshuffle(TestCase):
    def test_reshuffle(self):
        expected = ['A', 'B', 'C']
        inputs = ['A', 'B', 'C']
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Reshuffle())
            assert_that(actual, equal_to(expected))
```
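Summary
The Apache Beam Python SDK provides a wide range of Transforms (although fewer than the Java SDK). I will update this article as new features become available.
I hope it serves as a handy reference whenever you need to quickly recall Apache Beam's Transforms!
References
- Python transform catalog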