Apache Sparkのサンプルコードを読む

th_raspi-double

前回、Raspberry Pi 2でApache Stormの実行環境を構築し、サンプルプログラムを実行するところまでやりました。

ここから先は、Apache Sparkの公式ドキュメントも参考にしながら、サンプルソースを読み解いていくことにします。

サンプルプログラムの実行に使ったrun-exampleは、サンプルファイルを実行してくれるスクリプトです。引数なしで”$ bin/run-example”を実行すると、引数についてのヘルプが出てきます。

$ bin/run-example
Usage: ./bin/run-example <example-class> [example-args]
- set MASTER=XX to use a specific master
- can use abbreviated example class name relative to com.apache.spark.examples
(e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)

<example-class>で指定できるファイル名は、”examples/src/main/scala/org/apache/spark/examples”を見ればわかります。

[example-args]は、指定したサンプルファイルごとに異なる引数です。”SparkPi”を指定した場合は、上記に格納されている”SparkPi.scala”を見てみると、スライス数の指定になっているようです(指定しない場合は2になる)。ちなみに、ここでの”Pi”は円周率のπのことであり、Raspberry PiのPiとは特に関係ありません。

で、「スライス数」とは何ぞやと。Spark Programming Guideを見てみると、スライス数について以下のような記述があります。

Parallelized Collections

One important parameter for parallel collections is the number of slices to cut the dataset into. Spark will run one task for each slice of the cluster. Typically you want 2-4 slices for each CPU in your cluster. Normally, Spark tries to set the number of slices automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)).

スライス数はParallel Collectionsにとって重要なパラメータである。Sparkは一つのタスクを、クラスタの各スライスに対して実行する。通常、クラスタ内の各CPUに対して2〜4スライスいるだろう。基本はSparkがクラスタに基づいて自動的にスライス数をセットしようとするけど、パラメータとして自分でセットすることもできるよ。

ふーん。Raspberry Pi 2のCPUは4コアだから、とりあえず4でセットすればいいのかな?でもサンプルみたいにコア数をオーバーするような値をセットした場合はどうなるんだろう。これはソースコードを見ないとわからないかな。。。

でもそもそもParallel Collectionsとは何ぞやと。再度Spark Programming Guideを見てみます。

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are currently two types of RDDs: parallelized collections, which take an existing Scala collection and run functions on it in parallel, and Hadoop datasets, which run functions on each record of a file in Hadoop distributed file system or any other storage system supported by Hadoop. Both types of RDDs can be operated on through the same methods.

SparkはRDD(回復力のある分散データセット?)のコンセプトを中心に展開するものであり、RDDは並列に処理可能な、耐障害性のある(要素の)コレクションである。RDDには現在のところ2種類ある。既存のScalaコレクションを取り込み、その上で関数を並列に実行する”Parallelized collections”と、HadoopのHDFSやその他Hadoopのサポートするストレージ・システムに存在するファイルの各レコード上で関数を実行する”Hadoop datasets”である。どちらのRDDも、同じメソッドで操作することができる。

ふーむ。Scala言語なんてのはこのSparkを見るようになって初めて知ったぐらいなんですが、どうやら言語そのものに並列コレクションという、並列プログラミング向けの機能があるようなので、それを利用している、ということかな。

まだ色々ピンとこないので、実際にサンプル”SparkPi.scala”を見ながら、Sparkのプログラムの書き方を見ていきたいと思います。

package org.apache.spark.examples

import scala.math.random

import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }

これだけ。とてもシンプルに見えます。私は数式には全然疎いので知りませんでしたが、これはモンテカルロ法というのを使って円周率を求めているようです。

では、いくつかピックアップして見ていきたいと思います。ちなみに私はこれまでScalaという言語を触ったことはありません。

import org.apache.spark._

Scalaでは、パッケージ内すべてのインポート指定に、アスタリスク( * )ではなくアンダーバー( _ )を使うようです。

Scalaではシングルトンオブジェクトの定義に、classではなくobjectを使います。objectの中にmainメソッドを書けば、そこが実行の起点となります。

val conf = new SparkConf().setAppName("Spark Pi")
val spark = new SparkContext(conf)

Sparkプログラムでは、最初にSparkContextオブジェクトを作成する必要があります。このオブジェクトが、Sparkにクラスタへのアクセス方法を教えてくれることになるので、まずSparkContextに必要な情報を設定してやる必要があります。 APIドキュメントを見てみると、SparkContextのコンストラクタは色々な引数をとれるようですが、ここではSparkConfオブジェクトを使って設定する方法がとられています。ここではSparkConfにアプリ名しかセットしていませんが、”new SparkConf().setMaster("local").setAppName("My app")“のように、メソッドチェーンで色々設定できるようになっているようです。

SparkContextには最低限、master(=Sparkアプリの動作環境の指定)とappName(=後述するWeb UIで表示されるアプリ名)が必要なのだそうですが、ここではsetAppNameしか呼び出していません。masterを指定しているのはどこか?

おそらくですが、サンプルプログラムの実行に使ったシェルスクリプト”bin/run-example”に記載されています。”bin/run-example”は内部でさらに”bin/spark-submit”を呼び出していて、その引数に”–master $EXAMPLE_MASTER”を指定されています。$EXAMPLE_MASTERには”local[*]”が代入されているので、これで「ローカルモードで、スレッド数をCPUのコア数に合わせる」という指定をしていることになると思われます。

val slices = if (args.length &gt; 0) args(0).toInt else 2

ここが、先に述べたスライス数の指定になっているところです。

    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1 
      val y = random * 2 - 1 
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)

