我不是罗大锤我不是罗大锤

我不是罗大锤

我不是罗大锤我不是罗大锤

我不是罗大锤

首页首页
分类分类
标签标签
友情链接友情
日记日记
Ktor + Next.js 实现在线人数显示Ktor + Next.js 实现在线人数显示

Ktor + Next.js 实现在线人数显示

&后端#Ktor

允许评论

2 个月前

目前 Nola 博客已经实现了此功能,在当前页面的页脚中即可看到。

在线人数

本文记录一下使用 Ktor 后端 + Next.js 通过 WebSocket 实现前端页面在线人数的展示。

一、Ktor

1. 导入依赖

首先在 Ktor 中,导入 WebSocket 依赖:

// WebSocket
implementation("io.ktor:ktor-server-websockets:$ktor_version")

2. 启用 WebSocket 插件

新建一个 WebSocketPlugin.kt 文件,并写入下面的代码,配置 WebSocket 插件:

import io.ktor.server.application.*
import io.ktor.server.websocket.*
import kotlin.time.Duration.Companion.seconds

fun Application.configureWebSocket() {
    install(WebSockets) {
        // ping 之间的持续时间
        pingPeriod = 30.seconds

        // 超时时间
        timeout = 15.seconds

        // 帧最大大小
        maxFrameSize = Long.MAX_VALUE

        // 是否使用掩码
        masking = false
    }
}

然后在 Ktor 应用的 Application 上下文中启用该插件即可:

fun Application.module() {
    // ... 其他插件配置

    // WebSocket 配置
    configureWebSocket()
}

3. 编写 Manager 类

新建一个 BlogOnlineManager 类,用于管理 WebSocket 的连接和记录当前在线人数。

同时限制了每个 IP 地址最多只能同时建立 5 个连接,第 6 个连接会被直接关闭,这里可以根据自己的需求进行修改。

import cc.loac.extensions.toJSONString
import io.ktor.websocket.WebSocketSession
import io.ktor.websocket.send
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

/**
 * 博客前端在线人数数量管理
 */
class BlogOnlineManager private constructor() {

    companion object {
        @Volatile
        private var instance: BlogOnlineManager? = null

        private var ioScope = CoroutineScope(Dispatchers.IO)

        // 定时任务
        private var job: Job? = null

        /**
         * 获取 Manager 实例
         * @return 唯一 BlogOnlineManager 实例
         */
        fun getInstance(): BlogOnlineManager {
            return instance ?: synchronized(this) {
                instance ?: BlogOnlineManager().also {
                    instance = it


                    if (job == null) {
                        // 启动定时任务
                        job = ioScope.launch {
                            // 每 30 分钟尝试清理一次无效连接
                            while (true) {
                                delay(1000 * 60 * 30)
                                instance?.cleanupDeadConnections()
                            }
                        }
                    }
                }
            }
        }
    }

    private val connectionMutex = Mutex()

    // 当前连接的 Session
    private val connections = ConcurrentHashMap<String, WebSocketSession>()

    // 在线人数
    private val onlineCount = AtomicInteger(0)

    // IP 连接计数,防止单个 IP 过多连接
    private val ipConnectionCount = ConcurrentHashMap<String, AtomicInteger>()

    // 每个 IP 最多只计算 5 个设备
    private val maxConnectionsPerIp = 5

    /**
     * 添加连接
     * @param sessionId Session ID
     * @param session WebSocketSession
     * @param clientIp 客户端 IP
     * @return 是否添加成功(如果当前 IP 数量超过 5 个,则认为失败)
     */
    suspend fun addConnection(
        sessionId: String,
        session: WebSocketSession,
        clientIp: String
    ): Boolean {
        return connectionMutex.withLock {
            // 检查单个 IP 连接数限制
            val ipCount = ipConnectionCount.computeIfAbsent(clientIp) { AtomicInteger(0) }
            if (ipCount.get() >= maxConnectionsPerIp) {
                return@withLock false
            }

            connections[sessionId] = session
            ipCount.incrementAndGet()
            val currentCount = onlineCount.incrementAndGet()

            // 广播新的在线人数
            broadcastOnlineCount(currentCount)
            true
        }
    }

