尝试使用Python在Apache Beam上操作一下
我想尝试用Python 2.7来使用Apache Beam。
Apache Beam是谷歌在其云服务Dataflow中使用的框架。
它的特点是无论是批处理还是流处理,都可以使用相同的编写方式。
这次我们用Python编写了一个简单的程序并进行了执行。
需要安装 Apache Beam
-
- 前提はPython 2.7。
Python 3系しかない人やもともとPythonを使っていない人は、Pythonの環境を用意します。
私はPython 3系のconda環境だけだったので、conda installでpython2.7環境を作りました。参考:Python2, Python3 を切り替えて jupyter notebook を使う
Virtualenvを用いたインストール方法はBeam本家ドキュメント通り。 Apache Beam Python SDK Quickstart
Google のGCPを使うならGCPドキュメント。Python を使用したクイックスタート
安装Apache Beam
-
- Python 2.7の環境に入ってから下記を実行。
-
- ちなみにPython 3.5だとcythonでエラーが出て終わります。
- わたしの環境だと
pip install apache-beam
试着写一个类似下方的程序。
-
- Python用のプログラミングガイドを斜め読みして3個くらいソースをコピペして下記を作ってみた。
-
- Apache Beam Programming Guide
-
- 処理内容
シェークスピアの4行を入力とする
各行の文字数をlen関数でカウントする
文字数を標準出力に出して終わる(本来はDBか何かに書き出して終わるが今回は割愛)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# まずパイプラインを作る
p = beam.Pipeline(options=PipelineOptions())
# 1.配列をパイプラインの入力に設定(4行を入力とする)
lines = (p
| beam.Create([
'To be, or not to be: that is the question: ',
'Whether \'tis nobler in the mind to suffer ',
'The slings and arrows of outrageous fortune, ',
'Or to take arms against a sea of troubles, ']))
# 2.変換処理として各行の文字列カウントを設定
word_lengths = lines | beam.Map(len)
# 3.最後に標準出力にカウント数を出力して終わる
class ExtractWordsFn(beam.DoFn):
def process(self, element):
print(element)
p_end = word_lengths | beam.ParDo(ExtractWordsFn())
p.run()
-
- Jupyterで実行した結果
Gistに公開
<apache_beam.runners.direct.direct_runner.DirectPipelineResult at 0x7f6b69c92950>
43
42
45
43
总结
-
- かんたんではあるが一応これでPythonからApache Beamのバッチ処理が実行できた。インストールから実装、実行までだいたい1時間くらい。
-
- 今後ストリーミング処理を実行したい。またSpark Streamingなどをエンジンとして利用できるようなので、それも試したい。
- 上記をWindows上のBash on windows+Jupyterから実行できてハッピーだった。
其他参考资料
-
- Apache Beam Python SDK Quickstart
-
- Python を使用したクイックスタート
- Cloud Dataflow入門〜データ処理の実践