Kotlin websockets 陷入 TIME_WAIT 状态
在尝试创建大量 WebSocket 时,我收到了 java.net.BindException。我认为这是因为这些 WebSocket 没有被正确关闭。
如何在 kotlin 中正确关闭 websocket?
单个文件中的示例代码:
import io.ktor.application.*
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.features.websocket.*
import io.ktor.http.*
import io.ktor.http.cio.websocket.*
import io.ktor.routing.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.websocket.*
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
// TCP port the embedded server listens on and the clients connect to.
const val port = 8080
// Path of the WebSocket endpoint, shared by the server route and the client request.
const val WEBSOCKET_ROUTE = "/ws"
// Total number of client WebSocket requests issued by runClients().
const val TOTAL_REQUESTS = 100000
// Number of semaphore permits, i.e. the cap on requests in flight at once.
const val CONCURRENT_REQUESTS = 100
/**
 * Entry point: starts the embedded Netty server on [port], runs the client
 * load via [runClients], then stops the server.
 *
 * The shutdown lives in a `finally` block so the server is stopped even when
 * [runClients] throws; previously a client failure skipped the `stop` call.
 */
fun main(args: Array<String>) = runBlocking {
    val server = embeddedServer(Netty, port = port, module = modules)
    server.start()
    try {
        runClients()
    } finally {
        // Both arguments are zero, as in the original call: no grace period
        // and no timeout are requested for the shutdown.
        server.stop(0, 0)
    }
}
/**
 * Issues [TOTAL_REQUESTS] WebSocket requests through a single shared
 * [HttpClient], with at most [CONCURRENT_REQUESTS] of them in flight at once.
 *
 * A permit is acquired *before* each coroutine is launched, so the launching
 * loop itself suspends once the concurrency cap is reached. The permit is
 * released in a `finally`, and the client is closed in a `finally`, so neither
 * leaks when a request fails.
 */
private suspend fun runClients() = coroutineScope {
    val client = HttpClient(CIO) {
        install(io.ktor.client.features.websocket.WebSockets)
    }
    try {
        val semaphore = Semaphore(CONCURRENT_REQUESTS)
        val clientJobs = List(TOTAL_REQUESTS) { id ->
            semaphore.acquire()
            launch {
                try {
                    makeClientWebSocketRequest(client, id)
                } finally {
                    // Always hand the permit back, even if the request threw.
                    semaphore.release()
                }
            }
        }
        // Wait for every request before the client is closed below.
        clientJobs.joinAll()
    } finally {
        // The original never closed the client.
        client.close()
    }
}
/**
 * Performs one request/response exchange over a WebSocket session opened for
 * this call: sends the text frame `"ehlo <id>"`, reads one frame back and
 * prints its text prefixed with `"client: "`.
 *
 * @param client HTTP client used to open the WebSocket session.
 * @param id request number embedded in the outgoing message.
 */
private suspend fun makeClientWebSocketRequest(client: HttpClient, id: Int) {
    client.webSocket(method = HttpMethod.Get, host = "localhost", port = port, path = WEBSOCKET_ROUTE) {
        try {
            val greeting = Frame.Text("ehlo $id")
            send(greeting)
            // The cast throws if the received frame is not a text frame.
            val reply = incoming.receive() as Frame.Text
            println("client: ${reply.readText()}")
        } finally {
            // Close the session whether or not the exchange succeeded.
            close()
        }
    }
}
/**
 * Server application module: installs the server-side WebSockets feature and
 * registers one WebSocket endpoint at [WEBSOCKET_ROUTE], handled by
 * [handleWebSocketServer].
 */
val modules: Application.() -> Unit = {
    install(io.ktor.websocket.WebSockets)
    routing {
        webSocket(WEBSOCKET_ROUTE) {
            // `this` is the server-side session for the accepted connection.
            handleWebSocketServer(this)
        }
    }
}
/**
 * Serves a single exchange on the given session: reads one frame, prints its
 * text prefixed with `"server: "`, and sends the reversed text back as a text
 * frame. The session is closed afterwards, whether or not the exchange
 * succeeded.
 *
 * @param websocket the server-side session to read from and reply on.
 */
private suspend fun handleWebSocketServer(websocket: DefaultWebSocketServerSession) {
    try {
        // The cast throws if the received frame is not a text frame.
        val request = websocket.incoming.receive() as Frame.Text
        val text = request.readText()
        println("server: $text")
        val reply = Frame.Text(text.reversed())
        websocket.outgoing.send(reply)
    } finally {
        websocket.close()
    }
}
异常:
Exception in thread "main" java.net.BindException: Address already in use: no further information
netstat 输出:
PS C:\Users\partkyle> netstat
Active Connections
Proto Local Address Foreign Address State
TCP 127.0.0.1:8080 kubernetes:55278 TIME_WAIT
TCP 127.0.0.1:49152 kubernetes:8080 TIME_WAIT
TCP 127.0.0.1:49153 kubernetes:8080 TIME_WAIT
TCP 127.0.0.1:49154 kubernetes:8080 TIME_WAIT
TCP 127.0.0.1:49155 kubernetes:8080 TIME_WAIT
TCP 127.0.0.1:49156 kubernetes:8080 TIME_WAIT
TCP 127.0.0.1:49157 kubernetes:8080 TIME_WAIT
......
While trying to create a large number of websockets I get a java.net.BindException. I think this is because the websockets are not being properly closed.
How do I properly close a websocket in kotlin?
Example code in a single file:
import io.ktor.application.*
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.features.websocket.*
import io.ktor.http.*
import io.ktor.http.cio.websocket.*
import io.ktor.routing.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.websocket.*
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
// TCP port the embedded server listens on and the clients connect to.
const val port = 8080
// Path of the WebSocket endpoint, shared by the server route and the client request.
const val WEBSOCKET_ROUTE = "/ws"
// Total number of client WebSocket requests issued by runClients().
const val TOTAL_REQUESTS = 100000
// Number of semaphore permits, i.e. the cap on requests in flight at once.
const val CONCURRENT_REQUESTS = 100
/**
 * Entry point: starts the embedded Netty server on [port], runs the client
 * load via [runClients], then stops the server.
 *
 * The shutdown lives in a `finally` block so the server is stopped even when
 * [runClients] throws; previously a client failure skipped the `stop` call.
 */
