akka (http://akka.io/) は、分散システムを作るためのツールキット(ライブラリと小さいプログラム)です。分散は、ロバストとかスケールとかのために。かけ声は、Let it crash (http://letitcrash.com/)。(1台のサーバでちょっとためす例は、こっちを。)
akka 2.2.3 + Java 7(OpenJDK) + SBT(scala 2.10)を使います。
OSは、ここでは、CentOS6かUbuntu12.04ですが、Javaさえ動けば、わりとなんでもよいはずです。
3台(4台)のクラスタを動かします。サンプルのプログラムは、モンテカルロ法での円周率の計算です。重めになるかなと思ってBigIntegerにしてみましたが、πの精度は目指していません。計算をスタートさせるノード(10.0.0.4)から、クラスタに適当にたくさんの計算ジョブ(ノード数より多いジョブ)をばらまきます。ばらまきはrouterに任せます。各ノードは、計算が出来上がったら、最初のノードに結果を返します。最初のノードは、返された結果を集計して円周率を表示します。返される結果が増えると、次第に円周率っぽくなっていく、という感じ。(絵は4台ですが、10.0.0.3は省略してもかまいません。)
0. 準備
0-1. Linuxサーバーを3つ以上用意します。
ほぼ最小インストールのLinuxを準備します。必要なのは、ネットワークと作業用のssh serverくらいです。(ファイアウォールはtcpのポート2551を許可、SELinuxはOFF)
クラスタの構成は、3台または4台で、サーバーIP:利用ポートは、以下です。
・10.0.0.1:2551を、seedノードとして。
・10.0.0.2:2551を、seedノードとして。
・10.0.0.3:2551を、普通のノードとして。
・10.0.0.4:2551を、計算スタートをさせるノードとして。4台が面倒なら、10.0.0.3と10.0.0.4を兼用で、合計3台でも。
あとは、普通のノードは、お好みで、10.0.0.5, ...と、いくつでも。
seedは、AkkaのClusterの中の特別なノードですが、プログラムからみるとseedかどうかはあまり関係ないはずです。とりあえずは、きっかけのために2個ぐらいあればいい、程度かと。
0-2. すべてのサーバーにJava 7 (OpenJDK) を入れます。
CentOS6では、yum -y install java-1.7.0-openjdk-devel
Ubuntu12.04では、
apt-get -y install openjdk-7-jdk
ここでは全サーバにJDKを入れていますが、開発機以外はJREだけでいいのかも。
0-3. プログラム開発用サーバー1台だけに、SBT(scala)を入れます。
SBTのパッッケージをダウンロードします。パッケージは、ここ(http://www.scala-sbt.org/release/docs/Getting-Started/Setup.html)の、 RPM package(CentOS)やDEB package(Ubuntu)のリンクから。
(RPM: http://repo.scala-sbt.org/scalasbt/sbt-native-packages/org/scala-sbt/sbt/0.13.1/sbt.rpm)
(DEB: http://repo.scala-sbt.org/scalasbt/sbt-native-packages/org/scala-sbt/sbt/0.13.1/sbt.deb)
CentOS6では、sbt.rpmをインストール
rpm -ivh sbt.rpmUbuntu12.04では、sbt.debをインストール
dpkg -i sbt.deb
そして、CentOSでもUbuntuでも、確認と初期設定をかねて、
sbt versionとし、実行結果は例えば、
...(省略: 一度目だけ時間がかかります)... [info] 0.1-SNAPSHOT
1. 開発機上で、プログラムを作成して、配布用にjarパッケージ化
1-1. プログラムやSBTプロジェクト設定ファイルを作成
SBTのしきたりにあわせて、プロジェクトのディレクトリを作り、4つのファイルを作成します。ここでは、~/work内で作業します。
ディレクトリ作成
mkdir ~/work cd ~/work mkdir project mkdir -p src/main/scala mkdir -p src/main/resources
ファイル作成(4つ)
・build.sbt (SBTプロジェクトの設定。空白もこのままじゃないと駄目だそうです。)
import AssemblyKeys._ assemblySettings name := "HeavyPiClusterProject" version := "1.0" scalaVersion := "2.10.3" resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.2.3" libraryDependencies += "com.typesafe.akka" %% "akka-cluster" % "2.2.3"
・project/assembly.sbt (SBTのjarパッケージプラグインの設定)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2")
・src/main/resources/application.conf (akkaの設定、主にCluster関連)
akka { actor { provider = "akka.cluster.ClusterActorRefProvider" } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 0 } } cluster { seed-nodes = [ "akka.tcp://HeavyPiCluster@10.0.0.1:2551", "akka.tcp://HeavyPiCluster@10.0.0.2:2551"] auto-down = on } }
・src/main/scala/HeavyPiCluster.scala (akkaのプログラム)
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.routing.ClusterRouterConfig
import akka.cluster.routing.ClusterRouterSettings
import akka.routing.ConsistentHashingRouter
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
import java.math.BigInteger
import java.util.Random
object HeavyPiCluster {
def main(args: Array[String]): Unit = {
if (args.nonEmpty){
System.setProperty("akka.remote.netty.tcp.hostname", args(0))
System.setProperty("akka.remote.netty.tcp.port", args(1))
}
val system = ActorSystem("HeavyPiCluster")
val clusterListener = system.actorOf(Props[HeavyPiListener],
name = "clusterListener")
Cluster(system).subscribe(clusterListener, classOf[ClusterDomainEvent])
if(args.length > 2){
Thread.sleep(3000)
val router = system.actorOf(Props[HeavyPi].withRouter(
ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings(
totalInstances = 100, maxInstancesPerNode = 3,
allowLocalRoutees = false, useRole = None))),
name = "clusterrouter")
(1 to 100).foreach{ i =>
router.tell(ConsistentHashableEnvelope(HeavyPi.Request(1000), i), clusterListener)
}
}
}
}
class HeavyPiListener extends Actor with ActorLogging {
var totalreq = 0
var totaliter = 0
var totalp = 0
def receive = {
case state: CurrentClusterState =>
log.info("Current members: {}", state.members.mkString(", "))
case MemberUp(member) =>
log.info("Member is Up: {}", member.address)
case UnreachableMember(member) =>
log.info("Member detected as unreachable: {}", member)
case MemberRemoved(member, previousStatus) =>
log.info("Member is Removed: {} after {}",
member.address, previousStatus)
case HeavyPi.Response(i, p) =>
totalreq += 1
totaliter += i
totalp += p
val pi = 4D * totalp / totaliter
log.info("Actor Response {}, pi={}", totaliter, pi)
case _: ClusterDomainEvent => // ignore
}
}
object HeavyPi {
case class Request(iter: Integer)
case class Response(iter: Integer, p: Integer)
}
class HeavyPi extends Actor with akka.actor.ActorLogging {
val b = 256
val m = new BigInteger("2").pow(b).subtract(new BigInteger("1"))
val m2 = m.pow(2)
def receive = {
case HeavyPi.Request(iter) =>
var p = 0
(1 to iter).foreach{ i =>
val rnd = new Random()
val x = new BigInteger(b, rnd)
val y = new BigInteger(b, rnd)
val r2 = x.multiply(x).add(y.multiply(y))
if(r2.compareTo(m2) <= 0) p += 1
}
sender ! HeavyPi.Response(iter, p)
}
}
1-2. プログラムをコンパイルして、jarパッケージを作成し、各サーバーに配布
コンパイルしてjarファイルを作成。sbt assembly結果例
...(長いので省略)... [info] SHA-1: f53f2a9fc0c122bfcea1620de9a2fdfb892403ca [info] Packaging /home/test/work/target/scala-2.10/HeavyPiClusterProject-assembly-1.0.jar ... [info] Done packaging. [success] Total time: 30 s, completed 2014/01/01 20:12:34
できあがりは、HeavyPiClusterProject-assembly-1.0.jarです。
そして、jarパッケージファイルを各サーバーにコピーします。一応、コマンド例は、
scp target/scala-2.10/HeavyPiClusterProject-assembly-1.0.jar 10.0.0.2:/tmp/
2. 実行
一台目(seedノード)java -cp HeavyPiClusterProject-assembly-1.0.jar HeavyPiCluster 10.0.0.1 2551
二台目(seedノード)
java -cp HeavyPiClusterProject-assembly-1.0.jar HeavyPiCluster 10.0.0.2 2551
三台目(普通のノード。同様に、いくつでも増やせます。0台でも。...4台目サーバーが無い人は、ここを省略。)
java -cp HeavyPiClusterProject-assembly-1.0.jar HeavyPiCluster 10.0.0.3 2551
このあたりで、各コンソールの表示を見ると、以下の感じの行が見えるはずです。
[INFO] ...(省略)... [HeavyPiCluster-akka.actor.default-dispatcher-4] [akka://HeavyPiCluster/user/clusterListener] Member is Up: akka.tcp://HeavyPiCluster@10.0.0.1:2551 [INFO] ...(省略)... [HeavyPiCluster-akka.actor.default-dispatcher-14] [akka://HeavyPiCluster/user/clusterListener] Member is Up: akka.tcp://HeavyPiCluster@10.0.0.2:2551 [INFO] ...(省略)... [HeavyPiCluster-akka.actor.default-dispatcher-12] [akka://HeavyPiCluster/user/clusterListener] Member is Up: akka.tcp://HeavyPiCluster@10.0.0.3:2551この場合、この時点では、3台構成のAkkaのクラスターが動作していることになります。
四台目のノードで、π計算を開始。(routerを作って、各ノードに計算要求のmessageを投入。)
java -cp HeavyPiClusterProject-assembly-1.0.jar HeavyPiCluster 10.0.0.4 2551 pi
ここで、少しすると、各ノードから結果がかえってきて、以下のようになります。
...(省略)... [INFO] ...(省略)... [HeavyPiCluster-akka.actor.default-dispatcher-2] [akka://HeavyPiCluster/user/clusterListener] Actor Response 98000, pi=3.1386938775510203 [INFO] ...(省略)... [HeavyPiCluster-akka.actor.default-dispatcher-2] [akka://HeavyPiCluster/user/clusterListener] Actor Response 99000, pi=3.1387878787878787 [INFO] ...(省略)... [HeavyPiCluster-akka.actor.default-dispatcher-16] [akka://HeavyPiCluster/user/clusterListener] Actor Response 100000, pi=3.13968--
以上
0 件のコメント:
コメントを投稿