Duplicate entries when starting a new member in a Hazelcast cluster

Posted 2025-01-12 13:03:30

I implemented a MapStore that saves entries to a database (an external datastore), but there is an issue when a new node starts while an old member is still storing entries to the database. Initially, cluster A has only Member1.

A client puts 20K entries into the cluster, and Member1 starts saving the entries to the database in batches (write-behind mode). When about 5K entries have been saved (the full 20K is not yet finished), I start Member2, which joins cluster A.

--> Member2 receives about 10K entries (as a result of repartitioning) and stores them to the database, even though Member1 had already saved those 10K entries.

This leads to 10K duplicate records in the database (the extra 10K from Member2). This does not happen when both members are already up before the client starts putting entries, or in write-through mode.
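For context, the write-behind behaviour described above comes from a map-store configuration along the lines of the following sketch (the map name, delay, and batch size here are assumptions, not taken from the question):

```xml
<hazelcast>
    <map name="smsLogs">
        <map-store enabled="true">
            <class-name>SmsLogMapStore</class-name>
            <!-- a non-zero delay switches the store from write-through to write-behind -->
            <write-delay-seconds>5</write-delay-seconds>
            <write-batch-size>1000</write-batch-size>
        </map-store>
    </map>
</hazelcast>
```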

Can you tell me why? Here is my code:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hazelcast 3.x package locations; in 5.x MapStore lives in com.hazelcast.map.
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MapLoaderLifecycleSupport;
import com.hazelcast.core.MapStore;

/**
 *
 * @author Mina Mimi
 */
public class SmsLogMapStore implements MapStore<String, SmsLog>, MapLoaderLifecycleSupport {

    Connection conn;

    static final Logger logger = Logger.getLogger(SmsLogMapStore.class.getSimpleName());

    @Override
    public synchronized void store(String k, SmsLog v) {
        logger.info("===write one >>>:" + v.getContent());
        String sql = "insert into smslog (content) values (?)";
        PreparedStatement st = null;
        try {
            st = conn.prepareStatement(sql);
            st.setString(1, v.getContent());
            st.executeUpdate();
        } catch (SQLException ex) {
            Logger.getLogger(SmsLogMapStore.class.getName()).log(Level.SEVERE, null, ex);
        } finally {
            if (st != null) { // prepareStatement may have thrown before st was assigned
                try {
                    st.close();
                } catch (SQLException ex) {
                    Logger.getLogger(SmsLogMapStore.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
        logger.info("Finished write one >>>");
    }
    @Override
    public synchronized void storeAll(Map<String, SmsLog> map) {
        logger.info("Write batch:" + map.size());
        String sql = "insert into smslog (content) values (?)";
        PreparedStatement st = null;
        try {
            st = conn.prepareStatement(sql);
            for (Map.Entry<String, SmsLog> entry : map.entrySet()) {
                String key = entry.getKey();
                SmsLog v = entry.getValue();
                logger.info("===Writing(k,v):" + key + "," + v + " from batch");
                st.setString(1, v.getContent());
                st.addBatch();
            }
            st.executeBatch();
        } catch (SQLException ex) {
            Logger.getLogger(SmsLogMapStore.class.getName()).log(Level.SEVERE, null, ex);
        } finally {
            if (st != null) { // prepareStatement may have thrown before st was assigned
                try {
                    st.close();
                } catch (SQLException ex) {
                    Logger.getLogger(SmsLogMapStore.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
        logger.info("Finished Write batch >>>");
    }

    @Override
    public void delete(String k) {
        return;
    }

    @Override
    public void deleteAll(Collection<String> clctn) {
        //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
        for (String key : clctn) {
            delete(key);
        }

    }

    @Override
    public SmsLog load(String k) {
        // Stub: a real implementation would read the row for k from the
        // database and return null when it is absent.
        return new SmsLog();
    }

    @Override
    public Map<String, SmsLog> loadAll(Collection<String> clctn) {
        logger.info("############loadAll");
        HashMap<String, SmsLog> result = new HashMap<String, SmsLog>();
        return result;
    }

    @Override
    public synchronized Iterable<String> loadAllKeys() {
        List<String> keys = new LinkedList<String>();
        return keys;
    }

    @Override
    public void init(HazelcastInstance hi, Properties prprts, String string) {
        try {
            conn = getConnection();
        } catch (SQLException ex) {
            Logger.getLogger(SmsLogMapStore.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    @Override
    public void destroy() {
        try {
            conn.close();
        } catch (SQLException ex) {
            Logger.getLogger(SmsLogMapStore.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    private Connection getConnection() throws SQLException {
        Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/test_imdb", "root", "");
        return conn;
    }

}


2 Answers

烟织青萝梦 2025-01-19 13:03:31

There is no distributed coordination for write-behind, which means adding or removing nodes can cause data loss or duplication. There is no guarantee here; the current implementation works in a best-effort manner.

If you are asking for a workaround, you can have a distributed id supplier: before putting an object into the IMap, set an id on it from this supplier. When the map store saves the object, duplicates can be detected with the help of this id.
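A minimal, self-contained sketch of that idea (the names and the in-memory "database" are assumptions; in a real setup the id would come from a cluster-wide generator and the duplicate check from a unique index in the database):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class IdDedupSketch {
    // Stand-in for a distributed id supplier (e.g. a cluster-wide id generator).
    static final AtomicLong idSupplier = new AtomicLong();

    // Stand-in for the database table: one row per id, as a unique index would enforce.
    static final Map<Long, String> table = new LinkedHashMap<>();

    // Idempotent storeAll: a record whose id is already persisted is skipped.
    static void storeAll(Map<Long, String> batch) {
        for (Map.Entry<Long, String> e : batch.entrySet()) {
            table.putIfAbsent(e.getKey(), e.getValue());
        }
    }

    public static void main(String[] args) {
        Map<Long, String> batch = new LinkedHashMap<>();
        for (int i = 0; i < 3; i++) {
            batch.put(idSupplier.incrementAndGet(), "sms-" + i);
        }
        storeAll(batch); // first member flushes the batch
        storeAll(batch); // second member replays it after repartitioning
        System.out.println(table.size()); // prints 3 — one row per id, no duplicates
    }
}
```

The key point is that the identity is assigned once, at put time, so a replayed batch carries the same ids and the store can recognize it.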

你的往事 2025-01-19 13:03:30

Since storeAll() doesn't signal progress back to Hazelcast, Hazelcast has no way to resume the write operation from where it left off. It simply restarts it from scratch on the new set of cluster members.

As such, the storeAll() implementation should be made idempotent, i.e. it must be able to survive multiple invocations on the same keys. The simplest approach is an UPSERT-style operation: if the record was already written, the database just ignores it (or rewrites it with the same value).
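For the MySQL table in the question, that could look like the following sketch (the id column and its key are assumptions — the current smslog table stores only content, so the database has nothing to deduplicate on):

```sql
-- smslog keyed by a stable identity the members agree on (e.g. the IMap key)
CREATE TABLE smslog (
    id      VARCHAR(64) PRIMARY KEY,
    content TEXT
);

-- a replayed batch now updates in place instead of inserting a second row
INSERT INTO smslog (id, content) VALUES (?, ?)
ON DUPLICATE KEY UPDATE content = VALUES(content);
```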

If you need to store big batches of data, consider a data pipeline. Jet can resume from a specific source offset after a failure or a topology change instead of just restarting the operation. It is a solution with more moving parts, though, so don't use it unless this is really a concern; 20K keys is a tiny dataset.

What storage system are you using? Does it support upserts in any way?