    /**
     * 移除连接
     * @param sessionId Session ID
     * @param clientIp 客户端 IP
     */
    suspend fun removeConnection(sessionId: String, clientIp: String) {
        return connectionMutex.withLock {
            connections.remove(sessionId)
            ipConnectionCount[clientIp]?.decrementAndGet()
            val currentCount = onlineCount.decrementAndGet()

            // 广播新的在线人数
            broadcastOnlineCount(currentCount)
        }
    }

    /**
     * 广播在线人数
     * @param count 在线人数
     */
    private suspend fun broadcastOnlineCount(count: Int) {
        val message = OnlineCount(
            count = count,
            timestamp = System.currentTimeMillis()
        ).toJSONString()

        val deadConnections = mutableListOf<String>()

        connections.forEach { (sessionId, session) ->
            try {
                session.send(message)
            } catch (_: Exception) {
                // 记录死连接,稍后清理
                deadConnections.add(sessionId)
            }
        }

        // 清理死连接
        deadConnections.forEach { sessionId ->
            connections.remove(sessionId)
            onlineCount.decrementAndGet()
        }
    }

    /**
     * 获取当前连接数量
     */
    fun getCurrentCount(): Int = onlineCount.get()

    /**
     * 清理无效连接
     */
    suspend fun cleanupDeadConnections() {
        connectionMutex.withLock {
            val deadConnections = mutableListOf<String>()

            connections.forEach { (sessionId, session) ->
                try {
                    // 发送心跳检测
                    session.send("ping")
                } catch (_: Exception) {
                    deadConnections.add(sessionId)
                }
            }

            deadConnections.forEach { sessionId ->
                connections.remove(sessionId)
                onlineCount.decrementAndGet()
            }
        }
    }
}

/**
 * 在线人数响应类
 * @param count 人数
 * @param timestamp 时间戳毫秒
 */
data class OnlineCount(
    val count: Int,
    val timestamp: Long
)

4. 编写路由

在路由中新建 WebSocket 路由,代码如下所示:

webSocket("online") {
  	// 每个连接的唯一 UUID
    val sessionId = UUID.randomUUID().toString()
    // 客户端 IP
    val clientIp = call.request.origin.remoteHost
  	// 获取 Manager 实例
    val manager = BlogOnlineManager.getInstance()

    // 验证并发起连接
    if (!manager.addConnection(sessionId, this, clientIp)) {
        close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "来自 $clientIp 的连接太多"))
        return@webSocket
    }

    try {
        // 处理客户端消息
        for (frame in incoming) {
            // 这里只处理了客户端的心跳消息,其他任何来自客户端的消息均忽略
            when (frame) {
                is Frame.Text -> {
                    // 回复客户端心跳消息
                    val text = frame.readText()
                    if (text == "ping") {
                        send(Frame.Text("pong"))
                    }
                }

                is Frame.Close -> {
                    break
                }

                else -> {}
            }
        }

    } catch (_: ClosedReceiveChannelException) {
        // 连接正常关闭
    } catch (e: Exception) {
        // .error() 是一个日志打印扩展函数,用于打印错误日志
        e.localizedMessage.error()
    } finally {
        // 客户端断开和发生错误时移除 WebSocket 连接
        manager.removeConnection(sessionId, clientIp)
    }
}

5. (可选)限流器

上面已经实现了 WebSocket 计数的功能,如果想要实现对该 WebSocket 接口请求速度进行限制,可以使用 Ktor 的一个官方插件 Rate Limit。

首先导入 Rate Limit 依赖:

// 接口访问速率限制
implementation("io.ktor:ktor-server-rate-limit:$ktor_version")

新建一个RateLimitPlugin.kt 文件用于配置 Rate Limit:

import io.ktor.server.application.*
import io.ktor.server.plugins.origin
import io.ktor.server.plugins.ratelimit.*
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds

/** 在线人数 WebSocket 限流器 **/
val LIMITER_ONLINE_COUNT_WB = RateLimitName("LIMITER_ONLINE_COUNT_WB")

/**
 * 配置接口速率限制插件
 */
fun Application.configureRateLimit() {
    install(RateLimit) {
        // 全局限制器
        global {
            rateLimiter(limit = 300, refillPeriod = 60.seconds)
        }

        // 在线人数 WebSocket 限流器
        register(LIMITER_ONLINE_COUNT_WB) {
            // 每分钟内最多请求 10 次
            rateLimiter(limit = 10, refillPeriod = 1.minutes)
            // 使用客户端 IP 作为 Key
            requestKey { call ->
                call.request.origin.remoteHost
            }
        }
    }
}

