
前言:akka是一种基于Actor 模型,提供了一个在 JVM 上构建高并发、分布式和高容错应用程序的平台。框架资料较少,主要参考资料:akka官网文档:https://doc.akka.io/docs/akka/current/actors.html netty作为 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序,是目前最流行的 NIO 框架。

object Client {
    @JvmStatic
    fun main(args: Array<String>) {
        val bootstrap = Bootstrap()
        bootstrap
            .group(NioEventLoopGroup())
            .channel(NioSocketChannel::class.java) //注意客户端与服务端在这里的区别
            .handler(object : ChannelInitializer<SocketChannel>() {
                override fun initChannel(channel: SocketChannel) {
                    channel.pipeline()
                        .addLast(LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4))
                        // 添加LengthFieldPrepender来处理包长度信息
                        .addLast(LengthFieldPrepender(4))
                        .addLast(StringDecoder())
                        .addLast(StringEncoder())
                        .addLast(object : ChannelInboundHandlerAdapter(){
                            override fun channelActive(ctx: ChannelHandlerContext) {
                                println("与服务端链接已建立")
                                println("-------------------------------------------")
                                print("发送聊天消息(如果需要私聊用户则使用@用户名):")
                            }
                            override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
                                val currentTime = LocalDateTime.now()
                                val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")
                                val formattedTime = currentTime.format(formatter)
                                print("\r")  // 回车符,回到行首
                                print("                                       \r")  // 清空当前行
                                println("---------------$formattedTime---------------")
                                println((msg as String).trim())
                                print("发送聊天消息(如果需要私聊用户则使用@用户名):")  // 重新显示输入提示
                            }
                        })
                }
            })
            .option(ChannelOption.SO_KEEPALIVE, true) //设置长连接
        val channel = bootstrap.connect("localhost", 8080).sync().channel()
        /**
         * 如果私聊用户则需要在发送消息之前加上:@用户名
         * */
        val messageThread = Thread{
            val scanner = Scanner(System.`in`)  //模拟用户输入数据进行发送
            try {
                while (scanner.hasNextLine()) {
                    print("发送聊天消息(如果需要私聊用户则使用@用户名):")
                    val message = scanner.nextLine()
                    channel.writeAndFlush(message).sync()
                }
            }catch (e:InterruptedException){
                e.printStackTrace()
            }finally {
                scanner.close()
            }
        }
        messageThread.start()
        channel.closeFuture().sync()
    }
}
object Server {
    @JvmStatic
    fun main(args: Array<String>) {
        val actorSystem = ActorSystem.create("ChatServerKt")
        val roomActor = actorSystem.actorOf(Props.create(RoomActor::class.java), "RoomActor")
        var UserList:MutableList<Channel> = mutableListOf() //保存创建的用户
        val bossGroup: EventLoopGroup = NioEventLoopGroup()
        val workerGroup: EventLoopGroup = NioEventLoopGroup()
        val bootstrap = ServerBootstrap()
        bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel::class.java)
            .childHandler(object : ChannelInitializer<SocketChannel>() {
                override fun initChannel(channel: SocketChannel) {
       //             channel.pipeline().addLast("IdleStateHandler", IdleStateHandler(0,10,0, TimeUnit.SECONDS));
                    channel.pipeline()
    //                    .addLast(LineBasedFrameDecoder(8192)) // 添加行解码器
                        .addLast(LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4))
                        // 添加LengthFieldPrepender来处理包长度信息
                        .addLast(LengthFieldPrepender(4))
                        .addLast(StringDecoder())
                        .addLast(StringEncoder())
                        .addLast(object : ChannelInboundHandlerAdapter(){
                        override fun channelActive(ctx: ChannelHandlerContext) {
                            UserList.add(ctx.channel())
                            val num:Int = UserList.indexOf(ctx.channel())
                            val user = User("user${UserList.indexOf(ctx.channel())}",ctx.channel())
                            roomActor.tell(user, ActorRef.noSender())  //创建用户
                            println("user${num}连接")
                            println("当前用户数量为:${UserList.size}")
                        }
                        override fun channelInactive(ctx: ChannelHandlerContext) {
                            println("user${UserList.indexOf(ctx.channel())}退出")
                            UserList.remove(ctx.channel())
                        }
                        override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
                            val msgstr = (msg as String).trim()
                            if(msgstr.startsWith("@")){ //@user1你好
                                val tonum = msgstr[5]-'0'
                                val fromnum:Int = UserList.indexOf(ctx.channel())
                                if(tonum==fromnum){
                                    println("不能向自己发送消息!")
                                }else{
                                    if(tonum > UserList.size){
                                        println("目标用户不存在!")
                                    }else{
                                        val tomsg:String = msgstr.substring(6)
                                        val toMsg = ToUserMsg("user$fromnum","user$tonum",tomsg,UserList[tonum])
                                        roomActor.tell(toMsg, ActorRef.noSender())
                                    }
                                }
                            }else{
                                println("服务端收到消息:$msgstr")
                                val num:Int = UserList.indexOf(ctx.channel())
                                val message = MsgEntity("user${num}",msg as String,ctx.channel())
                                roomActor.tell(message, ActorRef.noSender()) //向Router
                            }
                        }
                    })
                }
            })
            .childOption(ChannelOption.SO_KEEPALIVE, true)
        bootstrap.bind(8080)
    }
}class UserActor : AbstractPersistentActor() {
    private val log: LoggingAdapter = Logging.getLogger(context.system, this)
//    private val channelGroup: ChannelGroup = DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
    private val mediator = DistributedPubSub.get(context.system).mediator()
    override fun persistenceId(): String {
        return "User-" + self.path().name()
    }
    override fun preStart() {
        mediator.tell(DistributedPubSubMediator.Subscribe("PublicMsg", self), self)
        mediator.tell(DistributedPubSubMediator.Subscribe(self.path().name(), self), self)
        //mediator.tell(DistributedPubSubMediator.Put(getSelf()), getSelf());
        println("节点启动")
        println("Actor路径: ${self.path()}")
        println("Actor名称: ${self.path().name()}")
    }
    override fun createReceiveRecover(): Receive {
        return receiveBuilder()
            .match(PublicMsg::class.java){
                println("UserActor重启成功")
            }.build()
    }
    fun getTime():String{
        val currentTime = LocalDateTime.now()
        val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")
        val formattedTime = currentTime.format(formatter)
        return formattedTime
    }
    override fun createReceive(): Receive {
        return receiveBuilder()
            .match(PreWarmMessage::class.java){
                _->
                println("已经可以正常发送消息!")
            }
            .match(PublicMsg::class.java) { msg ->
                println("---------------聊天室消息-${getTime()}---------------")
                println("用户 ${msg.userId} : ${msg.msg}")
            }
            .match(PrivateMsg::class.java) { msg ->
                println("---------------私聊消息-${getTime()}---------------")
                println("来自用户 ${msg.fromUser}的私聊消息 : ${msg.msg}")
            }
            .match(DistributedPubSubMediator.SubscribeAck::class.java) {
                _->
                println("收到订阅消息")
            }
            .build()
    }
    companion object {
        fun props(): Props {
            return Props.create(UserActor::class.java)
        }
    }
}class ShardExtractor:ShardRegion.MessageExtractor {
    override fun entityId(message: Any?): String {
        return when (message) {
            is PreWarmMessage -> message.shardId+""
            is PrivateMsg -> message.toUser!!+""
       //     is PublicMsg -> message.userId.toString() + ""
            is ShardRegion.StartEntity -> message.entityId()
            is DistributedPubSubMediator.SubscribeAck -> "subscribe-ack-entity" // 添加处理 SubscribeAck 的逻辑
            else -> throw RuntimeException("无法识别消息类型 $message")
        }
    }
    override fun shardId(message: Any?): String {
        return when (message) {
            is PreWarmMessage -> (message.shardId.toString().hashCode() % 10).toString() + ""
            is PrivateMsg -> (message.toUser!!.toString().hashCode() % 10).toString() + ""
    //        is PublicMsg -> (message.userId.toString().hashCode() % 10).toString() + ""
            is ShardRegion.StartEntity -> (message.entityId().hashCode() % 10).toString()
            is DistributedPubSubMediator.SubscribeAck -> "0"
            else -> throw RuntimeException("无法识别消息类型 $message")
        }
    }
    override fun entityMessage(message: Any?): Any {
        return message!!
    }
}
akka {
  actor {
    provider = "cluster"
    allow-java-serialization = on
  }
  remote {
    artery {
      enabled = on
      transport = tcp
      canonical.hostname = "127.0.0.1"
      canonical.port = 2551
    }
  }
  cluster {
    seed-nodes = [
      "akka://ClusterSystem@127.0.0.1:2551"
      "akka://ClusterSystem@127.0.0.1:2552"
    ]
     downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
  cluster.sharding {
    remember-entities = on
    state-store-mode = persistence
  }
    persistence {
      journal.plugin = "akka.persistence.journal.inmem"  # 使用内存中的持久化插件,只适用于测试
      snapshot-store.plugin = "akka.persistence.snapshot-store.local"  # 使用本地文件系统快照存储
      snapshot-store.local.dir = "target/snapshots/node1"  # 快照存储路径
    }
}data class PreWarmMessage(val shardId: String) : Serializable原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。