尝试使用Python在Apache Beam上操作一下

我想尝试用Python 2.7来使用Apache Beam。

Apache Beam是谷歌在其云服务Dataflow中使用的框架。
它的特点是无论是批处理还是流处理,都可以使用相同的编写方式。
这次我们用Python编写了一个简单的程序并进行了执行。

需要安装 Apache Beam

    • 前提はPython 2.7。

Python 3系しかない人やもともとPythonを使っていない人は、Pythonの環境を用意します。

私はPython 3系のconda環境だけだったので、conda installでpython2.7環境を作りました。参考:Python2, Python3 を切り替えて jupyter notebook を使う

Virtualenvを用いたインストール方法はBeam本家ドキュメント通り。 Apache Beam Python SDK Quickstart
Google のGCPを使うならGCPドキュメント。Python を使用したクイックスタート

安装Apache Beam

    • Python 2.7の環境に入ってから下記を実行。

 

    • ちなみにPython 3.5だとcythonでエラーが出て終わります。

 

    わたしの環境だと
pip install apache-beam

试着写一个类似下方的程序。

    • Python用のプログラミングガイドを斜め読みして3個くらいソースをコピペして下記を作ってみた。

 

    • Apache Beam Programming Guide

 

    • 処理内容

シェークスピアの4行を入力とする
各行の文字数をlen関数でカウントする
文字数を標準出力に出して終わる(本来はDBか何かに書き出して終わるが今回は割愛)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# まずパイプラインを作る
p = beam.Pipeline(options=PipelineOptions())

# 1.配列をパイプラインの入力に設定(4行を入力とする)
lines = (p
       | beam.Create([
           'To be, or not to be: that is the question: ',
           'Whether \'tis nobler in the mind to suffer ',
           'The slings and arrows of outrageous fortune, ',
           'Or to take arms against a sea of troubles, ']))

# 2.変換処理として各行の文字列カウントを設定
word_lengths = lines | beam.Map(len)

# 3.最後に標準出力にカウント数を出力して終わる
class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    print(element)
p_end = word_lengths | beam.ParDo(ExtractWordsFn())
p.run()
    • Jupyterで実行した結果

Gistに公開

<apache_beam.runners.direct.direct_runner.DirectPipelineResult at 0x7f6b69c92950>
43
42
45
43

总结

    • かんたんではあるが一応これでPythonからApache Beamのバッチ処理が実行できた。インストールから実装、実行までだいたい1時間くらい。

 

    • 今後ストリーミング処理を実行したい。またSpark Streamingなどをエンジンとして利用できるようなので、それも試したい。

 

    上記をWindows上のBash on windows+Jupyterから実行できてハッピーだった。

其他参考资料

    • Apache Beam Python SDK Quickstart

 

    • Python を使用したクイックスタート

 

    Cloud Dataflow入門〜データ処理の実践
bannerAds