接着在 Ktor 应用的 Application 上下文中启用插件:

fun Application.module() {
    // ... 其他插件配置

    // WebSocket 配置
    configureWebSocket()
    
    // 接口访问速率限制配置
    configureRateLimit()
}

随后在 WebSocket 的路由外套上 rateLimit()方法,即可实现对 WebSocket 接口的限流:

// 限流器,LIMITER_ONLINE_COUNT_WB 是上面在 RateLimit 插件中定义的规则
rateLimit(LIMITER_ONLINE_COUNT_WB) {
    webSocket("online") {
    	// WebSocket 路由体
    }
}

二、Next.js

1. 定义 Hook

在 Next.js 中,使用一个 Hook 来处理 WebSocket 连接和响应。

新建一个 useOnlineCount.ts 方法:

import { useEffect, useRef, useState } from 'react';
import { OnlineCountResponse } from '@/models/response/OnlineCountResponse';

/**
 * 获取当前博客在线人数 Hook
 */
export const useOnlineCount = () => {
    
  // 当前人数
  const [onlineCount, setOnlineCount] = useState<number>(0);
    
  // 更新时间戳
  const [updateTime, setUpdateTime] = useState<number>(0);
    
  // WebSocket Ref
  const wsRef = useRef<WebSocket | null>(null);

  useEffect(() => {
    // 创建 WebSocket 连接

    // WebSocket 服务器地址(ws:// 或者 wss:// 开头,取决于是否启用了 HTTPS)
    // 例如:ws://localhost:8098, wss://loac.cc
    const serverUrl = process.env.NEXT_PUBLIC_WEBSOCKET_URL;
    if (!serverUrl) {
      setOnlineCount(-1)
      setUpdateTime(-1)
      console.error('WEBSOCKET_URL is not defined');
      return;
    }
      
    // 上面在 Ktor 中定义的 WebSocket 路由地址
    const wsUrl = `${serverUrl}/online`;

    // 创建 WebSocket 连接
    const ws = new WebSocket(wsUrl);
    wsRef.current = ws;

    // 监听服务器消息
    ws.onmessage = (event) => {
      try {
        if (String(event.data) == 'ping') {
          // 服务器向客户端发送 ping,无需处理
        } else {
          // 收到在线人数数据
          const data: OnlineCountResponse = JSON.parse(event.data);
          setOnlineCount(data.count);
          setUpdateTime(data.timestamp)
        }
      } catch (error) {
        setOnlineCount(-1)
        setUpdateTime(-1)
        console.error(`WebSocket 解析消息失败:${error}`);
      }
    };

    ws.onopen = () => {
      // 连接成功
    }

    ws.onclose = () => {
      console.log('WebSocket 连接已关闭');
      setOnlineCount(-1)
      setUpdateTime(-1)
    }

    ws.onerror = (error) => {
      console.error(`WebSocket 错误:${error}`);
      setOnlineCount(-1)
      setUpdateTime(-1)
    }

    // 页面卸载时关闭连接
    const handleBeforeUnload = () => {
      ws.close();
    }
    window.addEventListener('beforeunload', handleBeforeUnload)

    return () => {
      ws.close();
      window.removeEventListener('beforeunload', handleBeforeUnload)
    }
  }, []);
  return onlineCount
};

2. 使用 Hook

Hook 定义完成后,就可以在页面上进行展示了,下面是一个例子:

'use client';
import { useOnlineCount } from '@/hooks/useOnlineCount';

/**
 * 博客在线人数显示组件
 */
export default function OnlineCounter() {
  const { onlineCount, updateTime } = useOnlineCount();
  return (
    <div>
      <p>当前在线人数:{onlineCount} 人</p>
      {/*formatDate() 是一个将时间戳毫秒转换为字符串的方法*/}
      <p>更新时间:{updateTime > 0 ? formatDate(updateTime) : '未知'}</p>
    </div>
  );
}
目录
一、Ktor
1. 导入依赖
2. 启用 WebSocket 插件
3. 编写 Manager 类
4. 编写路由
5. (可选)限流器
二、Next.js
1. 定义 Hook
2. 使用 Hook
暂无评论

在线人数:0 人

文章总浏览量:21167

Powered byNola