2014年1月20日月曜日

Akka 2.2.3をちょっと試す

Akka 2.2.3を、1台のサーバーで簡単に試すメモです。(http://akka.io/)
akka 2.2.3 + Java 7(OpenJDK) + scala 2.10を、CentOS6上で。(SBTは、使わないつもりでしたが...。)
複数のサーバー(3,4台)でクラスタを簡単に試す話は、こっちです。

目標は、こういう感じです。

0. 準備

ディレクトリ ~/work で、試します。

0-1. Java

yum -y install java-1.7.0-openjdk-devel

確認例
> javac -version
javac 1.7.0_51

0-2. Scala

CentOS6なら、http://www.scala-lang.org/download/からrpmをダウンロードして、インストール
(http://www.scala-lang.org/files/archive/scala-2.10.3.rpm)
rpm -ivh scala-2.10.3.rpm

確認例
> scalac -version
Scala compiler version 2.10.3 -- Copyright 2002-2013, LAMP/EPFL

0-3. Akka

"Akka 2.2.3 distribution (Scala 2.10)"を、http://akka.io/downloads/からダウンロードする。
(http://downloads.typesafe.com/akka/akka-2.2.3.zip)

cd ~
mkdir work
cd work
unzip どこそこ/akka-2.2.3.zip

1. 簡単な例

1-1. (A) 1つのactorに、メッセージを送る。

HelloWorld.scala
import akka.actor.Actor
import akka.actor.Props
     
class HelloWorld extends Actor with akka.actor.ActorLogging {
     
    override def preStart(): Unit = {
        log.debug("Hello preStart")

        val greeter = context.actorOf(Props[Greeter], "greeter")

        greeter ! Greeter.Greet("alice")
        greeter ! Greeter.Greet("bob")
        greeter ! Greeter.Greet("carol")

        log.debug("Bye preStart")
    }
     
    def receive = {
        case Greeter.GreetBack(answer) =>
            log.info("GreetBack: " + answer)
    }
}

object Greeter {
    case class Greet(who: String)
    case class GreetBack(answer: String)
}
     
class Greeter extends Actor with akka.actor.ActorLogging {
    def receive = {
    case Greeter.Greet(who) =>
        log.info("Hello " + who)
        sender ! Greeter.GreetBack("Hello back from " + who)
    }
}

> scalac -cp akka-2.2.3/lib/akka/akka-actor_2.10-2.2.3.jar HelloWorld.scala 
> java -cp akka-2.2.3/lib/akka/akka-actor_2.10-2.2.3.jar:akka-2.2.3/lib/akka/config-1.0.2.jar:akka-2.2.3/lib/scala-library.jar:. akka.Main HelloWorld
[INFO] ...略... [Main-akka.actor.default-dispatcher-2] [akka://Main/user/app] Hello preStart
[INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app/greeter] Hello alice
[INFO] ...略... [Main-akka.actor.default-dispatcher-5] [akka://Main/user/app/greeter] Hello bob
[INFO] ...略... [Main-akka.actor.default-dispatcher-5] [akka://Main/user/app/greeter] Hello carol
[INFO] ...略... [Main-akka.actor.default-dispatcher-2] [akka://Main/user/app] Bye preStart
[INFO] ...略... [Main-akka.actor.default-dispatcher-2] [akka://Main/user/app] GreetBack: Hello back from alice
[INFO] ...略... [Main-akka.actor.default-dispatcher-2] [akka://Main/user/app] GreetBack: Hello back from bob
[INFO] ...略... [Main-akka.actor.default-dispatcher-2] [akka://Main/user/app] GreetBack: Hello back from carol
^C

1-2. (B) 3つのactorに、メッセージを送る。

HelloWorld2.scala
import akka.actor.Actor
import akka.actor.Props
     
class HelloWorld2 extends Actor with akka.actor.ActorLogging {
     
    override def preStart(): Unit = {
        log.debug("Hello preStart")

        val greeter1 = context.actorOf(Props[Greeter], "greeter1")
        val greeter2 = context.actorOf(Props[Greeter], "greeter2")
        val greeter3 = context.actorOf(Props[Greeter], "greeter3")

        greeter1 ! Greeter.Greet("alice")
        greeter2 ! Greeter.Greet("bob")
        greeter3 ! Greeter.Greet("carol")

        log.debug("Bye preStart")
    }
     
    def receive = {
        case Greeter.GreetBack(answer) =>
            log.info("GreetBack: " + answer)
    }
}

object Greeter {
    case class Greet(who: String)
    case class GreetBack(answer: String)
}
     
class Greeter extends Actor with akka.actor.ActorLogging {
    def receive = {
    case Greeter.Greet(who) =>
        log.info("Hello " + who)
        sender ! Greeter.GreetBack("Hello back from " + who)
    }
}

> # scalac -cp akka-2.2.3/lib/akka/akka-actor_2.10-2.2.3.jar HelloWorld2.scala
> java -cp akka-2.2.3/lib/akka/akka-actor_2.10-2.2.3.jar:akka-2.2.3/lib/akka/config-1.0.2.jar:akka-2.2.3/lib/scala-library.jar:. akka.Main HelloWorld2
[INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app] Hello preStart
[INFO] ...略... [Main-akka.actor.default-dispatcher-6] [akka://Main/user/app/greeter1] Hello alice
[INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app/greeter2] Hello bob
[INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app] Bye preStart
[INFO] ...略... [Main-akka.actor.default-dispatcher-6] [akka://Main/user/app/greeter3] Hello carol
[INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app] GreetBack: Hello back from alice
[INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app] GreetBack: Hello back from bob
[INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app] GreetBack: Hello back from carol
^C

1-3-1. (C) 手動生成の3つのactorに、router経由で、3つのメッセージを送る。

こんな感じかと思ったけど、うまく行かないから、省略。(sbt利用だと成功しますが。)
        val greeter1 = context.actorOf(Props[Greeter], "greeter1")
        val greeter2 = context.actorOf(Props[Greeter], "greeter2")
        val greeter3 = context.actorOf(Props[Greeter], "greeter3")

        val greeters = Vector[ActorRef](greeter1, greeter2, greeter3)
        val router = context.actorOf(Props.empty.withRouter(RoundRobinRouter(routees = greeters))))

        router ! Greeter.Greet("alice")

1-3-2. (C) 自動生成の3つのactorに、router経由で、3つのメッセージを送る。

HelloWorld5.scala
import akka.actor.Actor
import akka.actor.ActorRef
import akka.routing.FromConfig
import akka.actor.Props
     
class HelloWorld5 extends Actor with akka.actor.ActorLogging {
     
    override def preStart(): Unit = {
        log.debug("Hello preStart")

        val router = context.actorOf(Props[Greeter].withRouter(FromConfig), "router1")

        router ! Greeter.Greet("alice")
        router ! Greeter.Greet("bob")
        router ! Greeter.Greet("carol")

        log.debug("Bye preStart")
    }
     
    def receive = {
        case Greeter.GreetBack(answer) =>
            log.info("GreetBack: " + answer)
    }
}

object Greeter {
    case class Greet(who: String)
    case class GreetBack(answer: String)
}
     
class Greeter extends Actor with akka.actor.ActorLogging {
    def receive = {
    case Greeter.Greet(who) =>
        log.info("Hello " + who)
        sender ! Greeter.GreetBack("Hello back from " + who)
    }
}
application.conf
akka.actor.deployment {
    /app/myrouter1 {
          router = round-robin
        nr-of-instances = 5
    }
}
ここでround-robinをrandomにかえたり、いろいろ設定変更できます。
> java -cp akka-2.2.3/lib/akka/akka-actor_2.10-2.2.3.jar:akka-2.2.3/lib/akka/config-1.0.2.jar:akka-2.2.3/lib/scala-library.jar:. akka.Main HelloWorld5
[INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app] Hello preStart
[INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app/router1/$a] Hello alice
[INFO] ...略... [Main-akka.actor.default-dispatcher-6] [akka://Main/user/app/router1/$b] Hello bob
[INFO] ...略... [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app/router1/$c] Hello carol
[INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app] Bye preStart
[INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app] GreetBack: Hello back from alice
[INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app] GreetBack: Hello back from bob
[INFO] ...略... [Main-akka.actor.default-dispatcher-4] [akka://Main/user/app] GreetBack: Hello back from carol
^C

2. Clusterで試す(1台のサーバー上の4ノード) = (D)

akka2.2.3のclusterの話(scala)は、このあたり
http://doc.akka.io/docs/akka/2.2.3/common/cluster.html
http://doc.akka.io/docs/akka/2.2.3/scala/cluster-usage.html です。

以下は、actorを自動生成する例です。上記ドキュメント(後者)上は、Router with Remote Deployed Routeesの名前になっています。 すでに別途作成したactorをrouterに関連づける方法はRouter Example with Lookup of Routeesです。バージョンがかわると、呼び方もあっさり変わるみたいです。

2-0. SBTを入れる

SBT無しだと、一部うまく動かなかったので。

SBTのrpmをここ(http://www.scala-sbt.org/release/docs/Getting-Started/Setup.htmlの「RPM package」)からダウンロード
(http://repo.scala-sbt.org/scalasbt/sbt-native-packages/org/scala-sbt/sbt/0.13.1/sbt.rpm)
> rpm -ivh sbt.rpm 
準備中...                ########################################### [100%]
   1:sbt                    ########################################### [100%]
> sbt version
...(省略: 一度目だけ時間がかかります)...
[info] Set current project to root (in build file:/root/)
[info] 0.1-SNAPSHOT

2-1. 4つのノード上の自動生成のactorに、メッセージを送る。 

2-1-1. SBTのしきたりにあわせてファイルを準備する。

ここでは~/work2ディレクトリで。
> mkdir ~/work2
> cd ~/work2
> mkdir -p src/main/scala
> mkdir -p src/main/resources

三つのファイルを準備。SBT上のプロジェクト設定ファイルbuild.sbtと、scalaのプログラムファイルHelloCluster.scala、akkaの設定ファイルapplication.confで、それぞれの場所に配置します。

・build.sbt
name := "Hello Cluster Project"

version := "1.0"
     
scalaVersion := "2.10.3"
     
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
     
libraryDependencies +=
    "com.typesafe.akka" %% "akka-actor" % "2.2.3"

libraryDependencies +=
    "com.typesafe.akka" %% "akka-cluster" % "2.2.3"

・src/main/scala/HelloCluster.scala
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.routing.ClusterRouterConfig
import akka.cluster.routing.ClusterRouterSettings
import akka.routing.ConsistentHashingRouter
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
 
object HelloCluster {
    def main(args: Array[String]): Unit = {
        if (args.nonEmpty){
            System.setProperty("akka.remote.netty.tcp.hostname", args(0))
            System.setProperty("akka.remote.netty.tcp.port", args(1))
        }
 
        val system = ActorSystem("ClusterSystem")
        val clusterListener = system.actorOf(Props[HelloClusterListener],
          name = "clusterListener")
 
        Cluster(system).subscribe(clusterListener, classOf[ClusterDomainEvent])

        if(args.length > 2){
            Thread.sleep(3000)

            val router = system.actorOf(Props[Greeter].withRouter(
                ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings(
                totalInstances = 100, maxInstancesPerNode = 3,
                allowLocalRoutees = false, useRole = None))),
                name = "clusterrouter")

            router.tell(ConsistentHashableEnvelope(Greeter.Greet("alice"), "alice"), clusterListener)
            router.tell(ConsistentHashableEnvelope(Greeter.Greet("bob"), "bob"), clusterListener)
            router.tell(ConsistentHashableEnvelope(Greeter.Greet("carol"), "carol"), clusterListener)
        }
    }
}
 
class HelloClusterListener extends Actor with ActorLogging {
    def receive = {
        case state: CurrentClusterState =>
            log.info("Current members: {}", state.members.mkString(", "))
        case MemberUp(member) =>
            log.info("Member is Up: {}", member.address)
        case UnreachableMember(member) =>
            log.info("Member detected as unreachable: {}", member)
        case MemberRemoved(member, previousStatus) =>
            log.info("Member is Removed: {} after {}",
            member.address, previousStatus)
        case Greeter.GreetBack(answer) =>
            log.info("GreetBack: " + answer)
        case _: ClusterDomainEvent => // ignore
    }
}

object Greeter {
    case class Greet(who: String)
    case class GreetBack(answer: String)
}
     
class Greeter extends Actor with akka.actor.ActorLogging {
    def receive = {
    case Greeter.Greet(who) =>
        log.info("Hello " + who)
        sender ! Greeter.GreetBack("Hello back from " + who)
    }
}

・src/main/resources/application.conf
akka {
    actor {
        provider = "akka.cluster.ClusterActorRefProvider"
    }
    remote {
        log-remote-lifecycle-events = off
        netty.tcp {
            hostname = "127.0.0.1"
            port = 0
        }
    }
     
    cluster {
        seed-nodes = [
            "akka.tcp://ClusterSystem@127.0.0.1:2551",
            "akka.tcp://ClusterSystem@127.0.0.1:2552"]
     
        auto-down = on
    }
}

2-1-2. 4つのノードを実行。

一つのサーバー上で4ノード実行させます。ポート違いで、127.0.0.1:2551, 127.0.0.1:2552, 127.0.0.1:2553, 127.0.0.1:2554の四つです。(このうち2551と2552は、クラスタの中の特別なノードで、application.confファイルのseed-nodesとして設定されています。)

作業は、4つのターミナルを開いて、それぞれ、~/work2ディレクトリで、 実行します。

ターミナル1 (seedノードの1つ目。port=2551は、confファイルのseed-nodesにて設定。)
sbt "run-main HelloCluster 127.0.0.1 2551"

ターミナル2 (seedノードの2つ目。port=2552は、confファイルのseed-nodesにて設定。)
sbt "run-main HelloCluster 127.0.0.1 2552"

ターミナル3 (普通のノード。portは2551,2552以外ならなんでもOK。同様にノードを増やせます。)
sbt "run-main HelloCluster 127.0.0.1 2553"

ここまでで、3ノードクラスタが動作して、それぞれで、おおむね以下の表示で、確認できます。
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2551
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-5] [akka://ClusterSystem/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2552
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2553

ターミナル4
ここでは、ついでに、routerを作って、messageを投入。
sbt "run-main HelloCluster 127.0.0.1 2554 go"
すると、以下が、どこかのノードに表示され、
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-13] [akka://ClusterSystem/remote/akka.tcp/ClusterSystem@127.0.0.1:2554/user/clusterrouter/c4] Hello alice
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/remote/akka.tcp/ClusterSystem@127.0.0.1:2554/user/clusterrouter/c1] Hello bob
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/remote/akka.tcp/ClusterSystem@127.0.0.1:2554/user/clusterrouter/c9] Hello carol
もとのノード(ターミナル4)に、以下が表示されます。
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/user/clusterListener] GreetBack: Hello back from alice
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-13] [akka://ClusterSystem/user/clusterListener] GreetBack: Hello back from bob
[INFO] ...略... [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/user/clusterListener] GreetBack: Hello back from carol

3.Clusterで試す(4台のサーバー上の4ノード)

3-1. プログラムをjarパッケージ化して他のサーバへ配布


SBTプロジェクトにsbt-assemblyを設定
sbt-assemblyは、https://github.com/sbt/sbt-assembly

・project/assembly.sbt として以下の内容のファイルを作成
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2")

・build.bstに以下を追加(import行は、ファイルの先頭に)
import AssemblyKeys._

assemblySettings


jarパッケージを作成

ここで、4サーバーは、10.0.0.1:2551, 10.0.0.2:2551, 10.0.0.3:2551, 10.0.0.4:2551として、 seed-nodeを10.0.0.1:2551と10.0.0.2:2551とします。

・src/main/resources/application.confの以下の部分を変更します。
    cluster {
        seed-nodes = [
            "akka.tcp://ClusterSystem@10.0.0.1:2551",
            "akka.tcp://ClusterSystem@10.0.0.2:2551"]
     
        auto-down = on
    }

パッケージを作成します。
sbt package

できあがりは、 target/scala-2.10/hello-cluster-project_2.10-1.0.jar ファイルです。
これを、各サーバにコピーします。各サーバーでは、javaのみ必要です。

3-2. 各サーバ上で、ノードを実行

一つ目(seedノード)
java -cp hello-cluster-project_2.10-1.0.jar HelloCluster 10.0.0.1 2551

二つ目(seedノード)
java -cp hello-cluster-project_2.10-1.0.jar HelloCluster 10.0.0.2 2551

三つ目(普通のノード。同様に、いくつでも増やせます。)
java -cp hello-cluster-project_2.10-1.0.jar HelloCluster 10.0.0.3 2551

四つ目で、ついでに、routerを作って、messageを投入。
java -cp hello-cluster-project_2.10-1.0.jar HelloCluster 10.0.0.4 2551 go

結果は、2と同じです。

--
以上

0 件のコメント:

コメントを投稿