Akka 2.2.3を、1台のサーバーで簡単に試すメモです。(
http://akka.io/)
akka 2.2.3 + Java 7(OpenJDK) + scala 2.10を、CentOS6上で。(SBTは、使わないつもりでしたが...。)
複数のサーバー(3,4台)でクラスタを簡単に試す話は、
こっちです。
目標は、こういう感じです。
0. 準備
ディレクトリ ~/work で、試します。
0-1. Java
yum -y install java-1.7.0-openjdk-devel
確認例
> javac -version
javac 1.7.0_51
0-2. Scala
CentOS6なら、
http://www.scala-lang.org/download/からrpmをダウンロードして、インストール
(
http://www.scala-lang.org/files/archive/scala-2.10.3.rpm)
rpm -ivh scala-2.10.3.rpm
確認例
> scalac -version
Scala compiler version 2.10.3 -- Copyright 2002-2013, LAMP/EPFL
0-3. Akka
"Akka 2.2.3 distribution (Scala 2.10)"を、
http://akka.io/downloads/からダウンロードする。
(
http://downloads.typesafe.com/akka/akka-2.2.3.zip)
cd ~
mkdir work
cd work
unzip どこそこ/akka-2.2.3.zip
1. 簡単な例
1-1. (A) 1つのactorに、メッセージを送る。
HelloWorld.scala
import akka.actor.Actor
import akka.actor.Props
class HelloWorld extends Actor with akka.actor.ActorLogging {
override def preStart(): Unit = {
log.debug("Hello preStart")
val greeter = context.actorOf(Props[Greeter], "greeter")
greeter ! Greeter.Greet("alice")
greeter ! Greeter.Greet("bob")
greeter ! Greeter.Greet("carol")
log.debug("Bye preStart")
}
def receive = {
case Greeter.GreetBack(answer) =>
log.info("GreetBack: " + answer)
}
}
object Greeter {
case class Greet(who: String)
case class GreetBack(answer: String)
}
class Greeter extends Actor with akka.actor.ActorLogging {
def receive = {
case Greeter.Greet(who) =>
log.info("Hello " + who)
sender ! Greeter.GreetBack("Hello back from " + who)
}
}
> scalac -cp akka-2.2.3/lib/akka/akka-actor_2.10-2.2.3.jar HelloWorld.scala
> java -cp akka-2.2.3/lib/akka/akka-actor_2.10-2.2.3.jar:akka-2.2.3/lib/akka/config-1.0.2.jar:akka-2.2.3/lib/scala-library.jar:. akka.Main HelloWorld
[INFO] ...略... [Main-akka.actor.default-dispatcher-2] [akka://Main/user/app] Hello preStart
[INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app/greeter] Hello alice
[INFO] ...略... [Main-akka.actor.default-dispatcher-5] [akka://Main/user/app/greeter] Hello bob
[INFO] ...略... [Main-akka.actor.default-dispatcher-5] [akka://Main/user/app/greeter] Hello carol
[INFO] ...略... [Main-akka.actor.default-dispatcher-2] [akka://Main/user/app] Bye preStart
[INFO] ...略... [Main-akka.actor.default-dispatcher-2] [akka://Main/user/app] GreetBack: Hello back from alice
[INFO] ...略... [Main-akka.actor.default-dispatcher-2] [akka://Main/user/app] GreetBack: Hello back from bob
[INFO] ...略... [Main-akka.actor.default-dispatcher-2] [akka://Main/user/app] GreetBack: Hello back from carol
^C
1-2. (B) 3つのactorに、メッセージを送る。
HelloWorld2.scala
import akka.actor.Actor
import akka.actor.Props
class HelloWorld2 extends Actor with akka.actor.ActorLogging {
override def preStart(): Unit = {
log.debug("Hello preStart")
val greeter1 = context.actorOf(Props[Greeter], "greeter1")
val greeter2 = context.actorOf(Props[Greeter], "greeter2")
val greeter3 = context.actorOf(Props[Greeter], "greeter3")
greeter1 ! Greeter.Greet("alice")
greeter2 ! Greeter.Greet("bob")
greeter3 ! Greeter.Greet("carol")
log.debug("Bye preStart")
}
def receive = {
case Greeter.GreetBack(answer) =>
log.info("GreetBack: " + answer)
}
}
object Greeter {
case class Greet(who: String)
case class GreetBack(answer: String)
}
class Greeter extends Actor with akka.actor.ActorLogging {
def receive = {
case Greeter.Greet(who) =>
log.info("Hello " + who)
sender ! Greeter.GreetBack("Hello back from " + who)
}
}
> # scalac -cp akka-2.2.3/lib/akka/akka-actor_2.10-2.2.3.jar HelloWorld2.scala
> java -cp akka-2.2.3/lib/akka/akka-actor_2.10-2.2.3.jar:akka-2.2.3/lib/akka/config-1.0.2.jar:akka-2.2.3/lib/scala-library.jar:. akka.Main HelloWorld2
[INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app] Hello preStart
[INFO] ...略... [Main-akka.actor.default-dispatcher-6] [akka://Main/user/app/greeter1] Hello alice
[INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app/greeter2] Hello bob
[INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app] Bye preStart
[INFO] ...略... [Main-akka.actor.default-dispatcher-6] [akka://Main/user/app/greeter3] Hello carol
[INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app] GreetBack: Hello back from alice
[INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app] GreetBack: Hello back from bob
[INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app] GreetBack: Hello back from carol
^C
1-3-1. (C) 手動生成の3つのactorに、router経由で、3つのメッセージを送る。
こんな感じかと思ったけど、うまく行かないから、省略。(sbt利用だと成功しますが。)
val greeter1 = context.actorOf(Props[Greeter], "greeter1")
val greeter2 = context.actorOf(Props[Greeter], "greeter2")
val greeter3 = context.actorOf(Props[Greeter], "greeter3")
val greeters = Vector[ActorRef](greeter1, greeter2, greeter3)
val router = context.actorOf(Props.empty.withRouter(RoundRobinRouter(routees = greeters))))
router ! Greeter.Greet("alice")
1-3-2. (C) 自動生成の3つのactorに、router経由で、3つのメッセージを送る。
HelloWorld5.scala
import akka.actor.Actor
import akka.actor.ActorRef
import akka.routing.FromConfig
import akka.actor.Props
class HelloWorld5 extends Actor with akka.actor.ActorLogging {
override def preStart(): Unit = {
log.debug("Hello preStart")
val router = context.actorOf(Props[Greeter].withRouter(FromConfig), "router1")
router ! Greeter.Greet("alice")
router ! Greeter.Greet("bob")
router ! Greeter.Greet("carol")
log.debug("Bye preStart")
}
def receive = {
case Greeter.GreetBack(answer) =>
log.info("GreetBack: " + answer)
}
}
object Greeter {
case class Greet(who: String)
case class GreetBack(answer: String)
}
class Greeter extends Actor with akka.actor.ActorLogging {
def receive = {
case Greeter.Greet(who) =>
log.info("Hello " + who)
sender ! Greeter.GreetBack("Hello back from " + who)
}
}
application.conf
akka.actor.deployment {
/app/myrouter1 {
router = round-robin
nr-of-instances = 5
}
}
ここでround-robinをrandomにかえたり、いろいろ設定変更できます。
> java -cp akka-2.2.3/lib/akka/akka-actor_2.10-2.2.3.jar:akka-2.2.3/lib/akka/config-1.0.2.jar:akka-2.2.3/lib/scala-library.jar:. akka.Main HelloWorld5
[INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app] Hello preStart
[INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app/router1/$a] Hello alice
[INFO] ...略... [Main-akka.actor.default-dispatcher-6] [akka://Main/user/app/router1/$b] Hello bob
[INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app/router1/$c] Hello carol
[INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app] Bye preStart
[INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app] GreetBack: Hello back from alice
[INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app] GreetBack: Hello back from bob
[INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app] GreetBack: Hello back from carol
^C
2. Clusterで試す(1台のサーバー上の4ノード) = (D)
akka2.2.3のclusterの話(scala)は、このあたり
・
http://doc.akka.io/docs/akka/2.2.3/common/cluster.html
・
http://doc.akka.io/docs/akka/2.2.3/scala/cluster-usage.html
です。
以下は、actorを自動生成する例です。上記ドキュメント(後者)上は、Router with Remote Deployed Routeesの名前になっています。
すでに別途作成したactorをrouterに関連づける方法はRouter Example with Lookup of Routeesです。バージョンがかわると、呼び方もあっさり変わるみたいです。
2-0. SBTを入れる
SBT無しだと、一部うまく動かなかったので。
SBTのrpmをここ(
http://www.scala-sbt.org/release/docs/Getting-Started/Setup.htmlの「RPM package」)からダウンロード
(
http://repo.scala-sbt.org/scalasbt/sbt-native-packages/org/scala-sbt/sbt/0.13.1/sbt.rpm)
> rpm -ivh sbt.rpm
準備中... ########################################### [100%]
1:sbt ########################################### [100%]
> sbt version
...(省略: 一度目だけ時間がかかります)...
[info] Set current project to root (in build file:/root/)
[info] 0.1-SNAPSHOT
2-1. 4つのノード上の自動生成のactorに、メッセージを送る。
2-1-1. SBTのしきたりにあわせてファイルを準備する。
ここでは~/work2ディレクトリで。
> mkdir ~/work2
> cd ~/work2
> mkdir -p src/main/scala
> mkdir -p src/main/resources
三つのファイルを準備。SBT上のプロジェクト設定ファイルbuild.sbtと、scalaのプログラムファイルHelloCluster.scala、akkaの設定ファイルapplication.confで、それぞれの場所に配置します。
・build.sbt
name := "Hello Cluster Project"
version := "1.0"
scalaVersion := "2.10.3"
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
libraryDependencies +=
"com.typesafe.akka" %% "akka-actor" % "2.2.3"
libraryDependencies +=
"com.typesafe.akka" %% "akka-cluster" % "2.2.3"
・src/main/scala/HelloCluster.scala
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.routing.ClusterRouterConfig
import akka.cluster.routing.ClusterRouterSettings
import akka.routing.ConsistentHashingRouter
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
object HelloCluster {
def main(args: Array[String]): Unit = {
if (args.nonEmpty){
System.setProperty("akka.remote.netty.tcp.hostname", args(0))
System.setProperty("akka.remote.netty.tcp.port", args(1))
}
val system = ActorSystem("ClusterSystem")
val clusterListener = system.actorOf(Props[HelloClusterListener],
name = "clusterListener")
Cluster(system).subscribe(clusterListener, classOf[ClusterDomainEvent])
if(args.length > 2){
Thread.sleep(3000)
val router = system.actorOf(Props[Greeter].withRouter(
ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings(
totalInstances = 100, maxInstancesPerNode = 3,
allowLocalRoutees = false, useRole = None))),
name = "clusterrouter")
router.tell(ConsistentHashableEnvelope(Greeter.Greet("alice"), "alice"), clusterListener)
router.tell(ConsistentHashableEnvelope(Greeter.Greet("bob"), "bob"), clusterListener)
router.tell(ConsistentHashableEnvelope(Greeter.Greet("carol"), "carol"), clusterListener)
}
}
}
class HelloClusterListener extends Actor with ActorLogging {
def receive = {
case state: CurrentClusterState =>
log.info("Current members: {}", state.members.mkString(", "))
case MemberUp(member) =>
log.info("Member is Up: {}", member.address)
case UnreachableMember(member) =>
log.info("Member detected as unreachable: {}", member)
case MemberRemoved(member, previousStatus) =>
log.info("Member is Removed: {} after {}",
member.address, previousStatus)
case Greeter.GreetBack(answer) =>
log.info("GreetBack: " + answer)
case _: ClusterDomainEvent => // ignore
}
}
object Greeter {
case class Greet(who: String)
case class GreetBack(answer: String)
}
class Greeter extends Actor with akka.actor.ActorLogging {
def receive = {
case Greeter.Greet(who) =>
log.info("Hello " + who)
sender ! Greeter.GreetBack("Hello back from " + who)
}
}
・src/main/resources/application.conf
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
auto-down = on
}
}
2-1-2. 4つのノードを実行。
一つのサーバー上で4ノード実行させます。ポート違いで、127.0.0.1:2551, 127.0.0.1:2552, 127.0.0.1:2553, 127.0.0.1:2554の四つです。(このうち2551と2552は、クラスタの中の特別なノードで、application.confファイルのseed-nodesとして設定されています。)
作業は、4つのターミナルを開いて、それぞれ、~/work2ディレクトリで、 実行します。
ターミナル1 (seedノードの1つ目。port=2551は、confファイルのseed-nodesにて設定。)
sbt "run-main HelloCluster 127.0.0.1 2551"
ターミナル2 (seedノードの2つ目。port=2552は、confファイルのseed-nodesにて設定。)
sbt "run-main HelloCluster 127.0.0.1 2552"
ターミナル3 (普通のノード。portは2551,2552以外ならなんでもOK。同様にノードを増やせます。)
sbt "run-main HelloCluster 127.0.0.1 2553"
ここまでで、3ノードクラスタが動作して、それぞれで、おおむね以下の表示で、確認できます。
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2551
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-5] [akka://ClusterSystem/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2552
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2553
ターミナル4
ここでは、ついでに、routerを作って、messageを投入。
sbt "run-main HelloCluster 127.0.0.1 2554 go"
すると、以下が、どこかのノードに表示され、
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-13] [akka://ClusterSystem/remote/akka.tcp/ClusterSystem@127.0.0.1:2554/user/clusterrouter/c4] Hello alice
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/remote/akka.tcp/ClusterSystem@127.0.0.1:2554/user/clusterrouter/c1] Hello bob
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/remote/akka.tcp/ClusterSystem@127.0.0.1:2554/user/clusterrouter/c9] Hello carol
もとのノード(ターミナル4)に、以下が表示されます。
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/user/clusterListener] GreetBack: Hello back from alice
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-13] [akka://ClusterSystem/user/clusterListener] GreetBack: Hello back from bob
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/user/clusterListener] GreetBack: Hello back from carol
3.Clusterで試す(4台のサーバー上の4ノード)
3-1. プログラムをjarパッケージ化して他のサーバへ配布
SBTプロジェクトにsbt-assemblyを設定
sbt-assemblyは、
https://github.com/sbt/sbt-assembly
・project/assembly.sbt として以下の内容のファイルを作成
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2")
・build.bstに以下を追加(import行は、ファイルの先頭に)
import AssemblyKeys._
assemblySettings
jarパッケージを作成
ここで、4サーバーは、10.0.0.1:2551, 10.0.0.2:2551, 10.0.0.3:2551, 10.0.0.4:2551として、
seed-nodeを10.0.0.1:2551と10.0.0.2:2551とします。
・src/main/resources/application.confの以下の部分を変更します。
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@10.0.0.1:2551",
"akka.tcp://ClusterSystem@10.0.0.2:2551"]
auto-down = on
}
パッケージを作成します。
sbt package
できあがりは、
target/scala-2.10/hello-cluster-project_2.10-1.0.jar
ファイルです。
これを、各サーバにコピーします。各サーバーでは、javaのみ必要です。
3-2. 各サーバ上で、ノードを実行
一つ目(seedノード)
java -cp hello-cluster-project_2.10-1.0.jar HelloCluster 10.0.0.1 2551
二つ目(seedノード)
java -cp hello-cluster-project_2.10-1.0.jar HelloCluster 10.0.0.2 2551
三つ目(普通のノード。同様に、いくつでも増やせます。)
java -cp hello-cluster-project_2.10-1.0.jar HelloCluster 10.0.0.3 2551
四つ目で、ついでに、routerを作って、messageを投入。
java -cp hello-cluster-project_2.10-1.0.jar HelloCluster 10.0.0.4 2551 go
結果は、2と同じです。
--
以上