Apache Beam速查表【Python】

前言

本文汇总了 Apache Beam Python SDK 提供的各种 Transform。了解有哪些可以直接调用的现成 Transform,有助于更快地确定实现方案。


逐个元素的处理

ParDo – 对每个元素执行 DoFn

对 PCollection 中的每个元素执行任意处理(DoFn)。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class ComputeWordLength(beam.DoFn):
    """DoFn that emits the length of every incoming element.

    No ``__init__`` override is needed: the inherited ``beam.DoFn``
    constructor is used as-is.
    """

    def process(self, element):
        """Yield ``len(element)`` for a single input element.

        Args:
            element: Any sized object (the example feeds strings).

        Yields:
            The length of ``element`` as an int.
        """
        yield len(element)


class TestParDo(TestCase):
    """Shows ``beam.ParDo`` running a custom DoFn on every element."""

    def test_par_do(self):
        """Each name is turned into its length by ComputeWordLength."""
        names = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']
        lengths = [5, 3, 7, 7, 5]

        with TestPipeline() as pipeline:
            created = pipeline | beam.Create(names)
            word_lengths = created | beam.ParDo(ComputeWordLength())

            assert_that(word_lengths, equal_to(lengths))

Filter – 筛选元素

筛选 PCollection 的元素。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFilter(TestCase):
    """Shows ``beam.Filter`` keeping only elements matching a predicate."""

    def test_filter(self):
        """Only the elements that start with 'A' survive."""
        letters = ['A', 'B', 'C']

        with TestPipeline() as pipeline:
            kept = (
                pipeline
                | beam.Create(letters)
                | beam.Filter(lambda letter: letter.startswith('A'))
            )

            assert_that(kept, equal_to(['A']))

Map – 对元素应用函数

对 PCollection 中的每个元素应用函数。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestMap(TestCase):
    """Shows ``beam.Map`` applying a one-to-one function to each element."""

    def test_map(self):
        """Every name is mapped to its length."""
        names = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']
        lengths = [5, 3, 7, 7, 5]

        with TestPipeline() as pipeline:
            # ``len`` can be passed directly; no wrapping lambda is needed.
            mapped = pipeline | beam.Create(names) | beam.Map(len)

            assert_that(mapped, equal_to(lengths))

FlatMap – 对元素应用函数并展平结果(函数返回可迭代对象)

对 PCollection 的每个元素应用一个返回可迭代对象的函数,并将所有结果展平为一个 PCollection。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFlatMap(TestCase):
    """Shows ``beam.FlatMap`` flattening the iterable each call returns."""

    def test_flat_map(self):
        """Each group of names yields its lengths; all results are flattened."""
        name_groups = [['Alice', 'Bob'], ['Cameron', 'Daniele', 'Ellen']]
        lengths = [5, 3, 7, 7, 5]

        with TestPipeline() as pipeline:
            flattened = (
                pipeline
                | beam.Create(name_groups)
                | beam.FlatMap(lambda group: map(len, group))
            )

            assert_that(flattened, equal_to(lengths))

ToString – 将元素转换为字符串

将PCollection中的每个元素转换为字符串。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToString(TestCase):
    """Shows the three flavours of ``beam.ToString``."""

    def test_to_string_kvs(self):
        """Render each (key, value) pair as a comma-separated string."""
        pairs = [('A', 'B'), ('C', 'D')]

        with TestPipeline() as pipeline:
            rendered = pipeline | beam.Create(pairs) | beam.ToString.Kvs()

            assert_that(rendered, equal_to(['A,B', 'C,D']))

    def test_to_string_element(self):
        """Render each element as its string representation."""
        elements = ['A', ['A', 'B'], ['C', 'D', 'E']]
        rendered_elements = ["A", "['A', 'B']", "['C', 'D', 'E']"]

        with TestPipeline() as pipeline:
            rendered = (pipeline
                        | beam.Create(elements)
                        | beam.ToString.Element())

            assert_that(rendered, equal_to(rendered_elements))

    def test_to_string_iterables(self):
        """Join the items of each iterable element with commas."""
        iterables = [['A', 'B'], ['C', 'D', 'E']]

        with TestPipeline() as pipeline:
            rendered = (pipeline
                        | beam.Create(iterables)
                        | beam.ToString.Iterables())

            assert_that(rendered, equal_to(['A,B', 'C,D,E']))