fun main(args: Array<String>) = runBlocking {
    val server = embeddedServer(Netty, port = port, module = modules)
    server.start()
    try {
        runClients()
    } finally {
        // Both arguments are zero, as in the original call: no grace period
        // and no timeout are requested for the shutdown.
        server.stop(0, 0)
    }
}
/**
 * Issues [TOTAL_REQUESTS] WebSocket requests through a single shared
 * [HttpClient], with at most [CONCURRENT_REQUESTS] of them in flight at once.
 *
 * A permit is acquired *before* each coroutine is launched, so the launching
 * loop itself suspends once the concurrency cap is reached. The permit is
 * released in a `finally`, and the client is closed in a `finally`, so neither
 * leaks when a request fails.
 */
private suspend fun runClients() = coroutineScope {
    val client = HttpClient(CIO) {
        install(io.ktor.client.features.websocket.WebSockets)
    }
    try {
        val semaphore = Semaphore(CONCURRENT_REQUESTS)
        val clientJobs = List(TOTAL_REQUESTS) { id ->
            semaphore.acquire()
            launch {
                try {
                    makeClientWebSocketRequest(client, id)
                } finally {
                    // Always hand the permit back, even if the request threw.
                    semaphore.release()
                }
            }
        }
        // Wait for every request before the client is closed below.
        clientJobs.joinAll()
    } finally {
        // The original never closed the client.
        client.close()
    }
}
/**
 * Performs one request/response exchange over a WebSocket session opened for
 * this call: sends the text frame `"ehlo <id>"`, reads one frame back and
 * prints its text prefixed with `"client: "`.
 *
 * @param client HTTP client used to open the WebSocket session.
 * @param id request number embedded in the outgoing message.
 */
private suspend fun makeClientWebSocketRequest(client: HttpClient, id: Int) {
    client.webSocket(method = HttpMethod.Get, host = "localhost", port = port, path = WEBSOCKET_ROUTE) {
        try {
            val greeting = Frame.Text("ehlo $id")
            send(greeting)
            // The cast throws if the received frame is not a text frame.
            val reply = incoming.receive() as Frame.Text
            println("client: ${reply.readText()}")
        } finally {
            // Close the session whether or not the exchange succeeded.
            close()
        }
    }
}
/**
 * Server application module: installs the server-side WebSockets feature and
 * registers one WebSocket endpoint at [WEBSOCKET_ROUTE], handled by
 * [handleWebSocketServer].
 */
val modules: Application.() -> Unit = {
    install(io.ktor.websocket.WebSockets)
    routing {
        webSocket(WEBSOCKET_ROUTE) {
            // `this` is the server-side session for the accepted connection.
            handleWebSocketServer(this)
        }
    }
}
/**
 * Serves a single exchange on the given session: reads one frame, prints its
 * text prefixed with `"server: "`, and sends the reversed text back as a text
 * frame. The session is closed afterwards, whether or not the exchange
 * succeeded.
 *
 * @param websocket the server-side session to read from and reply on.
 */
private suspend fun handleWebSocketServer(websocket: DefaultWebSocketServerSession) {
    try {
        // The cast throws if the received frame is not a text frame.
        val request = websocket.incoming.receive() as Frame.Text
        val text = request.readText()
        println("server: $text")
        val reply = Frame.Text(text.reversed())
        websocket.outgoing.send(reply)
    } finally {
        websocket.close()
    }
}
Exception:
Exception in thread "main" java.net.BindException: Address already in use: no further information
netstat output:
PS C:\Users\partkyle> netstat
Active Connections
Proto Local Address Foreign Address State
TCP 127.0.0.1:8080 kubernetes:55278 TIME_WAIT
TCP 127.0.0.1:49152 kubernetes:8080 TIME_WAIT
TCP 127.0.0.1:49153 kubernetes:8080 TIME_WAIT
TCP 127.0.0.1:49154 kubernetes:8080 TIME_WAIT
TCP 127.0.0.1:49155 kubernetes:8080 TIME_WAIT
TCP 127.0.0.1:49156 kubernetes:8080 TIME_WAIT
TCP 127.0.0.1:49157 kubernetes:8080 TIME_WAIT
......
如果你对这篇内容有疑问,欢迎到本站社区发帖提问、参与讨论,获取更多帮助,或者扫描二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定您的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论