ここがこのサンプルソースの肝ですね。Scalaの文法とモンテカルロ法、それからSparkのことをある程度わかっていないと、理解できません。

まず先に、モンテカルロ法の基本的な考え方を理解する必要があります。自分がパッとみたところでは、以下がわかりやすかったです。

上を見たあとでもう一度サンプルソースを見てみると、

val n = math.min(100000L * slices, Int.MaxValue).toInt

この部分で、(Intの上限を超えない範囲で)できる限りたくさんの点を評価できるように、点の個数nを定めていることがわかります。ちなみに、100000LのLは、Long型の整数であることを示します。

続いて、”spark.parallelize(1 until n, slices)”に着目します。parallelize[T](seq: Seq[T], numSlices: Int)の第一引数はシーケンス(順序を表す箱)。つまり、”1 until n”で生成される、1からn-1までの値の(イミュータブルな=不変な)コレクションを、slices個にスライスして、RDD(回復力のある分散データセット)を生成します。「回復力のある分散データセット」というのは何とも滑稽で気持ち悪い日本語ですが、自分の中でイメージが定着するまでは、しばらくこの言い方を使います。

このparallelize()によって、RDDオブジェクトが返ってくるので、続けてRDDオブジェクトのmapメソッドを実行します。mapメソッドは、RDD内のすべての要素に対して関数を適用して、新たなRDDを生成します。mapの中での関数の定義方法が、かなりScala独特の書き方なので、最初全然意味がわかりませんでした。。。とりあえず以下のルールを知って、なんとなくはわかるようになりました。

  • 引数が1つのメソッドは()の代わりに{}を使うことができる。主に関数を引数として渡すときに{}を使う
  • Scalaでは文末尾のセミコロンは省略できる。複数行入力した場合は、Scalaの方でうまいこと文を分割してくれる(セミコロン推論)
  • Scalaの関数リテラルは、”関数パラメータ => 関数本体”という形になる。関数パラメータの型指定は、型推論が可能であれば省略できる。
  • ブロックの最後の式の結果が、そのブロックの結果となる
  • if文は値を返す
  • プレイスホルダ構文では、引数名を”_”にして使用する

特にメソッド呼び出しの方法が謎過ぎた。。。このあたりを参照させていただいて、ようやく「こういうのもあるのか」という感じになりました(理解しているわけではないです)。

ここでのmapメソッドは、RDDのコレクションの各要素(iで参照)の値そのものは使っておらず、コレクションの要素の数だけランダムに-1〜1内の点を生成して、それが原点0半径1の円の中に収まっている(1)かいない(0)かを要素とした新たなRDDのコレクションを生成しているようです。

そして、その新たなRDDのコレクション(=1,0,0,1,1,…,のような並びがn個あるコレクション)に対して、reduceメソッドがすべての要素の和をとって、countの値として返している、と。reduceメソッドはコレクションの先頭2つの要素に対して関数(ここでは “+”)を適用し、その結果値と、コレクションのその次の要素に対して同じ関数を適用する、というのを、リストの要素が全部処理されるまで繰り返してくれるものだそうです。つまり、ここでは打った点の内の何個が半径1の円の中に収まったか、というのをcountしていることになります。

最後に、モンテカルロ法に従い、円内の点数/全点数を4倍して、πを計算しています。そして、sparkContextを終了。

。。。こんなに短いサンプルを読むのに、エラい時間がかかってしまいました。とにかくScalaについて全くの無知だったことが原因です。できればScalaについて1日ぐらい勉強してからSparkに取り組んだ方がよいかもしれません。自分は以下の本を買って勉強しました。

 

さて、サンプルソースを読むのにずいぶん時間を使ってしまいましたが、元々は以下の記事を参照させてもらって勉強を進めていました。

サンプルコードを実行した後も、ローカルモードの実行についてはもう少し続きがあるので、それを片付けてしまいます。

Sparkシェルの動作を試してみます。

$ bin/spark-shell --master local[4]

これで、Sparkシェルが4コアのローカルモードで動作するようになります。この状態で、以下を実行すると、ファイル”README.md”の行数である”98″が返ってきます。

scala> sc.textFile("README.md").count

Sparkシェルを使うときは、自動的にSparkContextオブジェクトが作成されており、変数scで呼び出すことができます。SparkContextオブジェクトのメソッドであるtextFileは、HDFSやローカルファイルからテキストファイルを読み込み、文字列のRDDを返します。countはRDDオブジェクトのメソッドで、RDD内の要素数を返します。

Sparkシェルが動作しているとき、同時にWebサーバが起動するようになっており、例えばブラウザから”http://192.168.24.xxx:4040/”などでSpark UIにアクセスできるようになっています。実行したジョブがどのように処理されたかを確認するのに便利です。

SparkシェルはCtrl + DかCtrl + Cで終了できます。続いて、Python版のシェルであるpysparkを起動します。

$ bin/pyspark --master local[4]

同じように

>>> sc.textFile("README.md").count()

で行数(98)が得られるかを確認できればOK。quit()でシェルを終了できます。

続いて、Python APIを使ったサンプルコードの実行です。

$ ./bin/spark-submit examples/src/main/python/pi.py 10

上記のサンプルソースについては、SparkPi.scalaを理解できれば特に難しいことはないと思うので、省略します。個人的には、Scala版より書きやすそうです。APIについても、こちらにちゃんとドキュメントがあるので、適宜参照すれば問題ないでしょう。

 

これでようやくApache Sparkのローカルモードでのテストは終了です。次はいよいよ複数台のRaspberry Piで分散処理をやってみたいと思います。