Keys – 从元素中提取键

从 PCollection 的每个元素(键值对)中抽取键。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestKeys(TestCase):
    """Shows ``beam.Keys`` extracting the key of each (key, value) pair."""

    def test_keys(self):
        """The weekday indices are extracted from (index, name) pairs."""
        day_names = ['Sunday', 'Monday', 'Tuesday', 'Wednesday',
                     'Thursday', 'Friday', 'Saturday']
        # [(0, 'Sunday'), (1, 'Monday'), ..., (6, 'Saturday')]
        indexed_days = list(enumerate(day_names))

        with TestPipeline() as pipeline:
            keys = pipeline | beam.Create(indexed_days) | beam.Keys()

            assert_that(keys, equal_to(list(range(len(day_names)))))

Values – 从元素中提取值

从 PCollection 的每个元素(键和值对)中提取值。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestValues(TestCase):
    """Shows ``beam.Values`` extracting the value of each (key, value) pair."""

    def test_values(self):
        """The weekday names are extracted from (index, name) pairs."""
        day_names = ['Sunday', 'Monday', 'Tuesday', 'Wednesday',
                     'Thursday', 'Friday', 'Saturday']
        # [(0, 'Sunday'), (1, 'Monday'), ..., (6, 'Saturday')]
        indexed_days = list(enumerate(day_names))

        with TestPipeline() as pipeline:
            values = pipeline | beam.Create(indexed_days) | beam.Values()

            assert_that(values, equal_to(day_names))

KvSwap – 交换元素的键和值

将 PCollection 中的每个元素(键值对)的键和值进行互换。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestKvSwap(TestCase):
    """Shows ``beam.KvSwap`` turning (key, value) into (value, key)."""

    def test_kv_swap(self):
        """(index, name) pairs become (name, index) pairs."""
        day_names = ['Sunday', 'Monday', 'Tuesday', 'Wednesday',
                     'Thursday', 'Friday', 'Saturday']
        indexed_days = list(enumerate(day_names))
        # equal_to ignores ordering, so the swapped pairs are listed as-is.
        swapped_days = [(name, index) for index, name in indexed_days]

        with TestPipeline() as pipeline:
            swapped = pipeline | beam.Create(indexed_days) | beam.KvSwap()

            assert_that(swapped, equal_to(swapped_days))

聚合处理

GroupByKey – 按键(Key)对元素进行分组

按照 Key 将 PCollection 中的元素(键值对)进行分组。注意:同一组内各值的顺序不受保证。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestGroupByKey(TestCase):
    """Shows ``beam.GroupByKey`` collecting values that share a key."""

    def test_group_by_key(self):
        """Values sharing a key end up in one group per key.

        GroupByKey emits ``(key, iterable_of_values)``. Neither the concrete
        iterable type nor the order of values within a group is guaranteed,
        so each group is materialized as a sorted list before comparing.
        """
        expected = [('cat', ['mike', 'tama']), ('dog', ['pochi'])]

        inputs = [('cat', 'tama'), ('cat', 'mike'), ('dog', 'pochi')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.GroupByKey()
                      # Normalize each group into a sorted list.
                      | beam.Map(lambda kv: (kv[0], sorted(kv[1]))))

            assert_that(actual, equal_to(expected))

CoGroupByKey – 按键将多个 PCollection 的元素分组

