Cassandra Hector loader app runs out of memory
This simple app takes a comma-delimited file with headers and puts it into Cassandra. It works for small files, but memory usage just climbs until an out-of-memory exception kills it.
What am I missing?
package com.company;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

public class QuickLoad {
    public static Keyspace keyspace = null;

    public static void main(String[] args) {
        File file = new File(args[0]);
        String keyspaceName = args[1];
        String columnFamilyName = args[2];
        BufferedReader reader = null;
        try {
            keyspace = GetKeyspace(keyspaceName);
            reader = new BufferedReader(new FileReader(file));
            String fileLine = null;
            String[] headers = null;
            String[] fields = null;
            boolean headerLine = true;
            while ((fileLine = reader.readLine()) != null) {
                if (headerLine) {
                    headerLine = false;
                    headers = fileLine.substring(1, fileLine.length() - 1).split("\",\"");
                } else {
                    fields = fileLine.substring(1, fileLine.length() - 1).split("\",\"");
                    CassandraSave(keyspace, columnFamilyName, headers, fields);
                }
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (reader != null) {
                    reader.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        System.exit(0);
    }

    public static void CassandraSave(Keyspace keyspace, String columnFamily, String[] headers, String[] columns) {
        try {
            Mutator<String> mutator = HFactory.createMutator(keyspace, StringSerializer.get());
            for (int i = 1; i < headers.length - 1; i++) {
                // && rather than ||: with ||, a null entry would fall through
                // to equals() and throw a NullPointerException
                if (columns[i] != null && !columns[i].equals("null")) {
                    if (columns[i].length() > 0) {
                        HColumn<String, String> col = HFactory.createStringColumn(headers[i], columns[i]);
                        mutator.insert(columns[1], columnFamily, col);
                    }
                }
            }
            mutator.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static Keyspace GetKeyspace(String keyspaceName) {
        String serverAddress = "localhost:9160";
        Cluster cluster = HFactory.getOrCreateCluster("My Cluster", serverAddress);
        Keyspace keyspace = HFactory.createKeyspace(keyspaceName, cluster);
        return keyspace;
    }
}
3 Answers
I could see this as a problem if one of the 'columns' in your input file were larger than your allocated heap. You may be able to fix this by putting an upper bound on the size of your mutation, such that your CassandraSave function only does 100 or so mutations in a single operation.
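A minimal sketch of that idea, assuming Hector's Mutator.addInsertion()/execute() batching API: columns are queued instead of written one at a time, and the queue is flushed every BATCH_SIZE rows. The BatchedLoad class, the load() helper, and BATCH_SIZE are illustrative names, not part of the original code, and fields[1] is used as the row key only to mirror the question's CassandraSave.

package com.company;

import java.io.BufferedReader;
import java.io.IOException;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

public class BatchedLoad {
    // Illustrative cap on queued rows per execute(), per the suggestion above.
    private static final int BATCH_SIZE = 100;

    // Same loop shape as the question's main(), but columns are queued with
    // addInsertion() (which sends nothing until execute()) instead of being
    // written per column with insert().
    public static void load(BufferedReader reader, Keyspace keyspace,
                            String columnFamily, String[] headers) throws IOException {
        Mutator<String> mutator = HFactory.createMutator(keyspace, StringSerializer.get());
        int pendingRows = 0;
        String fileLine;
        while ((fileLine = reader.readLine()) != null) {
            String[] fields = fileLine.substring(1, fileLine.length() - 1).split("\",\"");
            for (int i = 1; i < headers.length - 1; i++) {
                if (fields[i] != null && !fields[i].equals("null") && fields[i].length() > 0) {
                    // Queue the column; fields[1] is the row key, as in the original.
                    mutator.addInsertion(fields[1], columnFamily,
                            HFactory.createStringColumn(headers[i], fields[i]));
                }
            }
            if (++pendingRows >= BATCH_SIZE) {
                mutator.execute(); // one round trip for the whole batch
                // Start a fresh mutator rather than relying on execute()
                // clearing the pending mutations.
                mutator = HFactory.createMutator(keyspace, StringSerializer.get());
                pendingRows = 0;
            }
        }
        mutator.execute(); // flush any remaining rows
    }
}

Recreating the mutator after each flush keeps the pending-mutation buffer bounded, which is the point of the upper bound suggested above.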
Looks like you are using an older version of Hector and running into a bug with speed4j leaking memory. If you upgrade to Hector 0.8.0-2 it should be fixed.
One thing to note is that speed4j is disabled by default in 0.8.0-2; if you want to enable it, see this thread.
Two things I see: it's single threaded, and the batch size is pretty small. Add an outer loop to collect inserts in the mutator, with a batch size of about 500 rows to start, and see how that goes.
Here is an example of a performant mutator insert I use for stress testing:
https://github.com/zznate/cassandra-stress/blob/master/src/main/java/com/riptano/cassandra/stress/InsertCommand.java
Also, it is a bit older, but here is a gist of an approach to a parallelized loader that works similarly to what you describe:
https://gist.github.com/397574
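A rough sketch of the shape such a parallelized loader could take (this is not the gist's actual code): one reader thread fills 500-row chunks, and a small worker pool writes each chunk as a single batched mutation. ParallelLoad, CHUNK_SIZE, and the worker count are all illustrative, and each task creates its own Mutator since mutators are not meant to be shared across threads.

package com.company;

import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

public class ParallelLoad {
    private static final int CHUNK_SIZE = 500; // per the answer above
    private static final int WORKERS = 4;      // a guess; tune to your cluster

    public static void load(BufferedReader reader, final Keyspace keyspace,
                            final String columnFamily, final String[] headers)
            throws IOException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(WORKERS);
        List<String[]> chunk = new ArrayList<String[]>(CHUNK_SIZE);
        String line;
        while ((line = reader.readLine()) != null) {
            chunk.add(line.substring(1, line.length() - 1).split("\",\""));
            if (chunk.size() == CHUNK_SIZE) {
                submitChunk(pool, chunk, keyspace, columnFamily, headers);
                chunk = new ArrayList<String[]>(CHUNK_SIZE);
            }
        }
        if (!chunk.isEmpty()) {
            submitChunk(pool, chunk, keyspace, columnFamily, headers); // tail chunk
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
    }

    private static void submitChunk(ExecutorService pool, final List<String[]> rows,
                                    final Keyspace keyspace, final String columnFamily,
                                    final String[] headers) {
        pool.submit(new Runnable() {
            public void run() {
                // One Mutator per task, created on the worker thread.
                Mutator<String> mutator = HFactory.createMutator(keyspace, StringSerializer.get());
                for (String[] fields : rows) {
                    for (int i = 1; i < headers.length - 1; i++) {
                        if (fields[i] != null && !fields[i].equals("null") && fields[i].length() > 0) {
                            mutator.addInsertion(fields[1], columnFamily,
                                    HFactory.createStringColumn(headers[i], fields[i]));
                        }
                    }
                }
                mutator.execute(); // one batched write for the whole chunk
            }
        });
    }
}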