Machine Morning

機械学習やWebについて学んだことを記録しています。

Spark入門

Sparkのコアのデータ構造は(RDD: Resilient Distributed Dataset)である。pandasのDataFrameのようにデータセットRDDにロードして、メソッドでデータを扱う。

PythonでSparkを使うにはPySparkを使う。SparkはJavaでできているので Py4Jによって、JavaのオブジェクトであるRDDを操作可能になる。

Sparkの使い方

SparkではSparkContextオブジェクトがクラスターやクラスター上で実行される座標との接続を管理する。より正確には、SparkContextがCluster Managerに接続し、Cluster ManagerがExecutorをコントロールする。Executorは計算を行う。

f:id:gensasaki:20180816195953p:plain

まず、pysparkからSparkContextをimportする。

from pyspark import SparkContext

次に、用意したデータセットRDDオブジェクトとして読み込む。 ちなみにcsvがComma Separated Valuesなのに対して、tsvはTab Sparated Valuesである。

raw_data = sc.textFile("sample.tsv")

take()メソッドでRDDになったデータを覗いてみる。

raw_data.take(5)

SparkはいくつものパーティションRDDオブジェクトと分配していて、その分配されたデータを操作する作りになっている。Sparkはローカルのコンピューターで使うことが可能だ。またメモリを自動的にパーティションに分割して、あたかも多数のマシン上で計算を走らせているかのように振る舞う。

またSparkのRDDは必要になるまで処理を行わず先延ばしにすることができる。例えば、上記のコードでは、初めにsc.textFile("samplet.tsv")でファイルを読みこんだが、実際には次のraw_data.take(5)が呼び出されるまで、ファイルの読み込みを待っていた。それは、実際にはsc.textFile("samplet.tsv")を呼んだ段階ではファイルにポインタを作成しただけで、raw_data.take(5)が呼び出される段階で読み込まれたからだ。

これによりqueueを作成しワークフローをバックグラウンドで最適化することを実現している。通常のPythonではこれは不可能である。

SparkはHadoopMapReduceから多くの仕様を真似ているが、多くの点で異なる。 Sparkを理解する上で重要なアイデアは、データパイプラインだ。Sparkでは全ての操作や計算がステップのseriesになっている。それぞれのステップはintegerなどのvalue、dictionaryなどのデータ構造、またRDDオブジェクトを返す。

Map()

map(f)RDDのすべての要素に指定した関数を適用する。そして新しいRDDオブジェクトを返す。指定した関数内では何かしらの値をreturnする必要がある。

split_data = raw_data.map(lambda line: line.split("\t"))

flatMap()

flatMap()はすべての要素に指定した関数を適用するが、Map()と違い、すべての要素に対してretutnで必ず何かしらの値を返す必要がない。RDDから値のシークエンスを生成したい場合に、yieldと組み合わせて使うと便利である。

filter()

filter()はすべての要素に指定した関数を適用するが、返り値としてTrueまたはFalseをreturnする必要がある。

take()

take(n)RDDの先頭からn個の要素を取得し表示する。pandasのhead()と似たようなものである。

count()

count()RDDの要素数を取得する。

collect()

collect()RDDのすべての要素をlistで返す。RDDオブジェクトをCSVとして書き出したい場合などに用いる。

TransformationsとActions

  • Transoformations - map(), reduceByKey()
  • Actions - take(), reduce(), saveAsTextFile(), collect()

Transformationsは必要になるまで実行せず常にRDDオブジェクトの参照を返す。 RDDを返す関数はすべてtransformationである。一方でvalueを返す関数はactionだ。

Immutability

RDDオブジェクトはimmutableである。すなわち書き換えることができない。Sparkは計算速度を向上させるためにimmutableなので、書き換えたい場合は、新しくオブジェクトを作る必要がある。したがってある特定のデータを削除をしたい場合や抽出したい場合は、filter()メソッドを使って新しくオブジェクトを作る。filter()メソッドで使う関数はTrueorFalseで抽出できる。