akka 2.2.3 + Java 7(OpenJDK) + scala 2.10を、CentOS6上で。(SBTは、使わないつもりでしたが...。)
複数のサーバー(3,4台)でクラスタを簡単に試す話は、こっちです。
目標は、こういう感じです。
0. 準備
ディレクトリ ~/work で、試します。0-1. Java
yum -y install java-1.7.0-openjdk-devel
確認例
> javac -version javac 1.7.0_51
0-2. Scala
CentOS6なら、http://www.scala-lang.org/download/からrpmをダウンロードして、インストール(http://www.scala-lang.org/files/archive/scala-2.10.3.rpm)
rpm -ivh scala-2.10.3.rpm
確認例
> scalac -version Scala compiler version 2.10.3 -- Copyright 2002-2013, LAMP/EPFL
0-3. Akka
"Akka 2.2.3 distribution (Scala 2.10)"を、http://akka.io/downloads/からダウンロードする。(http://downloads.typesafe.com/akka/akka-2.2.3.zip)
cd ~ mkdir work cd work unzip どこそこ/akka-2.2.3.zip
1. 簡単な例
1-1. (A) 1つのactorに、メッセージを送る。
HelloWorld.scalaimport akka.actor.Actor
import akka.actor.Props
class HelloWorld extends Actor with akka.actor.ActorLogging {
override def preStart(): Unit = {
log.debug("Hello preStart")
val greeter = context.actorOf(Props[Greeter], "greeter")
greeter ! Greeter.Greet("alice")
greeter ! Greeter.Greet("bob")
greeter ! Greeter.Greet("carol")
log.debug("Bye preStart")
}
def receive = {
case Greeter.GreetBack(answer) =>
log.info("GreetBack: " + answer)
}
}
object Greeter {
case class Greet(who: String)
case class GreetBack(answer: String)
}
class Greeter extends Actor with akka.actor.ActorLogging {
def receive = {
case Greeter.Greet(who) =>
log.info("Hello " + who)
sender ! Greeter.GreetBack("Hello back from " + who)
}
}
> scalac -cp akka-2.2.3/lib/akka/akka-actor_2.10-2.2.3.jar HelloWorld.scala > java -cp akka-2.2.3/lib/akka/akka-actor_2.10-2.2.3.jar:akka-2.2.3/lib/akka/config-1.0.2.jar:akka-2.2.3/lib/scala-library.jar:. akka.Main HelloWorld [INFO] ...略... [Main-akka.actor.default-dispatcher-2] [akka://Main/user/app] Hello preStart [INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app/greeter] Hello alice [INFO] ...略... [Main-akka.actor.default-dispatcher-5] [akka://Main/user/app/greeter] Hello bob [INFO] ...略... [Main-akka.actor.default-dispatcher-5] [akka://Main/user/app/greeter] Hello carol [INFO] ...略... [Main-akka.actor.default-dispatcher-2] [akka://Main/user/app] Bye preStart [INFO] ...略... [Main-akka.actor.default-dispatcher-2] [akka://Main/user/app] GreetBack: Hello back from alice [INFO] ...略... [Main-akka.actor.default-dispatcher-2] [akka://Main/user/app] GreetBack: Hello back from bob [INFO] ...略... [Main-akka.actor.default-dispatcher-2] [akka://Main/user/app] GreetBack: Hello back from carol ^C
1-2. (B) 3つのactorに、メッセージを送る。
HelloWorld2.scalaimport akka.actor.Actor
import akka.actor.Props
class HelloWorld2 extends Actor with akka.actor.ActorLogging {
override def preStart(): Unit = {
log.debug("Hello preStart")
val greeter1 = context.actorOf(Props[Greeter], "greeter1")
val greeter2 = context.actorOf(Props[Greeter], "greeter2")
val greeter3 = context.actorOf(Props[Greeter], "greeter3")
greeter1 ! Greeter.Greet("alice")
greeter2 ! Greeter.Greet("bob")
greeter3 ! Greeter.Greet("carol")
log.debug("Bye preStart")
}
def receive = {
case Greeter.GreetBack(answer) =>
log.info("GreetBack: " + answer)
}
}
object Greeter {
case class Greet(who: String)
case class GreetBack(answer: String)
}
class Greeter extends Actor with akka.actor.ActorLogging {
def receive = {
case Greeter.Greet(who) =>
log.info("Hello " + who)
sender ! Greeter.GreetBack("Hello back from " + who)
}
}
> # scalac -cp akka-2.2.3/lib/akka/akka-actor_2.10-2.2.3.jar HelloWorld2.scala > java -cp akka-2.2.3/lib/akka/akka-actor_2.10-2.2.3.jar:akka-2.2.3/lib/akka/config-1.0.2.jar:akka-2.2.3/lib/scala-library.jar:. akka.Main HelloWorld2 [INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app] Hello preStart [INFO] ...略... [Main-akka.actor.default-dispatcher-6] [akka://Main/user/app/greeter1] Hello alice [INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app/greeter2] Hello bob [INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app] Bye preStart [INFO] ...略... [Main-akka.actor.default-dispatcher-6] [akka://Main/user/app/greeter3] Hello carol [INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app] GreetBack: Hello back from alice [INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app] GreetBack: Hello back from bob [INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app] GreetBack: Hello back from carol ^C
1-3-1. (C) 手動生成の3つのactorに、router経由で、3つのメッセージを送る。
こんな感じかと思ったけど、うまく行かないから、省略。(sbt利用だと成功しますが。)val greeter1 = context.actorOf(Props[Greeter], "greeter1") val greeter2 = context.actorOf(Props[Greeter], "greeter2") val greeter3 = context.actorOf(Props[Greeter], "greeter3") val greeters = Vector[ActorRef](greeter1, greeter2, greeter3) val router = context.actorOf(Props.empty.withRouter(RoundRobinRouter(routees = greeters)))) router ! Greeter.Greet("alice")
1-3-2. (C) 自動生成の3つのactorに、router経由で、3つのメッセージを送る。
HelloWorld5.scalaimport akka.actor.Actor import akka.actor.ActorRef import akka.routing.FromConfig import akka.actor.Props class HelloWorld5 extends Actor with akka.actor.ActorLogging { override def preStart(): Unit = { log.debug("Hello preStart") val router = context.actorOf(Props[Greeter].withRouter(FromConfig), "router1") router ! Greeter.Greet("alice") router ! Greeter.Greet("bob") router ! Greeter.Greet("carol") log.debug("Bye preStart") } def receive = { case Greeter.GreetBack(answer) => log.info("GreetBack: " + answer) } } object Greeter { case class Greet(who: String) case class GreetBack(answer: String) } class Greeter extends Actor with akka.actor.ActorLogging { def receive = { case Greeter.Greet(who) => log.info("Hello " + who) sender ! Greeter.GreetBack("Hello back from " + who) } }application.conf
akka.actor.deployment { /app/myrouter1 { router = round-robin nr-of-instances = 5 } }ここでround-robinをrandomにかえたり、いろいろ設定変更できます。
> java -cp akka-2.2.3/lib/akka/akka-actor_2.10-2.2.3.jar:akka-2.2.3/lib/akka/config-1.0.2.jar:akka-2.2.3/lib/scala-library.jar:. akka.Main HelloWorld5 [INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app] Hello preStart [INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app/router1/$a] Hello alice [INFO] ...略... [Main-akka.actor.default-dispatcher-6] [akka://Main/user/app/router1/$b] Hello bob [INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app/router1/$c] Hello carol [INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app] Bye preStart [INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app] GreetBack: Hello back from alice [INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app] GreetBack: Hello back from bob [INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app] GreetBack: Hello back from carol ^C
2. Clusterで試す(1台のサーバー上の4ノード) = (D)
akka2.2.3のclusterの話(scala)は、このあたり・http://doc.akka.io/docs/akka/2.2.3/common/cluster.html
・http://doc.akka.io/docs/akka/2.2.3/scala/cluster-usage.html です。
以下は、actorを自動生成する例です。上記ドキュメント(後者)上は、Router with Remote Deployed Routeesの名前になっています。 すでに別途作成したactorをrouterに関連づける方法はRouter Example with Lookup of Routeesです。バージョンがかわると、呼び方もあっさり変わるみたいです。
2-0. SBTを入れる
SBT無しだと、一部うまく動かなかったので。SBTのrpmをここ(http://www.scala-sbt.org/release/docs/Getting-Started/Setup.htmlの「RPM package」)からダウンロード
(http://repo.scala-sbt.org/scalasbt/sbt-native-packages/org/scala-sbt/sbt/0.13.1/sbt.rpm)
> rpm -ivh sbt.rpm 準備中... ########################################### [100%] 1:sbt ########################################### [100%] > sbt version ...(省略: 一度目だけ時間がかかります)... [info] Set current project to root (in build file:/root/) [info] 0.1-SNAPSHOT
2-1. 4つのノード上の自動生成のactorに、メッセージを送る。
2-1-1. SBTのしきたりにあわせてファイルを準備する。
ここでは~/work2ディレクトリで。> mkdir ~/work2 > cd ~/work2 > mkdir -p src/main/scala > mkdir -p src/main/resources
三つのファイルを準備。SBT上のプロジェクト設定ファイルbuild.sbtと、scalaのプログラムファイルHelloCluster.scala、akkaの設定ファイルapplication.confで、それぞれの場所に配置します。
・build.sbt
name := "Hello Cluster Project" version := "1.0" scalaVersion := "2.10.3" resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.2.3" libraryDependencies += "com.typesafe.akka" %% "akka-cluster" % "2.2.3"
・src/main/scala/HelloCluster.scala
import akka.actor._ import akka.cluster.Cluster import akka.cluster.ClusterEvent._ import akka.cluster.routing.ClusterRouterConfig import akka.cluster.routing.ClusterRouterSettings import akka.routing.ConsistentHashingRouter import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope object HelloCluster { def main(args: Array[String]): Unit = { if (args.nonEmpty){ System.setProperty("akka.remote.netty.tcp.hostname", args(0)) System.setProperty("akka.remote.netty.tcp.port", args(1)) } val system = ActorSystem("ClusterSystem") val clusterListener = system.actorOf(Props[HelloClusterListener], name = "clusterListener") Cluster(system).subscribe(clusterListener, classOf[ClusterDomainEvent]) if(args.length > 2){ Thread.sleep(3000) val router = system.actorOf(Props[Greeter].withRouter( ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings( totalInstances = 100, maxInstancesPerNode = 3, allowLocalRoutees = false, useRole = None))), name = "clusterrouter") router.tell(ConsistentHashableEnvelope(Greeter.Greet("alice"), "alice"), clusterListener) router.tell(ConsistentHashableEnvelope(Greeter.Greet("bob"), "bob"), clusterListener) router.tell(ConsistentHashableEnvelope(Greeter.Greet("carol"), "carol"), clusterListener) } } } class HelloClusterListener extends Actor with ActorLogging { def receive = { case state: CurrentClusterState => log.info("Current members: {}", state.members.mkString(", ")) case MemberUp(member) => log.info("Member is Up: {}", member.address) case UnreachableMember(member) => log.info("Member detected as unreachable: {}", member) case MemberRemoved(member, previousStatus) => log.info("Member is Removed: {} after {}", member.address, previousStatus) case Greeter.GreetBack(answer) => log.info("GreetBack: " + answer) case _: ClusterDomainEvent => // ignore } } object Greeter { case class Greet(who: String) case class GreetBack(answer: String) } class Greeter extends Actor with akka.actor.ActorLogging { def receive = { case Greeter.Greet(who) => log.info("Hello " + who) sender ! Greeter.GreetBack("Hello back from " + who) } }
・src/main/resources/application.conf
akka { actor { provider = "akka.cluster.ClusterActorRefProvider" } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 0 } } cluster { seed-nodes = [ "akka.tcp://ClusterSystem@127.0.0.1:2551", "akka.tcp://ClusterSystem@127.0.0.1:2552"] auto-down = on } }
2-1-2. 4つのノードを実行。
一つのサーバー上で4ノード実行させます。ポート違いで、127.0.0.1:2551, 127.0.0.1:2552, 127.0.0.1:2553, 127.0.0.1:2554の四つです。(このうち2551と2552は、クラスタの中の特別なノードで、application.confファイルのseed-nodesとして設定されています。)作業は、4つのターミナルを開いて、それぞれ、~/work2ディレクトリで、 実行します。
ターミナル1 (seedノードの1つ目。port=2551は、confファイルのseed-nodesにて設定。)
sbt "run-main HelloCluster 127.0.0.1 2551"
ターミナル2 (seedノードの2つ目。port=2552は、confファイルのseed-nodesにて設定。)
sbt "run-main HelloCluster 127.0.0.1 2552"
ターミナル3 (普通のノード。portは2551,2552以外ならなんでもOK。同様にノードを増やせます。)
sbt "run-main HelloCluster 127.0.0.1 2553"
ここまでで、3ノードクラスタが動作して、それぞれで、おおむね以下の表示で、確認できます。
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2551 [INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-5] [akka://ClusterSystem/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2552 [INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2553
ターミナル4
ここでは、ついでに、routerを作って、messageを投入。
sbt "run-main HelloCluster 127.0.0.1 2554 go"すると、以下が、どこかのノードに表示され、
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-13] [akka://ClusterSystem/remote/akka.tcp/ClusterSystem@127.0.0.1:2554/user/clusterrouter/c4] Hello alice
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/remote/akka.tcp/ClusterSystem@127.0.0.1:2554/user/clusterrouter/c1] Hello bob
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/remote/akka.tcp/ClusterSystem@127.0.0.1:2554/user/clusterrouter/c9] Hello carol
もとのノード(ターミナル4)に、以下が表示されます。
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/user/clusterListener] GreetBack: Hello back from alice [INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-13] [akka://ClusterSystem/user/clusterListener] GreetBack: Hello back from bob [INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/user/clusterListener] GreetBack: Hello back from carol
3.Clusterで試す(4台のサーバー上の4ノード)
3-1. プログラムをjarパッケージ化して他のサーバへ配布
SBTプロジェクトにsbt-assemblyを設定
sbt-assemblyは、https://github.com/sbt/sbt-assembly
・project/assembly.sbt として以下の内容のファイルを作成
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2")
・build.bstに以下を追加(import行は、ファイルの先頭に)
import AssemblyKeys._ assemblySettings
jarパッケージを作成
ここで、4サーバーは、10.0.0.1:2551, 10.0.0.2:2551, 10.0.0.3:2551, 10.0.0.4:2551として、 seed-nodeを10.0.0.1:2551と10.0.0.2:2551とします。
・src/main/resources/application.confの以下の部分を変更します。
cluster { seed-nodes = [ "akka.tcp://ClusterSystem@10.0.0.1:2551", "akka.tcp://ClusterSystem@10.0.0.2:2551"] auto-down = on }
パッケージを作成します。
sbt package
できあがりは、 target/scala-2.10/hello-cluster-project_2.10-1.0.jar ファイルです。
これを、各サーバにコピーします。各サーバーでは、javaのみ必要です。
3-2. 各サーバ上で、ノードを実行
一つ目(seedノード)java -cp hello-cluster-project_2.10-1.0.jar HelloCluster 10.0.0.1 2551
二つ目(seedノード)
java -cp hello-cluster-project_2.10-1.0.jar HelloCluster 10.0.0.2 2551
三つ目(普通のノード。同様に、いくつでも増やせます。)
java -cp hello-cluster-project_2.10-1.0.jar HelloCluster 10.0.0.3 2551
四つ目で、ついでに、routerを作って、messageを投入。
java -cp hello-cluster-project_2.10-1.0.jar HelloCluster 10.0.0.4 2551 go
結果は、2と同じです。
--
以上
0 件のコメント:
コメントを投稿