阅读了《数据驱动应用设计》第三部分(导出数据)
因为我读过了,所以我会留下一些个人笔记。
这些笔记并不详尽。
-
- 1~4章「データ指向アプリケーションデザイン」第 I 部 (データシステムの基礎) を読んだ
5~9章「データ指向アプリケーションデザイン」第 II 部 (分散データ) を読んだ
? 10~12章「データ指向アプリケーションデザイン」第 III 部 (導出データ) を読んだ
在第Ⅲ部,我们将合并多个数据系统群,以验证在创建一致的应用程序架构时会面临的问题。
-
- データストア
-
- インデックス
-
- キャッシュ
- 分析システム
数据存储和处理系统可分为两种类型。
-
- 記録のシステム (Systems of record)
source of truth
正規化されている。
導入データシステム
= 他の既存データを変換・処理して得られた結果。
例えばキャッシュ、非正規化された値、インデックス、マテリアライズドビュー。
例えば読み取りのクエリのパフォーマンスを良くする。
第10章的批处理
在本章中,我们将讨论MapReduce以及一些批处理算法和框架。
使用UNIX工具进行批处理的比较参考
将awk、sed、grep、sort、uniq、xargs等命令结合使用。
因为在命令的链式操作下,例如可以非常容易地分析日志文件。
-
- 一様なインターフェース
入力、出力ともにファイルディスクリプタ
ロジックと結線の分離
= プログラムは入力がどこから来てどこへ行くのか気にしない
必要なのはstdinから入力を読み込み、stdoutへ書き出すこと
透過性があり実験しやすい
Unixコマンドへの入力ファイルはイミュータブル(書き換えられない)
パイプラインを好きな場所で終わらせてlessにパイプして出力を確認できる
MapReduce是一种数据处理模型和编程框架。
Unix工具组的最大限制是它们只能在一台计算机上运行。
使用类似Hadoop的工具,可以将其扩展到多台计算机上。
截至2019年,最大的HDFS环境在数万台机器上运行,总存储容量为数百个拍字节。
-
- 1つのMapReduceジョブ = 1つのmapper処理 ->1つのreduce処理
mapper処理のタスク数=入力ディレクトリ内のファイルブロック数
reducer処理のタスク数=ジョブの作成者が設定
コールバック関数(mapper, reducer)は状態を持たず、指定された出力以外に副作用を外部に及ぼさない
=MapReduceジョブの実行は、出力生成以外の副作用はない
=タスクをリトライしても、ジョブの最終出力はフォールトが生じなかった場合と同じになる
?️バッチ処理のジョブコードで耐障害性のための仕組みの実装を気にしなくて良い
また、バッチ処理のメンテナンスも容易になる
MapReduceジョブは、分散ファイルシステム上のファイルを読み書きする
Hadoopでは、HDFSのディレクトリがジョブの入力。その各ファイル/ファイルブロックが個別のmapタスクで処理されるパーティション。
HDFSでは、はネームノード(中央サーバー)が各マシンのデーモンプロセスを管理
耐障害性のために、ファイルブロックは複数マシンにレプリケーションされる
ジョブの実行は
入力ファイル群をレコードに分割する
mapper関数を呼び、各入力レコードをキーバリューペア(複数キーも可能)に変換する。
reducer関数により、ソート順を保ちながらそれらをソートする。
mapper関数の出力がreducer関数の入力に渡される場合、暗黙のうちにソートされる。これは(#mapper, #reducer)=(3, 2)の場合、以下で実現される。
mapperの出力はm1, m2, m3ノードのそれぞれのディスク上で、[r1, r2] にパーティショニングされる。これにはキーのハッシュ値を利用。
mapperの出力完了後に、スケジューラが各reducer群に出力ファイルのフェチが可能になったことを伝達する。
reducerは各m1, m2, m3ノードのそれぞれから、自分のノードのパーティションに相当する出力を取得する。
ジョブの依存関係を明示的に指定するために、ワークフロースケジューラー(Luigi, Airflow, Pinball, etc)が使用できる。
在工作流程中,有时候希望对事实表(例如,点击事件)的每条记录进行连接,以找到与之相关的维度表(例如,用户)的记录,并得出有价值的结果。
作为JOIN的方法,
-
- reducer側で行う
ETLを使用してdimensionテーブルのコピーをfact tableと同じ分散ファイルシステム上におく。
ソートマージ結合
これは「同じキーを持つレコードを同じ場所に持ってくる」パターン?
mapper1でfact tableのレコードが処理後、JOINしたいキーによってパーティションする
mapper2でdimension tableのレコードをmapper1と同じキーでパーティションする
?️ reducerでは、(keyの順番, それに対応するdimension record -> fact records) の順番にソートされる。=secondary sort。これによりreducerは単一ノード上でJOINのロジックを容易く実行できる。
mapper側で行う
ブロードキャストハッシュ結合
mapperのみ使用、reducerは使わない
それぞれのmapperが起動時に、JOIN対象のdimension tableをインメモリのハッシュテーブルにロードする。これを使いmapperの処理のなかでJOINを行う。
dimension tableが、各メモリのメモリに収まるくらい小規模な場合に使用可能
パーティション化ハッシュ結合
入力の時点でdimension tableのレコード群と、fact tableのレコード群を、同じキーでパーティションする
-> joinしたいレコード群は全て同じパーティション内にある
いずれかのレコード群をハッシュとしてメモリ上に保持する
map側マージ結合
↑で、同一キーによるパーティション化のみならず、そのキー内でのソートも行われる。
マージソートを利用できるので、いずれのレコード群もメモリにロードしなくても済む
⚠️在”将具有相同键的记录放置在同一位置?”模式下,可能会产生偏斜(即热点)。
作为这项缓解措施
-
- skewed joinメソッド(Pigなど)
- 2ステージでグループ化を行う(Hiveなど)
? 手工制作工作流程的输出示例
-
- 検索エンジン用のインデックス(昔のGoogle)
-
- 機械学習システムのclassifier
-
- レコメンデーションシステム
これらはバッチジョブで内部作成したデータファイルに書き込み -> アプリケーション用の読み取り専用のDBへバルクロードすべし。
效能更高。
-
- ストレージの多様性
Hadoopでは事前に決めた特定のデータモデルに従う必要はない。
処理モデルの多様性
=自分の作成したコードを大規模なデータセットに対して容易に実行できる柔軟性がある。
障害対策に敏感でない
副作用がないため、個々のタスクの粒度で処理のリトライが可能であるため
非常に頻繁にディスク書き込みが行われる
データセットがメモリに収めるには大きすぎるため
(Javaなど)言語の既存ライブラリを利用し任意のコードで処理を書くことができる
?MapReduce实体化中间输出(即将其写入文件)。由于需要等待所有先前作业的任务全部完成,因此工作流的执行速度会降低。而Unix管道则通过使用内存缓冲区,逐步地进行流式处理而不完全实体化中间状态。
这些问题在数据流引擎(如Hadoop上的Spark、Tez、Flink等)中得到了解决,而且速度很快。
-
- mapper, reducerの代わりにoperatorを指定する -> 処理を柔軟に書ける
-
- mapとreduceのステージ間で行われていた負荷の高いソートをデフォルトでは行わない
-
- ワークフロー内の全てのJOINとデータの依存関係は明示的に示される
-
- オペレータ間の中間状態はメモリかローカルのディスクに保存される。HDFSへは書き込まない
障害発生により中間データを喪失した場合には、再計算を行う。(計算結果にランダム性がなく決定的であることが必要)
ただし、最初の入力と最終的な出力先はHDFS。
此外,使用Pig、Hive或Cascading实现的工作流可以通过仅仅修改简单的配置而无需修改代码,实现从MapReduce转换到Tez或Spark的切换。
用Pregel模型来并行化(即递归处理)图算法,我们可以使用Apache Giraph、Spark的GraphX API以及Flink的Gelly API来实现。
Hive和Presto都是在Hadoop上运行的查询引擎。你可以将orc和perquet文件作为查询的目标格式进行指定。
第11章 流式处理
批处理适用于有限的输入数据;而流处理适用于无限的输入数据。
在流处理中,会频繁执行处理操作(例如每秒一次),并且会连续进行操作,每当事件发生时。
在流媒体中,生成和消费由多个生产者和消费者组成的发布/订阅模型是适用的。
-
- Producerのメッセージ送信速度にConsumerの消費速度が追い付かない場合の対策は
システムがメッセージをドロップする
キューにメッセージをバッファリングする = メッセージブローカー
バックプレッシャーによりフロー制御を行う(例えばProducerが送信を行えないようブロックする)
耐障害性が必要かはアプリケーションに強く依存する。必要な場合、
プロデューサーからコンシューマーへ直接メッセージングする。方法としては
UDPマルチキャストを使用する
WebhookとしてProducerがConsumerへのリクエストを行う
ConsumerがProsucerへのリクエストを行う
メッセージブローカを使用する
=メッセージストリームの扱いに最適化されたDB
消息代理对于流式处理非常有用。
消息代理器是一种软件或服务,用于在不同应用程序和系统之间传递和传递消息。
-
- 永続性の問題をアプリケーションから引き剥がし担当する
- コンシューマの処理を非同期にできる
以下是对消息代理的说明。有两种主要类型。
-
- 旧来型のJMS/AMQP標準のブローカー
新しいメッセージを受信時、コンシューマへ通知する -> コンシュマーが処理を完了し承認(ack)を送信 -> ブローカーはそのメッセージを削除する
= push型?
?ブローカーはメッセージを自動的に削除するのでキューは短い前提。
◉メモリにメッセージ群が収まらない場合にのみディスクに書き出す
同一トピックを読み取るコンシューマーのパターンを二つ紹介する。
ロードバランシング
一つの処理を並列化できる
ファンアウト
コンシューマーの処理は独立している。
ブローカーは承認を得られなかった場合、他のコンシューマにそのメッセージを再送信する
?ロードバランシングで再送信が行われるとメッセージの順序が入れ替わることがある?
メッセージ処理の負担が大きいかもしれず(なぜ?)、メッセージ単位で処理を並列化したく(ログベースでも可能では?)、メッセージの順番はそれほど重要でない場合に適している
e.g. RabbitMQ, ActiveMQ, Google Cloud Pub/Subなどでサポートされている
ログベースメッセージブローカー
◉メッセージはディスク上にログとして書き出される
ログをパーティション化することでスループットを高められる。(数百万メッセージ毎秒まで)
ブローカーが各パーティション内で単調増加するシーケンス番号をメッセージに振り当てる。各コンシューマーはオフセットとしてこれを保持する。(一つのコンシューマクライアントが複数のパーティションから読み取ることもある)
?単一パーティション内ではメッセージの順序が保証される?
ブローカーは各コンシューマーグループへメッセージをファンアウトし、各グループ内のクライアントたち(1スレッド1クライアント)にメッセージをロードバランシングできる。
コンシューマーは処理済みのオフセットを前進させてどこまで読み取ったかを管理する
ログはリングバッファ(循環バッファ)として全てディスク上に保存され、通常数日から数週間分のメッセージを保存できる。
コンシューマの読み取りが大きく遅れている場合には、ブローカーが警告を発する。これを人間の運用担当者が確認し、低速なコンシューマを修正する。
メッセージ処理のスループットが高くなければならず、一つ一つのメッセージ処理は高速に行え(なぜ?)、メッセージの順番が重要である場合に適している
e.g. Apache Kafka, Amazon Kinesis Stream
截至2019年,一般大型高清硬盘的容量为6TB,通过顺序写入的吞吐量为150MB/s -> 如果以最大速度写入,将在11个小时内填满。
利用流式处理进行设计
迄今为止,我们介绍了对于引入应用程序非常有用的消息代理技术。
在本节中,我们介绍了将流处理引入应用程序的架构。
在异构数据系统(使用多种不同技术组合)中产生的问题是,
-
- [並行性の問題] 二重書き込み(dual writes)
記録のレコードを二つのクライアントが順に更新したとする。導出データシステム内の相当レコードの更新が逆順に行われた場合、二つのデータソース間でデータ不整合が発生する。
[耐障害性の問題] 記録のレコードは成功するが、導出データシステムの更新は失敗する
要はアトミックに更新されていない。
为了解决这些问题,将事件流的概念引入系统中。
-
- 変更データキャプチャ(CDC)
記録のレコード(leaderと見なせる)への更新をストリームとして導出データシステム(followerと見なせる)が利用する。
記録のシステムがリーダー、導入データシステムをフォロワーと捉えることができる
実装方法は①データベーストリガーを利用したり、②レプリケーションログを自前でパースしたり、など。
Kafkaなどではログコンパクションがサポートされている。これは各レコードの最新バージョン以外を破棄できる。
近年のDBでは第1級のインターフェイスとしてサポートされるようになってきている。
イベントソーシング
= システムが全ての生のイベントを恒久的に保存でき、その完全なイベントログを必要に応じて再処理できるようにすること。DBのレコードの更新や削除は禁止。
この用途に特化したDBの例は、Event Storeなど。
这些是 / 这些东西是 / 以上都是
-
- データを書き込む形式から、データを読み込む形式を分離することができる。(クエリ責務分離)
書き込みはイベントをあるがままに書き込む。
読み取り用に最適化したビューを構築する。用途に合わせてその都度ビューを構築することで、既存のシステムの修正が不要になる。(これはスキーマのマイグレーションより容易である。)
メリットは、リカバリーの容易性と過去イベントの分析が可能なこと。
欠点は以下二つ。
(1) イベントログのコンシューマが非同期であることにより、イベントログの生成後、導出データシステムの読み取り時に自分の書き込みが反映されていないことがあること。(= read after write が保障されない) ?対処法は
そもそもイベント書き込み時に、ビューの更新を同期的に行う。このためには、イベントログと読み取りのビューを同じストレージシステムに作成するか、分散トランザクションを使用する
ビューを使用せず、現在の状態を毎回イベントログから算出する。
(2) 複数のイベントログが作成され、両方がビューの同一レコードを更新するとする。この際にビューへの反映の順番が反転し、ビューがイベントログから算出されるものと異なってしまう。
あるビューのレコードを更新しうるイベントログをメッセージブローカのただ一つのパーティションにのみ蓄積していく。(シングルスレッドの)ログのコンシューマはこのパーティションを読み取る。
イミュータブルにログを保持するが、例えば以下の場合には管理上データを削除する必要が発生する。
プライバシーの規定から退会ユーザーの個人情報の削除の必要がある場合
データ保護の法律が不正確な情報の削除を求めている場合
誤って流出したセンシティブな情報を封じ込める場合
流式处理的使用案例
-
- モニタリング、つまり特定の事象が生じた時にアラートを発する
-
- 複合イベント処理 (complex event processing)
= ストリーム中の特定のパターンのイベントを検索するルールをクエリやGUIでCEPエンジンに指定する。処理エンジンはストリームを消費し、マッチするものが見つかったら、新たなイベント(complex event)を生成する。
CEPエンジンの例は、Esper, Apamaなど
ストリームでの検索
(複合イベント処理との違いが分からん?)
クエリにインデックスを付けてマッチするかもしれないクエリの集合を狭めることができる。
ストリーム分析
大量のイベントに対するメトリクスの集計や統計を求める
Apache Storm, Spark Streaming, Kafka Streamsなどの分散ストリーム処理フレームワークがサポートしている。ホストされたサービスは、Google Cloud Dataflow, Azure Stream Analytics, Amazon Kinesis Data Analytics。
マテリアライズドビューの管理
DB、キャッシュ、検索インデックスの構築
イベントソーシングにおけるアプリケーションの状態。
SamzaやKafka Streamsは、Kafkaのログコンパクション上に構築されているのでサポートしている。
其他
?关于时间的思考(事件时间和处理时间)
不要混淆事件时间和处理时间。
-
- 多くのストリーム処理フレームワークはprocessing timeを利用している。
- 例えばリクエストレートの計算は、event timeで集計すべき。(ストリームプロセッサをデプロイし直したら、イベントのprocessは再起動後にまとめて行われるため)
确认所有在指定时间窗口内的事件是否全部完成是一件困难的事情。这是因为我们需要考虑在窗口结束时间之后会出现延迟的孤立事件。对此问题的解决方案可以是以下之一。
-
- はぐれイベントは無視する。ドロップされたはぐれイベント数をメトリクスとして追跡し、閾値を超えたらアラートを発するようにする。
-
- ストリームプロセッサがはぐれイベントを受け取ったら、そのことを関連システムに伝播する訂正(correction)を配信する。
- ストリームに「今後はt以前のタイムスタンプを持つイベントは発生しない」ことを示す特別なメッセージを流す。
窗户的种类 hù de
-
- tumbling window
= 幅固定、開始点が定期的、window同士は重複しない
hopping window
= 幅固定、開始点が定期的、window同士は重複する
sliding window
= 幅固定、開始点が連続的、ゆえにwindow同士は重複する
session window
= 幅固定でない、開始点はまばら、window同士は重複しない、windowに含まれない期間も存在する
实现流JOIN的方法。
-
- ストリーム – ストリーム結合
= window内でのイベント同士の結合
[実装方法] ストリームプロセッサ内で「window内にどのイベントが発生したか」という状態を保持しておく。JOINできたらJOIIN結果を表すイベントを生成する。JOINできなかった場合は、そのままのイベントを生成する。
ストリーム – テーブル結合
= streamのenrich。ストリーム内の各イベント(fact)に対して、それに関連する情報(dimension)をエンリッチする。
[実装方法1] ストリームプロセッサは各イベントの処理タイミングで、DBから関連レコードをルックアップする
?処理速度が低速
?DBが過負荷に陥る可能性あり
[実装方法2] DBのコピーをプロセッサにロードしておく。(メモリ上のハッシュテーブル、大きければローカルディスク内のインデックス)。DBのコピーの更新のためには、元のDBのchangelogを保持するストリームを作成し、これを消費する。
⚠️ slowly changing dimensionに対応するため、各イベントにはevent timeを持たせ、時間によって変化するDBのレコード(税率等)はバージョン違いを保持する。
テーブル – テーブル結合
= materializd veiwの更新の”タイミング”をイベントストリームで検知する。
[実装方法] viewを構成する複数の元データの更新をそれぞれストリームに流す。ストリームプロセッサはいずれかのストリームからイベントを受け取った時点でviewを再計算する。
为流处理器增加容错性。
在第9章中提及的MapReduce批处理是精确一次性执行的(准确来说是有效一次性执行)。
为了在输入为无限流的情况下实现这一点,有以下方法。
-
- マイクロバッチ処理 + チェックポイント処理
ストリームを小さなブロック(1s程度)に分割し、それぞれのブロックでバッチ処理を行う
ブロック同士の境目のチェックポイントごとに、現在の状態を永続性のあるストレージに書き出す
フレームワーク内部ではexactly-onceセマンティクスを提供するが、外部へ副作用が発生する場合はそうでない。
使用例: Spark Streaming
アトミックなコミット
ストリーム処理のフレームワーク内部で、状態の変更と外部へ向けたメッセージの作成をアトミックに作成する。
使用例: Google Cloud Dataflow, VoltDB, 追加予定: Apache Kafka
冪等性
以下の前提をおいた上で、メタデータを使用することで操作を冪等にできる
処理は決定的である
同じ値を並行に他のノードが更新しない
ある処理ノードから他のノードへfail overする際にはフェンシングを使用する
例: Kafkaでメッセージを消費する際には読み取り完了のオフセットを記録し、それを永続化しておく。
在障碍之后重新构建状态的方法示例是:
-
- 個々のメッセージの処理完了のたびに、その状態をリモートのDBに保存し、これをレプリケーションする
-
- 個々のメッセージの処理完了のたびに(?)、状態をストリームプロセッサのローカルディスクに保存し、それを定期的にレプリケーションする
- そもそも状態のレプリケーションが必要がない場合もある。状態を入力ストリームから再構築できる場合など。
第12章 数据系统的未来
请阅读