Kotlin websockets 陷入 TIME_WAIT 状态

发布于 2025-01-10 01:31:48 字数 2927 浏览 0 评论 0原文

在尝试创建大量 Websocket 时,我收到 java.net.BindException。我认为这是因为网络套接字没有正确关闭。

如何在 kotlin 中正确关闭 websocket?

单个文件中的示例代码:

import io.ktor.application.*
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.features.websocket.*
import io.ktor.http.*
import io.ktor.http.cio.websocket.*
import io.ktor.routing.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.websocket.*
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore

const val port = 8080
const val WEBSOCKET_ROUTE = "/ws"
const val TOTAL_REQUESTS = 100000
const val CONCURRENT_REQUESTS = 100

fun main(args: Array<String>) = runBlocking {
    val embeddedServer = embeddedServer(Netty, port = port, module = modules)
    embeddedServer.start()

    runClients()

    embeddedServer.stop(0, 0)
}

private suspend fun runClients() = coroutineScope {

    val client = HttpClient(CIO) {
        install(io.ktor.client.features.websocket.WebSockets)
    }

    val semaphore = Semaphore(CONCURRENT_REQUESTS)
    val clientJobs = List(TOTAL_REQUESTS) {
        semaphore.acquire()
        launch {
            makeClientWebSocketRequest(client, it)
            semaphore.release()
        }
    }

    clientJobs.joinAll()
}

private suspend fun makeClientWebSocketRequest(client: HttpClient, id: Int) {
    client.webSocket(method = HttpMethod.Get, "localhost", port, WEBSOCKET_ROUTE) {
        try {
            send(Frame.Text("ehlo $id"))
            val data = (incoming.receive() as Frame.Text).readText()
            println("client: $data")
        } finally {
            close()
        }
    }
}

val modules = fun Application.() {
    install(io.ktor.websocket.WebSockets)

    routing {
        webSocket(WEBSOCKET_ROUTE) {
            handleWebSocketServer(this)
        }
    }
}

private suspend fun handleWebSocketServer(websocket: DefaultWebSocketServerSession) {
    try {
        val data = (websocket.incoming.receive() as Frame.Text).readText()
        println("server: $data")
        websocket.outgoing.send(Frame.Text(data.reversed()))
    } finally {
        websocket.close()
    }
}

异常:

Exception in thread "main" java.net.BindException: Address already in use: no further information

netstat 输出:

PS C:\Users\partkyle> netstat

Active Connections

  Proto  Local Address          Foreign Address        State
  TCP    127.0.0.1:8080         kubernetes:55278       TIME_WAIT
  TCP    127.0.0.1:49152        kubernetes:8080        TIME_WAIT
  TCP    127.0.0.1:49153        kubernetes:8080        TIME_WAIT
  TCP    127.0.0.1:49154        kubernetes:8080        TIME_WAIT
  TCP    127.0.0.1:49155        kubernetes:8080        TIME_WAIT
  TCP    127.0.0.1:49156        kubernetes:8080        TIME_WAIT
  TCP    127.0.0.1:49157        kubernetes:8080        TIME_WAIT

......

While trying to create a large number of websockets I get a java.net.BindException. I think this is because the websockets are not being properly closed.

How do I properly close a websocket in kotlin?

Example code in a single file:

import io.ktor.application.*
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.features.websocket.*
import io.ktor.http.*
import io.ktor.http.cio.websocket.*
import io.ktor.routing.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.websocket.*
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore

const val port = 8080
const val WEBSOCKET_ROUTE = "/ws"
const val TOTAL_REQUESTS = 100000
const val CONCURRENT_REQUESTS = 100

fun main(args: Array<String>) = runBlocking {
    val embeddedServer = embeddedServer(Netty, port = port, module = modules)
    embeddedServer.start()

    runClients()

    embeddedServer.stop(0, 0)
}

private suspend fun runClients() = coroutineScope {

    val client = HttpClient(CIO) {
        install(io.ktor.client.features.websocket.WebSockets)
    }

    val semaphore = Semaphore(CONCURRENT_REQUESTS)
    val clientJobs = List(TOTAL_REQUESTS) {
        semaphore.acquire()
        launch {
            makeClientWebSocketRequest(client, it)
            semaphore.release()
        }
    }

    clientJobs.joinAll()
}

private suspend fun makeClientWebSocketRequest(client: HttpClient, id: Int) {
    client.webSocket(method = HttpMethod.Get, "localhost", port, WEBSOCKET_ROUTE) {
        try {
            send(Frame.Text("ehlo $id"))
            val data = (incoming.receive() as Frame.Text).readText()
            println("client: $data")
        } finally {
            close()
        }
    }
}

val modules = fun Application.() {
    install(io.ktor.websocket.WebSockets)

    routing {
        webSocket(WEBSOCKET_ROUTE) {
            handleWebSocketServer(this)
        }
    }
}

private suspend fun handleWebSocketServer(websocket: DefaultWebSocketServerSession) {
    try {
        val data = (websocket.incoming.receive() as Frame.Text).readText()
        println("server: $data")
        websocket.outgoing.send(Frame.Text(data.reversed()))
    } finally {
        websocket.close()
    }
}

Exception:

Exception in thread "main" java.net.BindException: Address already in use: no further information

netstat output:

PS C:\Users\partkyle> netstat

Active Connections

  Proto  Local Address          Foreign Address        State
  TCP    127.0.0.1:8080         kubernetes:55278       TIME_WAIT
  TCP    127.0.0.1:49152        kubernetes:8080        TIME_WAIT
  TCP    127.0.0.1:49153        kubernetes:8080        TIME_WAIT
  TCP    127.0.0.1:49154        kubernetes:8080        TIME_WAIT
  TCP    127.0.0.1:49155        kubernetes:8080        TIME_WAIT
  TCP    127.0.0.1:49156        kubernetes:8080        TIME_WAIT
  TCP    127.0.0.1:49157        kubernetes:8080        TIME_WAIT

......

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文