Apache Beamでデータ処理パイプラインを定義する方法は何ですか?
Apache Beam内でデータ処理パイプラインを定義するには、1つまたは複数のTransform関数を書くことで実現できます。以下はApache Beam内で簡単なデータ処理パイプラインを定義する方法を示す簡単な例です。
- 必要なライブラリをインポートしてください。
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
- データを処理するTransform関数を定義してください。
class SplitWords(beam.DoFn):
def process(self, element):
return element.split(',')
- Pipelineオブジェクトを作成し、Transform関数を適用する。
options = PipelineOptions()
with beam.Pipeline(options=options) as p:
lines = p | beam.Create(['hello,world', 'foo,bar'])
word_lists = lines | beam.ParDo(SplitWords())
例文では、SplitWordsクラスを作成して、Transform関数を定義しました。この関数は入力された文字列をコンマで区切って単語リストにします。そして、Create関数を使用して入力PCollectionを作成し、それをSplitWords関数に適用して、最終的に出力PCollection word_listsを生成しました。
自作のTransform関数を書き、それを入力PCollectionに適用することで、完全なデータ処理パイプラインを定義できます。Beamはこのパイプラインを自動的に実行可能な分散ジョブに変換し、分散計算フレームワーク上で実行します。