Thrift 的线程安全程度如何?回复:我的请求似乎互相干扰
编辑
显然我希望做的事情超出了 thrift 的范围...如果我确保端口上永远不会有超过一个客户端,则一切正常。当然,这种做法违背了目的,因为我希望向服务器打开几个可重用的连接,以缩短响应时间并降低开销。
如果有人提出实现此目标的替代方法的建议,我们将不胜感激(或者如果我的结论是错误的)
背景
我有一个多组件应用程序,主要通过 thrift 连接(主要是 java->php 连接)。
到目前为止,一切似乎都很好,但引入了 Java->Java 连接,其中客户端是一个每秒可以启动数百个请求的 servlet。
正在访问的方法具有以下接口:
bool pvCheck(1:i32 toolId) throws(1:DPNoToolException nte),
为了确保服务端不会出现奇怪的情况,我用一个简单的实现替换了实现:
@Override
public boolean pvCheck(int toolId) throws TException {
//boolean ret = api.getViewsAndDec(toolId);
return true;
}
错误/可能的原因?
只要连接不多,它就可以正常工作,但一旦连接靠得很近,连接就会开始卡在读卡器中。
如果我在调试器中拉出其中一个,堆栈看起来像这样:
Daemon Thread [http-8080-197] (Suspended)
BufferedInputStream.read(byte[], int, int) line: 308
TSocket(TIOStreamTransport).read(byte[], int, int) line: 126
TSocket(TTransport).readAll(byte[], int, int) line: 84
TBinaryProtocol.readAll(byte[], int, int) line: 314
TBinaryProtocol.readI32() line: 262
TBinaryProtocol.readMessageBegin() line: 192
DumboPayment$Client.recv_pvCheck() line: 120
DumboPayment$Client.pvCheck(int) line: 105
Receiver.performTask(HttpServletRequest, HttpServletResponse) line: 157
Receiver.doGet(HttpServletRequest, HttpServletResponse) line: 109
Receiver(HttpServlet).service(HttpServletRequest, HttpServletResponse) line: 617
Receiver(HttpServlet).service(ServletRequest, ServletResponse) line: 717
ApplicationFilterChain.internalDoFilter(ServletRequest, ServletResponse) line: 290
ApplicationFilterChain.doFilter(ServletRequest, ServletResponse) line: 206
StandardWrapperValve.invoke(Request, Response) line: 233
StandardContextValve.invoke(Request, Response) line: 191
StandardHostValve.invoke(Request, Response) line: 127
ErrorReportValve.invoke(Request, Response) line: 102
StandardEngineValve.invoke(Request, Response) line: 109
CoyoteAdapter.service(Request, Response) line: 298
Http11AprProcessor.process(long) line: 859
Http11AprProtocol$Http11ConnectionHandler.process(long) line: 579
AprEndpoint$Worker.run() line: 1555
Thread.run() line: 619
这似乎是由数据损坏触发的,因为我收到以下异常:
10/11/22 18:38:55 WARN logger.Receiver: pvCheck had an exception
org.apache.thrift.TApplicationException: pvCheck failed: unknown result
at *.thrift.generated.DumboPayment$Client.recv_pvCheck(DumboPayment.java:135)
at *.thrift.generated.DumboPayment$Client.pvCheck(DumboPayment.java:105)
at *.Receiver.performTask(Receiver.java:157)
at *.Receiver.doGet(Receiver.java:109)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:617)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:717)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:298)
at org.apache.coyote.http11.Http11AprProcessor.process(Http11AprProcessor.java:859)
at org.apache.coyote.http11.Http11AprProtocol$Http11ConnectionHandler.process(Http11AprProtocol.java:579)
at org.apache.tomcat.util.net.AprEndpoint$Worker.run(AprEndpoint.java:1555)
at java.lang.Thread.run(Thread.java:619)
也许
10/11/22 17:59:46 ERROR [/ninja_ar].[Receiver]: サーブレット Receiver のServlet.service()が例外を投げました
java.lang.OutOfMemoryError: Java heap space
at org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:296)
at org.apache.thrift.protocol.TBinaryProtocol.readString(TBinaryProtocol.java:290)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:198)
at *.thrift.generated.DumboPayment$Client.recv_pvCheck(DumboPayment.java:120)
at *.thrift.generated.DumboPayment$Client.pvCheck(DumboPayment.java:105)
at *.Receiver.performTask(Receiver.java:157)
at *.Receiver.doGet(Receiver.java:109)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:690)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:803)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:269)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:188)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:210)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:172)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:117)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:108)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:151)
at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:870)
at org.apache.coyote.http11.Http11BaseProtocol$Http11ConnectionHandler.processConnection(Http11BaseProtocol.java:665)
at org.apache.tomcat.util.net.PoolTcpEndpoint.processSocket(PoolTcpEndpoint.java:528)
at org.apache.tomcat.util.net.LeaderFollowerWorkerThread.runIt(LeaderFollowerWorkerThread.java:81)
at org.apache.tomcat.util.threads.ThreadPool$ControlRunnable.run(ThreadPool.java:685)
at java.lang.Thread.run(Thread.java:636)
我偏离了目标,但我很确定这些是相关的当没有发送任何内容时,客户端继续尝试读取。
一些实现细节
服务器和客户端都使用 java 二进制协议。
我编写了一个简单的客户端池类,它可以让我重用客户端,这些是主要功能:
public synchronized Client getClient() {
if(clientQueue.isEmpty()) {
return newClient();
} else {
return clientQueue.getLast();
}
}
private synchronized Client newClient() {
int leftToTry = serverArr.length;
Client cli = null;
while(leftToTry > 0 && cli == null) {
log.info("Creating new connection to " +
serverArr[roundRobinPos] + port);
TTransport transport = new TSocket(serverArr[roundRobinPos], port);
TProtocol protocol = new TBinaryProtocol(transport);
cli = new Client(protocol);
try {
transport.open();
} catch (TTransportException e) {
cli = null;
log.warn("Failed connection to " +
serverArr[roundRobinPos] + port);
}
roundRobinPos++;
if(roundRobinPos >= serverArr.length) {
roundRobinPos = 0;
}
leftToTry--;
}
return cli;
}
public void returnClient(Client cli) {
clientQueue.addFirst(cli);
}
客户端应用程序(即 tomcat servlet)通过以下方式访问它:
Client dpayClient = null;
if(dpay != null
&& (dpayClient = dpay.getClient()) != null) {
try {
dpayClient.pvCheck(requestParameters.getId());
} catch (DPNoToolException e) {
return;
} catch (TException e) {
log.warn("pvCheck had an exception", e);
} finally {
if(dpayClient != null) {
dpay.returnClient(dpayClient);
}
}
}
实际的 thrift 连接按以下方式上升
private boolean initThrift(int port, Configuration conf) {
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
DPaymentHandler handler = new DPaymentHandler(conf);
DumboPayment.Processor processor =
new DumboPayment.Processor(handler);
InetAddress listenAddress;
try {
listenAddress = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
LOG.error("Failed in thrift init", e);
return false;
}
TServerTransport serverTransport;
try {
serverTransport = new TServerSocket(
new InetSocketAddress(listenAddress, port));
} catch (TTransportException e) {
LOG.error("Failed in thrift init", e);
return false;
}
TTransportFactory transportFactory = new TTransportFactory();
TServer server = new TThreadPoolServer(processor, serverTransport,
transportFactory, protocolFactory);
LOG.info("Starting Dumbo Payment thrift server on " +
listenAddress + ":" + Integer.toString(port));
server.serve();
return true;
}
最后
一直停留在这个问题上有一段时间了......很可能我错过了一些明显的东西。我真的很感激任何帮助。
如果需要任何其他信息,请告诉我。那里有一大堆内容,所以我想尝试让内容保持最(希望)最相关的内容。
edit
Apparently what I was hoping to do is outside of the scope of thrift... If I make sure there is never more than one client on the port, everything is a-ok. Of course this kind of defeats the purpose as I'd like to have several reusable connections open to the server to improve response times and lower overhead.
If anyone has a suggestion of an alternate way to achieve this, it would be appreciated(or if my conclusion is wrong)
Background
I have a multi-component application that is mostly connected up by thrift(mostly java->php connections).
So far it all has appeared to be fine, but having introduced a Java->Java connection where the client end is a servlet that can launch hundreds of requests a second.
The method being accessed has the following interface:
bool pvCheck(1:i32 toolId) throws(1:DPNoToolException nte),
To make sure it wasn't something weird on the service end I replaced the implementation with a trivial one:
@Override
public boolean pvCheck(int toolId) throws TException {
//boolean ret = api.getViewsAndDec(toolId);
return true;
}
The errors/possible causes?
So long as there's not many connections it goes by fine but as soon as connections come close together, connections start getting stuck in the reader.
If I pull one of them up in the debugger the stack looks like this:
Daemon Thread [http-8080-197] (Suspended)
BufferedInputStream.read(byte[], int, int) line: 308
TSocket(TIOStreamTransport).read(byte[], int, int) line: 126
TSocket(TTransport).readAll(byte[], int, int) line: 84
TBinaryProtocol.readAll(byte[], int, int) line: 314
TBinaryProtocol.readI32() line: 262
TBinaryProtocol.readMessageBegin() line: 192
DumboPayment$Client.recv_pvCheck() line: 120
DumboPayment$Client.pvCheck(int) line: 105
Receiver.performTask(HttpServletRequest, HttpServletResponse) line: 157
Receiver.doGet(HttpServletRequest, HttpServletResponse) line: 109
Receiver(HttpServlet).service(HttpServletRequest, HttpServletResponse) line: 617
Receiver(HttpServlet).service(ServletRequest, ServletResponse) line: 717
ApplicationFilterChain.internalDoFilter(ServletRequest, ServletResponse) line: 290
ApplicationFilterChain.doFilter(ServletRequest, ServletResponse) line: 206
StandardWrapperValve.invoke(Request, Response) line: 233
StandardContextValve.invoke(Request, Response) line: 191
StandardHostValve.invoke(Request, Response) line: 127
ErrorReportValve.invoke(Request, Response) line: 102
StandardEngineValve.invoke(Request, Response) line: 109
CoyoteAdapter.service(Request, Response) line: 298
Http11AprProcessor.process(long) line: 859
Http11AprProtocol$Http11ConnectionHandler.process(long) line: 579
AprEndpoint$Worker.run() line: 1555
Thread.run() line: 619
This seems to be triggered by data getting corrupted as I get the following exceptions:
10/11/22 18:38:55 WARN logger.Receiver: pvCheck had an exception
org.apache.thrift.TApplicationException: pvCheck failed: unknown result
at *.thrift.generated.DumboPayment$Client.recv_pvCheck(DumboPayment.java:135)
at *.thrift.generated.DumboPayment$Client.pvCheck(DumboPayment.java:105)
at *.Receiver.performTask(Receiver.java:157)
at *.Receiver.doGet(Receiver.java:109)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:617)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:717)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:298)
at org.apache.coyote.http11.Http11AprProcessor.process(Http11AprProcessor.java:859)
at org.apache.coyote.http11.Http11AprProtocol$Http11ConnectionHandler.process(Http11AprProtocol.java:579)
at org.apache.tomcat.util.net.AprEndpoint$Worker.run(AprEndpoint.java:1555)
at java.lang.Thread.run(Thread.java:619)
and
10/11/22 17:59:46 ERROR [/ninja_ar].[Receiver]: サーブレット Receiver のServlet.service()が例外を投げました
java.lang.OutOfMemoryError: Java heap space
at org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:296)
at org.apache.thrift.protocol.TBinaryProtocol.readString(TBinaryProtocol.java:290)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:198)
at *.thrift.generated.DumboPayment$Client.recv_pvCheck(DumboPayment.java:120)
at *.thrift.generated.DumboPayment$Client.pvCheck(DumboPayment.java:105)
at *.Receiver.performTask(Receiver.java:157)
at *.Receiver.doGet(Receiver.java:109)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:690)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:803)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:269)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:188)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:210)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:172)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:117)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:108)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:151)
at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:870)
at org.apache.coyote.http11.Http11BaseProtocol$Http11ConnectionHandler.processConnection(Http11BaseProtocol.java:665)
at org.apache.tomcat.util.net.PoolTcpEndpoint.processSocket(PoolTcpEndpoint.java:528)
at org.apache.tomcat.util.net.LeaderFollowerWorkerThread.runIt(LeaderFollowerWorkerThread.java:81)
at org.apache.tomcat.util.threads.ThreadPool$ControlRunnable.run(ThreadPool.java:685)
at java.lang.Thread.run(Thread.java:636)
Perhaps I'm way off the mark but I'm pretty sure these are related to the client continuing to attempt to read when there's nothing being sent.
Some implementation details
Both the server and client are using the java binary protocol.
I wrote a simple client pool class which lets me reuse clients, these are the main functions:
public synchronized Client getClient() {
if(clientQueue.isEmpty()) {
return newClient();
} else {
return clientQueue.getLast();
}
}
private synchronized Client newClient() {
int leftToTry = serverArr.length;
Client cli = null;
while(leftToTry > 0 && cli == null) {
log.info("Creating new connection to " +
serverArr[roundRobinPos] + port);
TTransport transport = new TSocket(serverArr[roundRobinPos], port);
TProtocol protocol = new TBinaryProtocol(transport);
cli = new Client(protocol);
try {
transport.open();
} catch (TTransportException e) {
cli = null;
log.warn("Failed connection to " +
serverArr[roundRobinPos] + port);
}
roundRobinPos++;
if(roundRobinPos >= serverArr.length) {
roundRobinPos = 0;
}
leftToTry--;
}
return cli;
}
public void returnClient(Client cli) {
clientQueue.addFirst(cli);
}
The client applications(namely tomcat servlets) access it in the following way:
Client dpayClient = null;
if(dpay != null
&& (dpayClient = dpay.getClient()) != null) {
try {
dpayClient.pvCheck(requestParameters.getId());
} catch (DPNoToolException e) {
return;
} catch (TException e) {
log.warn("pvCheck had an exception", e);
} finally {
if(dpayClient != null) {
dpay.returnClient(dpayClient);
}
}
}
The actual thrift connection is upped in the following manner
private boolean initThrift(int port, Configuration conf) {
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
DPaymentHandler handler = new DPaymentHandler(conf);
DumboPayment.Processor processor =
new DumboPayment.Processor(handler);
InetAddress listenAddress;
try {
listenAddress = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
LOG.error("Failed in thrift init", e);
return false;
}
TServerTransport serverTransport;
try {
serverTransport = new TServerSocket(
new InetSocketAddress(listenAddress, port));
} catch (TTransportException e) {
LOG.error("Failed in thrift init", e);
return false;
}
TTransportFactory transportFactory = new TTransportFactory();
TServer server = new TThreadPoolServer(processor, serverTransport,
transportFactory, protocolFactory);
LOG.info("Starting Dumbo Payment thrift server on " +
listenAddress + ":" + Integer.toString(port));
server.serve();
return true;
}
Finally
Been stuck on this for a while now... It may well be I'm missing something obvious. I'd really appreciate any help with this.
If any additional information is needed, please let me know. There's a whole mouthful there, so I wanted to try to keep stuff to the most (hopefully) relevant.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
我的猜测是,您有多个线程尝试同时使用客户端,并且我不完全确定这是万无一失的。您可能会尝试使用异步接口并构建线程安全资源池来访问客户端。
使用 Thrift-0.5.0.0,这里是为 thrift 生成的代码创建 AsyncClient 的示例:
但是,如果您查看源代码,您会注意到它有一个单线程消息处理程序,即使它使用 NIO 套接字,您可能会发现这是一个瓶颈。为了获得更多,您必须创建更多异步客户端,检查它们并正确返回它们。
为了简化这一点,我创建了一个快速的小类来管理它们。要修改它以满足您的需求,您唯一需要做的就是包含您的软件包,它应该适合您,即使我还没有真正测试过它(根本没有,真的):
如何使用它的示例:
您可能想要尝试不同的服务器模式(例如 THsHaServer),看看哪种模式最适合您的环境。
My guess is that you have multiple threads trying to use the clients at the same time, and I'm not entirely sure that bulletproof. You might try to use the async interface as well as construct a threadsafe resource pool for accessing your clients.
Using Thrift-0.5.0.0, here is an example of creating an AsyncClient for your thrift generated code:
However, if you look through the source, you'll notice that it's got a single thread message handler, even though it uses a NIO socket, you might find this to be a bottleneck. To get more, you'll have to make more async clients, check them out, and return them correctly.
To simplify this, I've made a quick little class to manage them. The only thing you need to do to modify it to fit your needs is to include your packages and it should work for you, even though I have not really tested it much (at all, really):
An example of how to use it:
You might want to experiment with different server modes like the THsHaServer to see what works best for your environment.
尝试在以下行中使用 THttpClient 而不是 TSocket:
Try using THttpClient rather TSocket in the following line:
ClientPool 的 returnClient 函数不是线程安全的:
the function returnClient of the ClientPool is NOT thread safe: