如何使用 Apache Flink 读取 websocket 数据
我正在尝试使用 Apache Flink 读取 websocket 中的数据
我的 Flink 作业正在连接到 websocket,但它没有从 websocket 中提取数据。
下面是我尝试使用 Apache Flink API 连接到 websocket 的示例代码。
RichSourceFunction 中的 run() 方法既不执行,也不抛出任何错误。
@Slf4j
public class Main {
public static final int CHECKPOINTING_INTERVAL_MS = 5000;
private static final String JOB_NAME = "Flink Streaming Java API Skeleton";
/**
* Main Flink job.
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final ObjectMapper objectMapper = new ObjectMapper();
env.setParallelism(4);
ParameterTool paramTool = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(paramTool);
DataStreamSource<String> mySocketStream = env.addSource(new MyWebSocketSourceFunc());
mySocketStream.map(new MapIt()).print();
// mySocketStream.print();
env.enableCheckpointing(CHECKPOINTING_INTERVAL_MS);
env.setRestartStrategy(RestartStrategies.noRestart());
env.execute(JOB_NAME);
}
public static class MyWebSocketSourceFunc extends RichSourceFunction<String> {
private boolean running = true;
transient AsyncHttpClient client;
transient BoundRequestBuilder boundRequestBuilder;
transient WebSocketUpgradeHandler.Builder webSocketListener;
private BlockingQueue<String> messages = new ArrayBlockingQueue<>(100);
@Override
public void run(SourceContext<String> ctx) throws Exception {
WebSocketUpgradeHandler webSocketUpgradeHandler = webSocketListener.addWebSocketListener(
new WebSocketListener() {
private final ObjectMapper myMapper = new ObjectMapper();
private String getRsvpId(String payload) {
try {
Map map = myMapper.readValue(payload, Map.class);
Object rsvpId = map.get("rsvp_id");
return rsvpId != null ? rsvpId.toString() : "NOT FOUND";
} catch (IOException e) {
log.error("Mapping failed, returning 'null'");
return "NULL";
}
}
@Override
public void onOpen(WebSocket webSocket) {
}
@Override
public void onClose(WebSocket webSocket, int i, String s) {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onTextFrame(String payload, boolean finalFragment, int rsv) {
log.debug("onTextFrame({}), rsvp_id={}", hash(payload), getRsvpId(payload));
if (payload != null) {
try {
messages.put(payload);
} catch (InterruptedException e) {
log.error("Interrupted!", e);
Thread.currentThread().interrupt();
}
}
}
}).build();
boundRequestBuilder.execute(webSocketUpgradeHandler).get();
while (running) {
ctx.collect(messages.take());
}
running = false;
}
@Override
public void cancel() {
log.info("cancel function called");
running = false;
}
@Override
public void open(Configuration parameters) throws Exception {
log.info("open function called");
super.open(parameters);
client = Dsl.asyncHttpClient();
boundRequestBuilder = client.prepareGet("ws://stream.meetup.com/2/rsvps");
webSocketListener = new WebSocketUpgradeHandler.Builder();
}
private String hash(String input) {
if (input == null) {
return "-- NULL --";
}
try {
MessageDigest md = MessageDigest.getInstance("MD5");
md.update(input.getBytes());
byte[] digest = md.digest();
return DatatypeConverter.printHexBinary(digest).toUpperCase();
} catch (NoSuchAlgorithmException e) {
log.error("Cound not instantiate MD5", e);
return "--NOT CALCULATED--";
}
}
}
public static class MapIt extends RichMapFunction<String, String> {
final ObjectMapper objectMapper = new ObjectMapper();
@Override
public String map(String value) throws Exception {
Map<String, Object> mapped = objectMapper.readValue(value, Map.class);
Object rsvp = mapped.get("rsvp_id");
return rsvp != null ? rsvp.toString() : "null" ;
}
}
}
这是我实现此作业时所参考的文档:参考
TYIA。
I am trying to read data from a WebSocket using Apache Flink.
My Flink job connects to the WebSocket, but it is not pulling any data from it.
Below is the sample code I used to connect to the WebSocket with the Apache Flink API.
The run() method in the RichSourceFunction is neither executing nor throwing any error.
@Slf4j
public class Main {
public static final int CHECKPOINTING_INTERVAL_MS = 5000;
private static final String JOB_NAME = "Flink Streaming Java API Skeleton";
/**
* Main Flink job.
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final ObjectMapper objectMapper = new ObjectMapper();
env.setParallelism(4);
ParameterTool paramTool = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(paramTool);
DataStreamSource<String> mySocketStream = env.addSource(new MyWebSocketSourceFunc());
mySocketStream.map(new MapIt()).print();
// mySocketStream.print();
env.enableCheckpointing(CHECKPOINTING_INTERVAL_MS);
env.setRestartStrategy(RestartStrategies.noRestart());
env.execute(JOB_NAME);
}
public static class MyWebSocketSourceFunc extends RichSourceFunction<String> {
private boolean running = true;
transient AsyncHttpClient client;
transient BoundRequestBuilder boundRequestBuilder;
transient WebSocketUpgradeHandler.Builder webSocketListener;
private BlockingQueue<String> messages = new ArrayBlockingQueue<>(100);
@Override
public void run(SourceContext<String> ctx) throws Exception {
WebSocketUpgradeHandler webSocketUpgradeHandler = webSocketListener.addWebSocketListener(
new WebSocketListener() {
private final ObjectMapper myMapper = new ObjectMapper();
private String getRsvpId(String payload) {
try {
Map map = myMapper.readValue(payload, Map.class);
Object rsvpId = map.get("rsvp_id");
return rsvpId != null ? rsvpId.toString() : "NOT FOUND";
} catch (IOException e) {
log.error("Mapping failed, returning 'null'");
return "NULL";
}
}
@Override
public void onOpen(WebSocket webSocket) {
}
@Override
public void onClose(WebSocket webSocket, int i, String s) {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onTextFrame(String payload, boolean finalFragment, int rsv) {
log.debug("onTextFrame({}), rsvp_id={}", hash(payload), getRsvpId(payload));
if (payload != null) {
try {
messages.put(payload);
} catch (InterruptedException e) {
log.error("Interrupted!", e);
Thread.currentThread().interrupt();
}
}
}
}).build();
boundRequestBuilder.execute(webSocketUpgradeHandler).get();
while (running) {
ctx.collect(messages.take());
}
running = false;
}
@Override
public void cancel() {
log.info("cancel function called");
running = false;
}
@Override
public void open(Configuration parameters) throws Exception {
log.info("open function called");
super.open(parameters);
client = Dsl.asyncHttpClient();
boundRequestBuilder = client.prepareGet("ws://stream.meetup.com/2/rsvps");
webSocketListener = new WebSocketUpgradeHandler.Builder();
}
private String hash(String input) {
if (input == null) {
return "-- NULL --";
}
try {
MessageDigest md = MessageDigest.getInstance("MD5");
md.update(input.getBytes());
byte[] digest = md.digest();
return DatatypeConverter.printHexBinary(digest).toUpperCase();
} catch (NoSuchAlgorithmException e) {
log.error("Cound not instantiate MD5", e);
return "--NOT CALCULATED--";
}
}
}
public static class MapIt extends RichMapFunction<String, String> {
final ObjectMapper objectMapper = new ObjectMapper();
@Override
public String map(String value) throws Exception {
Map<String, Object> mapped = objectMapper.readValue(value, Map.class);
Object rsvp = mapped.get("rsvp_id");
return rsvp != null ? rsvp.toString() : "null" ;
}
}
}
Here is the reference document that I followed to implement this job:
Reference
TYIA.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
Flink 包含一个内置的套接字源连接器。您可以在文档中找到一个展示其用法的示例。这比调试这种自定义实现要容易。
另请注意,不建议在生产应用程序中使用套接字,因为它们无法提供任何容错保证(因为它们无法支持检查点)。
Flink includes a built-in socket source connector. You'll find an example showing how to use it in the documentation. That's going to be easier than debugging this other implementation.
Also be aware that using sockets in production applications is not recommended, since they are unable to provide any fault tolerance guarantees (because they cannot support checkpointing).