基于MySQL实现Leader 选举

在公司中间件逐步向云服务迁移的过程中, 要逐步下线中间件中依赖的Zookeeper. 而不少服务依赖Zookeeper的leader选举.
因此我们需要提供一些不依赖外部的一些实现, 而因为mysql在我们内部广泛被使用,所以这次实现leader 选举使用了mysql.

表结构

set names utf8mb4;
CREATE TABLE leader_election (
  id int(10) unsigned not null auto_increment comment '主键id',
  name varchar(128) not null default '' comment '名称',
  node varchar(128) NOT NULL default '' comment '节点',
  last_seen_active timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP comment '最后更新时间',
  PRIMARY KEY (id),
  unique key uniq_idx_name(name)
) ENGINE=InnoDB default charset=utf8mb4 comment 'leader选举';

之所以设置一列name主要是考虑到可能会有很多的地方使用到leader选举,所以设置了name这一列.这样每个leader选举服务
就对应leader_election表的一条记录

节点表示

考虑到一个机器上可能会部署多个服务,因此节点的表示需要精确到pid层级
我这边采用的node表示为: hostname:port:pid

核心sql

谁是leader

select node as leader from leader_election where name=?

开始选举

insert ignore into leader_election (name, node, last_seen_active) values (?, ?, now()) on duplicate key update node = if(last_seen_active < DATE_SUB(now(), INTERVAL ? SECOND), values(node), node),
last_seen_active = if(node = values(node), values(last_seen_active), last_seen_active);

这里面的第三个问候表示leader必须续租的时间, 如果leader超过这个时间还没有续租的话, 那么别的节点就可能被选为leader

强制指定某节点为leader

replace into leader_election (name, node, last_seen_active) values (?, ?, now())

强制重新开始选举

delete from leader_election where name=? and node = ?

判断自己是否是leader

select count(*) as is_leader from leader_election where name=? and node=?

核心java实现

private final AtomicBoolean isLeader = new AtomicBoolean(false);
//leader必须续租的时间 单位秒
private static final int threshold = 5;
 private final ListeningScheduledExecutorService executorService = MoreExecutors
            .listeningDecorator(Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("server_leader_election");
                    return thread;
                }
            }));
    private final Supplier<String> holder = Suppliers.memoize(new Supplier<String>() {
        @Override
        public String get() {
            //hostname:port:pid
            return NetUtils.getLocalHost() + ":"
                    + appConfig.getServer().getPort() + ":"
                    + appConfig.getServer().getPid();
        }
    });
    /**
     * 进行选举
     */
    public void start(final String key, final LeaderChangedListener listener) {
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                //进行选举
                leaderElectionDao.elect(key, holder.get(), threshold);
                //判断自己是否是leader
                if (amITheLeader(key)) {
                    if (isLeader.compareAndSet(false, true)) {
                        //之前不是leader,现在被选为leader
                        listener.own();
                    }
                } else {
                    if (isLeader.compareAndSet(true, false)) {
                        //之前是leader,现在不是了
                        listener.lost();
                    }
                }
            }
        }, INITIAL_DELAY, PERIOD, TimeUnit.SECONDS);
        //每1秒调度一次
    }

提示:

在使用mysql的leader选举替换线上的zk的时候,需要注意在发布过程中出现2个leader的情况.需要加一个开关.进行特殊处理.

本文版权归作者所有,禁止一切形式的转载,复制等操作
赞赏

微信赞赏支付宝赞赏

发表评论

电子邮件地址不会被公开。 必填项已用*标注