按照键(Key)的方式对多个 PCollection 的元素(键值对)进行聚合。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCoGroupByKey(TestCase):
    """Shows ``beam.CoGroupByKey`` grouping several keyed PCollections."""

    def test_co_group_by_key(self):
        """Values from both inputs are grouped per key, one list per input.

        With a tuple of inputs, each grouped value holds one list per input,
        in input order. A key absent from an input gets an empty list there
        ('julia' has no phone numbers). The order of values inside each list
        is not guaranteed, so every list is sorted before comparing.
        """
        expected = [
            ('amy', (['amy@example.com'], ['111-222-3333', '333-444-5555'])),
            ('julia', (['julia@example.com'], [])),
        ]

        inputs1 = [('amy', 'amy@example.com'), ('julia', 'julia@example.com')]
        inputs2 = [('amy', '111-222-3333'), ('amy', '333-444-5555')]

        with TestPipeline() as p:
            pcol1 = p | 'create pcol1' >> beam.Create(inputs1)
            pcol2 = p | 'create pcol2' >> beam.Create(inputs2)

            actual = ((pcol1, pcol2)
                      | beam.CoGroupByKey()
                      # Normalize: a tuple of sorted lists, one per input.
                      | beam.Map(lambda kv: (
                          kv[0], tuple(sorted(values) for values in kv[1]))))

            assert_that(actual, equal_to(expected))

CombineGlobally – 合并所有元素

将 PCollection 中的所有元素合并。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCombineGlobally(TestCase):
    """Shows ``beam.CombineGlobally`` reducing a PCollection to one value."""

    def test_combine_globally(self):
        """Summing 1..10 yields a single element, 55."""
        numbers = list(range(1, 11))

        with TestPipeline() as pipeline:
            total = pipeline | beam.Create(numbers) | beam.CombineGlobally(sum)

            assert_that(total, equal_to([55]))

ToList – 将元素存储到一个列表中

将 PCollection 的所有元素存储在一个列表中(列表内元素的顺序不受保证)。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToList(TestCase):
    """Shows ``beam.combiners.ToList`` gathering elements into one list."""

    def test_to_list(self):
        """All elements are combined into a single list.

        ToList does not guarantee the order of elements in the list, so the
        list is sorted before comparing against the expected value.
        """
        expected = [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToList()
                      # Make the assertion independent of combine order.
                      | beam.Map(sorted))

            assert_that(actual, equal_to(expected))

ToDict – 将元素存储到一个字典中

将 PCollection 中的所有元素(键和值对)存储到一个字典类型中。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToDict(TestCase):
    """Shows ``beam.combiners.ToDict`` gathering pairs into one dict."""

    def test_to_dict(self):
        """All (key, value) pairs are combined into a single dict.

        When a key appears more than once, only one of its values is kept,
        and which one is not deterministic. The matcher therefore accepts
        either value for the duplicated key 'A' instead of hard-coding one.
        """
        # Local import: this snippet's module-level imports do not include it.
        from apache_beam.testing.util import BeamAssertException

        inputs = [('A', 1), ('A', 2), ('B', 1)]

        def check(actual):
            results = list(actual)
            if len(results) != 1:
                raise BeamAssertException(
                    'Expected exactly one dict, got %r' % (results,))
            result = results[0]
            if (sorted(result) != ['A', 'B'] or result['B'] != 1
                    or result['A'] not in (1, 2)):
                raise BeamAssertException(
                    'Unexpected ToDict result: %r' % (result,))

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToDict())

            assert_that(actual, check)

Count – 统计元素的数量

计算 PCollection 的元素数。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCount(TestCase):
    """Shows ``beam.combiners.Count.Globally`` counting all elements."""

    def test_count(self):
        """Ten input elements produce a single count of 10."""
        numbers = list(range(1, 11))

        with TestPipeline() as pipeline:
            counted = (pipeline
                       | beam.Create(numbers)
                       | beam.combiners.Count.Globally())

            assert_that(counted, equal_to([len(numbers)]))

Distinct – 去除重复元素

从 PCollection 的元素中排除重复项。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestDistinct(TestCase):
    """Shows ``beam.Distinct`` removing duplicate elements."""

    def test_distinct(self):
        """The duplicated 1 appears only once in the output."""
        numbers = [1, 1, 2, 3]
        unique_numbers = sorted(set(numbers))  # [1, 2, 3]

        with TestPipeline() as pipeline:
            deduped = pipeline | beam.Create(numbers) | beam.Distinct()

            assert_that(deduped, equal_to(unique_numbers))

Mean – 计算元素的平均值

