Spark入門
Sparkのコアのデータ構造は(RDD: Resilient Distributed Dataset)である。pandasのDataFrameのようにデータセットをRDDにロードして、メソッドでデータを扱う。
PythonでSparkを使うにはPySparkを使う。SparkはJavaでできているので Py4Jによって、JavaのオブジェクトであるRDDを操作可能になる。
Sparkの使い方
SparkではSparkContext
オブジェクトがクラスターやクラスター上で実行される座標との接続を管理する。より正確には、SparkContext
がCluster Managerに接続し、Cluster ManagerがExecutorをコントロールする。Executorは計算を行う。
まず、pysparkからSparkContext
をimportする。
from pyspark import SparkContext
次に、用意したデータセットをRDDオブジェクトとして読み込む。 ちなみにcsvがComma Separated Valuesなのに対して、tsvはTab Sparated Valuesである。
raw_data = sc.textFile("sample.tsv")
take()
メソッドでRDDになったデータを覗いてみる。
raw_data.take(5)
SparkはいくつものパーティションにRDDオブジェクトと分配していて、その分配されたデータを操作する作りになっている。Sparkはローカルのコンピューターで使うことが可能だ。またメモリを自動的にパーティションに分割して、あたかも多数のマシン上で計算を走らせているかのように振る舞う。
またSparkのRDDは必要になるまで処理を行わず先延ばしにすることができる。例えば、上記のコードでは、初めにsc.textFile("samplet.tsv")
でファイルを読みこんだが、実際には次のraw_data.take(5)
が呼び出されるまで、ファイルの読み込みを待っていた。それは、実際にはsc.textFile("samplet.tsv")
を呼んだ段階ではファイルにポインタを作成しただけで、raw_data.take(5)
が呼び出される段階で読み込まれたからだ。
これによりqueueを作成しワークフローをバックグラウンドで最適化することを実現している。通常のPythonではこれは不可能である。
SparkはHadoopのMapReduceから多くの仕様を真似ているが、多くの点で異なる。 Sparkを理解する上で重要なアイデアは、データパイプラインだ。Sparkでは全ての操作や計算がステップのseriesになっている。それぞれのステップはintegerなどのvalue、dictionaryなどのデータ構造、またRDDオブジェクトを返す。
Map()
map(f)
はRDDのすべての要素に指定した関数を適用する。そして新しいRDDオブジェクトを返す。指定した関数内では何かしらの値をreturnする必要がある。
split_data = raw_data.map(lambda line: line.split("\t"))
flatMap()
flatMap()
はすべての要素に指定した関数を適用するが、Map()
と違い、すべての要素に対してretutnで必ず何かしらの値を返す必要がない。RDDから値のシークエンスを生成したい場合に、yieldと組み合わせて使うと便利である。
filter()
filter()
はすべての要素に指定した関数を適用するが、返り値としてTrue
またはFalse
をreturnする必要がある。
take()
take(n)
はRDDの先頭からn個の要素を取得し表示する。pandasのhead()
と似たようなものである。
count()
collect()
collect()
はRDDのすべての要素をlistで返す。RDDオブジェクトをCSVとして書き出したい場合などに用いる。
TransformationsとActions
- Transoformations -
map()
,reduceByKey()
- Actions -
take()
,reduce()
,saveAsTextFile()
,collect()
Transformations
は必要になるまで実行せず常にRDDオブジェクトの参照を返す。
RDDを返す関数はすべてtransformationである。一方でvalueを返す関数はactionだ。
Immutability
RDDオブジェクトはimmutableである。すなわち書き換えることができない。Sparkは計算速度を向上させるためにimmutableなので、書き換えたい場合は、新しくオブジェクトを作る必要がある。したがってある特定のデータを削除をしたい場合や抽出したい場合は、filter()
メソッドを使って新しくオブジェクトを作る。filter()
メソッドで使う関数はTrue
orFalse
で抽出できる。