MySQL insert sequence optimization
I have a realtime application that processes information and logs it to a MySQL database (actually MariaDB, a fork of MySQL). It does around 1.5 million inserts plus 150,000 deletes a day.
I am having great problems with performance and don't know how to make it function any better.
The basic structure of the application is that I have a producer class that pushes a struct onto a thread-safe deque. The following code shows the consumer side:
#include "dbUserQueue.h"

dbUserQueue::~dbUserQueue() {
}

void dbUserQueue::createConnection()
{
    sql::Driver * driver = sql::mysql::get_driver_instance();
    std::auto_ptr< sql::Connection > newCon(driver->connect(dbURL, dbUser, dbPass));
    con = newCon;
    std::auto_ptr< sql::Statement > stmt(con->createStatement());
    stmt->execute("USE twitter");
}
inline void dbUserQueue::updateStatement(const std::string & value,
        std::auto_ptr< sql::PreparedStatement > & stmt, const int index)
{
    // An empty string is treated as "no value" and stored as SQL NULL.
    if (!value.empty()) stmt->setString(index, value);
    else stmt->setNull(index, sql::DataType::VARCHAR);
}

inline void dbUserQueue::updateStatement(const boost::int64_t & value,
        std::auto_ptr< sql::PreparedStatement > & stmt, const int index)
{
    // -1 is the sentinel for "no value".
    if (value != -1) stmt->setInt64(index, value);
    else stmt->setNull(index, sql::DataType::BIGINT);
}

inline void dbUserQueue::updateStatement(const bool value,
        std::auto_ptr< sql::PreparedStatement > & stmt, const int index)
{
    stmt->setBoolean(index, value);
}

inline void dbUserQueue::updateStatement(const int value,
        std::auto_ptr< sql::PreparedStatement > & stmt, const int index)
{
    if (value != -1) stmt->setInt(index, value);
    else stmt->setNull(index, sql::DataType::INTEGER);
}

inline void dbUserQueue::updateStatementDateTime(const std::string & value,
        std::auto_ptr< sql::PreparedStatement > & stmt, const int & index)
{
    if (!value.empty()) stmt->setDateTime(index, value);
    else stmt->setNull(index, sql::DataType::DATE);
}
/*
 * This method creates a database connection
 * and then creates a new thread to process the incoming queue
 */
void dbUserQueue::start() {
    createConnection();
    if (con->isClosed() == false)
    {
        insertStmt = std::auto_ptr< sql::PreparedStatement >(con->prepareStatement("\
            insert ignore into users(contributors_enabled, created_at, \
            description, favourites_count, followers_count, \
            following, friends_count, geo_enabled, id, lang, listed_count, location, \
            name, notifications, screen_name, show_all_inline_media, statuses_count, \
            url, utc_offset, verified) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));
    }
    thread = boost::thread(&dbUserQueue::processLoop, this);
}
/*
 * Stops the thread once it is finished processing the information
 */
void dbUserQueue::join() {
    thread.interrupt();
    thread.join();
}
/*
 * The worker function of the thread.
 * Pops items from the queue and updates the database accordingly.
 */
void dbUserQueue::processLoop() {
    user input;
    int recordCount = 0;
    con->setAutoCommit(false);
    while (true) {
        try {
            // Commit once every 1000 inserted records.
            if (recordCount >= 1000)
            {
                recordCount = 0;
                con->commit();
            }
            // Insert all the data into the prepared statement
            if (userQ.wait_and_pop(input)) {
                updateStatement(input.contributors_enabled, insertStmt, 1);
                updateStatementDateTime(input.created_at, insertStmt, 2);
                updateStatement(input.description, insertStmt, 3);
                updateStatement(input.favourites_count, insertStmt, 4);
                updateStatement(input.followers_count, insertStmt, 5);
                updateStatement(input.following, insertStmt, 6);
                updateStatement(input.friends_count, insertStmt, 7);
                updateStatement(input.geo_enabled, insertStmt, 8);
                updateStatement(input.id, insertStmt, 9);
                updateStatement(input.lang, insertStmt, 10);
                updateStatement(input.listed_count, insertStmt, 11);
                updateStatement(input.location, insertStmt, 12);
                updateStatement(input.name, insertStmt, 13);
                updateStatement(input.notifications, insertStmt, 14);
                updateStatement(input.screenName, insertStmt, 15);
                updateStatement(input.show_all_inline_media, insertStmt, 16);
                updateStatement(input.statuses_count, insertStmt, 17);
                updateStatement(input.url, insertStmt, 18);
                updateStatement(input.utc_offset, insertStmt, 19);
                updateStatement(input.verified, insertStmt, 20);
                insertStmt->executeUpdate();
                insertStmt->clearParameters();
                recordCount++;
                continue;
            }
        } catch (std::exception & e) {
            // Note: any database error is silently swallowed here.
        }
    } // end of while
    // Close the statements and the connection before exiting
    insertStmt->close();
    con->commit();
    if (con->isClosed() == false)
        con->close();
}
My question is: how can I improve the performance? Things I have tried:
Having multiple consumers connect to one MySQL/MariaDB instance
Committing after a large number of records
Single producer, single consumer, commit after 1000 records = ~275 seconds
Dual producers, triple consumers, commit after 1000 records = ~100 seconds
Dual producers, triple consumers, commit after 2000 records = ~100 seconds
Dual producers, triple consumers, commit after every record = ~100 seconds
Dual producers, 6 consumers, commit after every record = ~95 seconds
Dual producers, 6 consumers, commit after 2000 records = ~100 seconds
Triple producers, 6 consumers, commit after 2000 records = ~100 seconds
A couple of notes on the problem domain: the messages to insert and/or delete arrive randomly throughout the day, averaging roughly 20 inserts/deletes per second with much higher bursts, but there is no reason the updates can't be queued for a short period, as long as the queue doesn't grow too large.
The table the data is currently being inserted into holds approximately 52 million records. Here is the MySQL table definition:
CREATE TABLE `users` (
  `id` bigint(20) unsigned NOT NULL,
  `contributors_enabled` tinyint(4) DEFAULT '0',
  `created_at` datetime NOT NULL,
  `description` varchar(255) DEFAULT NULL,
  `favourites_count` int(11) NOT NULL,
  `followers_count` int(11) DEFAULT NULL,
  `following` varchar(255) DEFAULT NULL,
  `friends_count` int(11) NOT NULL,
  `geo_enabled` tinyint(4) DEFAULT '0',
  `lang` varchar(255) DEFAULT NULL,
  `listed_count` int(11) DEFAULT NULL,
  `location` varchar(255) DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  `notifications` varchar(45) DEFAULT NULL,
  `screen_name` varchar(45) NOT NULL,
  `show_all_inline_media` tinyint(4) DEFAULT NULL,
  `statuses_count` int(11) NOT NULL,
  `url` varchar(255) DEFAULT NULL,
  `utc_offset` int(11) DEFAULT NULL,
  `verified` tinyint(4) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `id_UNIQUE` (`id`)
) ENGINE=MARIA DEFAULT CHARSET=latin1 CHECKSUM=1 PAGE_CHECKSUM=1 TRANSACTIONAL=1
Comments (1)
You could change the code to do bulk inserts, rather than insert one row at a time.
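For illustration, here is a minimal sketch of that idea against the same MySQL Connector/C++ API the question uses. It packs a whole batch of rows into a single multi-row INSERT IGNORE, so the server parses one statement and handles one round trip per batch instead of one per row. The userRow struct and insertBatch function are hypothetical stand-ins, and the column list is trimmed to three of the twenty columns for brevity:

#include <memory>   // std::auto_ptr, to match the question's code style
#include <sstream>
#include <string>
#include <vector>
#include <cppconn/connection.h>
#include <cppconn/prepared_statement.h>

// Hypothetical trimmed-down row type; the real code would carry all
// twenty columns of the users table.
struct userRow {
    long long   id;
    std::string screen_name;
    int         followers_count;
};

// Sends one statement of the form
//   insert ignore into users(id, screen_name, followers_count)
//   values (?,?,?),(?,?,?),...
void insertBatch(sql::Connection & con, const std::vector<userRow> & batch)
{
    if (batch.empty()) return;

    // Build the statement text with one (?,?,?) group per row.
    std::ostringstream sql;
    sql << "insert ignore into users(id, screen_name, followers_count) values ";
    for (std::size_t i = 0; i < batch.size(); ++i)
        sql << (i == 0 ? "" : ",") << "(?,?,?)";

    std::auto_ptr< sql::PreparedStatement > stmt(con.prepareStatement(sql.str()));

    // Parameter indices run continuously across rows: row i occupies
    // indices 3*i+1 .. 3*i+3.
    int idx = 1;
    for (std::size_t i = 0; i < batch.size(); ++i) {
        stmt->setInt64(idx++, batch[i].id);
        stmt->setString(idx++, batch[i].screen_name);
        stmt->setInt(idx++, batch[i].followers_count);
    }
    stmt->executeUpdate();
}

The consumer loop would then collect rows from the queue until a size threshold or a short timeout is reached, call insertBatch, and commit once per batch. Because the statement text depends on the batch size, it pays to use a fixed batch size (flushing any remainder with the existing single-row statement) so the multi-row statement can be prepared once and reused.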