Converting a MySQL table to a ColumnFamily in Cassandra: slow batch mutations with Hector
I have a very large MySQL table (billions of rows, dozens of columns) that I would like to convert into a ColumnFamily in Cassandra. I'm using Hector.
I first create my schema like this:
String clusterName = "Test Cluster";
String host = "cassandra.lanhost.com:9160";
String newKeyspaceName = "KeyspaceName";
String newColumnFamilyName = "CFName";
CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator(host);
ThriftCluster cassandraCluster = new ThriftCluster(clusterName, cassandraHostConfigurator);
BasicColumnFamilyDefinition columnFamilyDefinition = new BasicColumnFamilyDefinition();
columnFamilyDefinition.setKeyspaceName(newKeyspaceName);
columnFamilyDefinition.setName(newColumnFamilyName);
columnFamilyDefinition.setDefaultValidationClass("UTF8Type");
columnFamilyDefinition.setKeyValidationClass(ComparatorType.UTF8TYPE.getClassName());
columnFamilyDefinition.setComparatorType(ComparatorType.UTF8TYPE);
BasicColumnDefinition columnDefinition = new BasicColumnDefinition();
columnDefinition.setName(StringSerializer.get().toByteBuffer("id"));
columnDefinition.setIndexType(ColumnIndexType.KEYS);
columnDefinition.setValidationClass(ComparatorType.INTEGERTYPE.getClassName());
columnDefinition.setIndexName("id_index");
columnFamilyDefinition.addColumnDefinition(columnDefinition);
columnDefinition = new BasicColumnDefinition();
columnDefinition.setName(StringSerializer.get().toByteBuffer("status"));
columnDefinition.setIndexType(ColumnIndexType.KEYS);
columnDefinition.setValidationClass(ComparatorType.ASCIITYPE.getClassName());
columnDefinition.setIndexName("status_index");
columnFamilyDefinition.addColumnDefinition(columnDefinition);
.......
ColumnFamilyDefinition cfDef = new ThriftCfDef(columnFamilyDefinition);
KeyspaceDefinition keyspaceDefinition =
HFactory.createKeyspaceDefinition(newKeyspaceName, "org.apache.cassandra.locator.SimpleStrategy", 1, Arrays.asList(cfDef));
cassandraCluster.addKeyspace(keyspaceDefinition);
Once that is done, I load my data, which is stored in a List because I fetch the MySQL data with a NamedParameterJdbcTemplate, like this:
String clusterName = "Test Cluster";
String host = "cassandra.lanhost.com:9160";
String KeyspaceName = "KeyspaceName";
String ColumnFamilyName = "CFName";
final StringSerializer serializer = StringSerializer.get();
public void insert(List<SqlParameterSource> dataToInsert) throws ExceptionParserInterrupted {
Cluster cassandraCluster = HFactory.getOrCreateCluster(clusterName, host);
Keyspace workingKeyspace = HFactory.createKeyspace(KeyspaceName, cassandraCluster);
Mutator<String> mutator = HFactory.createMutator(workingKeyspace, serializer);
ColumnFamilyTemplate<String, String> template = new ThriftColumnFamilyTemplate<String, String>(workingKeyspace, ColumnFamilyName, serializer, serializer);
long t1 = System.currentTimeMillis();
for (SqlParameterSource data : dataToInsert) {
String keyId = "id" + (Integer) data.getValue("id");
mutator.addInsertion(keyId, ColumnFamilyName, HFactory.createColumn("id", (Integer) data.getValue("id"), StringSerializer.get(), IntegerSerializer.get()));
mutator.addInsertion(keyId, ColumnFamilyName, HFactory.createStringColumn("status", data.getValue("status").toString()));
...............
}
mutator.execute();
System.out.println(System.currentTimeMillis() - t1); // elapsed milliseconds
}
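Materialising every row in a single List will also not scale to billions of rows. One alternative (an assumption on my part, not something in the code above) is to page through MySQL with a LIMIT/OFFSET query and push one page at a time. A minimal, self-contained sketch of that loop, where `fetchPage` is a hypothetical stand-in for the paged JdbcTemplate query and the Hector insert is reduced to a comment:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

// Sketch: page through the source table instead of loading it all at once.
// fetchPage(offset) stands in for a "SELECT ... LIMIT pageSize OFFSET ?" query.
public class PagedExport {
    static int exportAll(IntFunction<List<Integer>> fetchPage) {
        int exported = 0;
        int offset = 0;
        List<Integer> page;
        while (!(page = fetchPage.apply(offset)).isEmpty()) {
            // insert(page); // push this page through the Hector mutator
            exported += page.size();
            offset += page.size();
        }
        return exported;
    }

    public static void main(String[] args) {
        // Fake source of 2500 rows served in pages of 1000.
        int total = 2500, pageSize = 1000;
        IntFunction<List<Integer>> fake = offset -> {
            List<Integer> p = new ArrayList<>();
            for (int i = offset; i < Math.min(offset + pageSize, total); i++) p.add(i);
            return p;
        };
        System.out.println(exportAll(fake)); // prints 2500
    }
}
```

This keeps memory flat regardless of table size; the page size is a tuning knob, not a value taken from the original code.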
I'm inserting 100,000 rows in approximately one hour, which is really slow. I've heard about multi-threading my inserts, but in this particular case I don't know how to go about it. Should I use BatchMutate?
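For context on the BatchMutate question: everything queued with addInsertion is already sent as one batch when execute() is called, so another lever is flushing in fixed-size chunks rather than once at the end, keeping each Thrift call to a bounded payload. A self-contained sketch of that chunking pattern (the Hector calls appear only as comments, and BATCH_SIZE is an assumed tuning value, not from the Hector docs):

```java
import java.util.List;

// Sketch: flush the pending mutations every BATCH_SIZE rows instead of
// queueing the whole data set on one Mutator.
public class ChunkedInsert {
    static final int BATCH_SIZE = 500; // assumed tuning value

    static int insertInBatches(List<Integer> rowIds) {
        int flushes = 0;
        int pending = 0;
        for (Integer id : rowIds) {
            // mutator.addInsertion("id" + id, CF, column); // Hector call goes here
            pending++;
            if (pending == BATCH_SIZE) {
                // mutator.execute(); // send this chunk
                flushes++;
                pending = 0;
            }
        }
        if (pending > 0) {
            // mutator.execute(); // final partial chunk
            flushes++;
        }
        return flushes;
    }

    public static void main(String[] args) {
        System.out.println(insertInBatches(java.util.Collections.nCopies(1200, 1))); // prints 3
    }
}
```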
2 Answers
Yes, you should run your insertion code from multiple threads. Take a look at the following stress-testing code for an example of how to do this efficiently with Hector:
https://github.com/zznate/cassandra-stress
An additional source of your insert performance issue may be the number of secondary indexes you are applying on the column family (each secondary index creates an additional column family 'under the hood').
Correctly designed data models should not really need a large number of secondary indexes. The following article provides a good overview of data modeling in Cassandra:
http://www.datastax.com/docs/1.0/ddl/index
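One way to structure the multi-threaded inserts (a sketch of the general pattern, not code taken from the linked project): slice the row list and give each slice to a worker in a fixed thread pool. Each worker would create its own Mutator, since Hector mutators should not be shared across threads; here the Hector calls are reduced to comments and a counter so the skeleton stays self-contained, and the thread count is an assumption to tune:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: partition the rows across a fixed thread pool; each worker pushes
// its own slice (and would own its own Mutator).
public class ParallelInsert {
    static int insertParallel(List<Integer> rowIds, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger inserted = new AtomicInteger();
        int slice = (rowIds.size() + threads - 1) / threads;
        List<Future<?>> futures = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            List<Integer> part = rowIds.subList(Math.min(t * slice, rowIds.size()),
                                               Math.min((t + 1) * slice, rowIds.size()));
            futures.add(pool.submit(() -> {
                // Mutator<String> mutator = HFactory.createMutator(keyspace, serializer);
                for (Integer id : part) {
                    // mutator.addInsertion(...) with periodic mutator.execute()
                    inserted.incrementAndGet();
                }
            }));
        }
        try {
            for (Future<?> f : futures) f.get(); // surface any worker failure
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
        pool.shutdown();
        return inserted.get();
    }

    public static void main(String[] args) {
        System.out.println(insertParallel(java.util.Collections.nCopies(10000, 1), 4)); // prints 10000
    }
}
```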
There is one alternate way of achieving this. You can try exploring https://github.com/impetus-opensource/Kundera. You would love it.
Kundera is a JPA 2.0 compliant Object-Datastore Mapping Library for NoSQL Datastores and currently supports Cassandra, HBase, MongoDB and all relational datastores (Kundera internally uses Hibernate for all relational datastores).
In your case you can use your existing objects along with JPA annotations to store them in Cassandra. Since Kundera supports polyglot persistence, you can also use a MySQL + Cassandra combination, with MySQL for most of your data and Cassandra for transactional data. And since all you need to care about is objects and JPA annotations, your job becomes much easier.
For performance you can have a look at https://github.com/impetus-opensource/Kundera/wiki/Kundera-Performance