计算PCollection中所有元素的平均值。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestMean(TestCase):
    """Shows ``beam.combiners.Mean.Globally`` averaging all elements."""

    def test_mean(self):
        """The mean of 1..10 is 5.5."""
        numbers = list(range(1, 11))

        with TestPipeline() as pipeline:
            average = (pipeline
                       | beam.Create(numbers)
                       | beam.combiners.Mean.Globally())

            assert_that(average, equal_to([5.5]))

Sample – 从元素中随机抽样

从 PCollection 中随机抽取指定数量的元素(每次运行的结果都可能不同)。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestSample(TestCase):
    """Shows ``beam.combiners.Sample.FixedSizeGlobally`` sampling elements."""

    def test_sample(self):
        """FixedSizeGlobally(3) emits one list of 3 randomly chosen elements.

        The chosen elements change from run to run, so asserting a fixed
        list would make the test fail almost every time. Instead the matcher
        checks properties that always hold: exactly one sample list, of the
        requested size, with distinct members (sampling is without
        replacement), all drawn from the input.
        """
        # Local import: this snippet's module-level imports do not include it.
        from apache_beam.testing.util import BeamAssertException

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        sample_size = 3

        def check(actual):
            samples = list(actual)
            if len(samples) != 1:
                raise BeamAssertException(
                    'Expected exactly one sample, got %r' % (samples,))
            sample = samples[0]
            if len(sample) != sample_size or len(set(sample)) != sample_size:
                raise BeamAssertException(
                    'Expected %d distinct elements, got %r'
                    % (sample_size, sample))
            if not set(sample) <= set(inputs):
                raise BeamAssertException(
                    'Sample %r contains elements not in the input' % (sample,))

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Sample.FixedSizeGlobally(sample_size))

            assert_that(actual, check)

Top – 提取最大(或最小)的若干元素

从PCollection的所有元素中提取出最大(或最小)的数个元素。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestTop(TestCase):
    """Shows ``beam.combiners.Top.Largest`` and ``Top.Smallest``."""

    def test_top_largest(self):
        """Top.Largest(3) yields one list of the three biggest values."""
        numbers = list(range(1, 11))

        with TestPipeline() as pipeline:
            biggest = (pipeline
                       | beam.Create(numbers)
                       | beam.combiners.Top.Largest(3))

            assert_that(biggest, equal_to([[10, 9, 8]]))

    def test_top_smallest(self):
        """Top.Smallest(3) yields one list of the three smallest values."""
        numbers = list(range(1, 11))

        with TestPipeline() as pipeline:
            smallest = (pipeline
                        | beam.Create(numbers)
                        | beam.combiners.Top.Smallest(3))

            assert_that(smallest, equal_to([[1, 2, 3]]))

其他处理 | Others

Flatten – 合并多个 PCollection

将多个 PCollection 合并成一个 PCollection。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFlatten(TestCase):
    """Shows ``beam.Flatten`` merging several PCollections into one."""

    def test_flatten(self):
        """Two PCollections of 1..5 and 6..10 merge into 1..10."""
        first_half = list(range(1, 6))
        second_half = list(range(6, 11))

        with TestPipeline() as pipeline:
            left = pipeline | 'create pcol1' >> beam.Create(first_half)
            right = pipeline | 'create pcol2' >> beam.Create(second_half)

            merged = (left, right) | beam.Flatten()

            assert_that(merged, equal_to(first_half + second_half))

Reshuffle – 重新分配元素

在工作机之间重新分配 PCollection 的元素。

from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestReshuffle(TestCase):
    """Shows ``beam.Reshuffle``, which redistributes but keeps elements."""

    def test_reshuffle(self):
        """Reshuffling leaves the set of elements unchanged."""
        letters = ['A', 'B', 'C']

        with TestPipeline() as pipeline:
            reshuffled = pipeline | beam.Create(letters) | beam.Reshuffle()

            assert_that(reshuffled, equal_to(list(letters)))

总结

Apache Beam 的 Python SDK 提供了多种 Transform(不过与 Java SDK 相比仍然较少)。如有新功能加入,本文将随时更新。

希望你们能在需要快速回想起 Apache Beam 的 Transform 时参考这个信息!

参考资料

    Python transform catalog