Categories
DevOps

How to use ZooKeeper to elect the leader with a Python script

Use ZooKeeper to elect a leader among several application nodes and share the leader's data with the others, using a Python script built on the kazoo client library.

Create the leader_example.py Python script.

#!/usr/bin/env python3

from kazoo.client import KazooClient
from datetime import datetime
import time
import argparse


class ApplicationNode(object):
    """One application instance taking part in a ZooKeeper leader election.

    Constructing the object connects to ZooKeeper, switches the client to
    the given chroot, registers the instance as an ephemeral sequential
    znode below ``/nodes`` and installs watches on ``/nodes`` (membership)
    and ``/data`` (shared data).  The registered node with the lowest
    sequence is treated as the leader; when that is this instance, it
    publishes ``server_data`` in the ephemeral ``/data`` znode.
    """

    def __init__(self, server_name, server_data, chroot, zookeeper_hosts=''):
        """Connect, register and start watching.

        :param server_name: name used as the prefix of the registration
            znode and to decide whether this instance is the leader.
        :param server_data: value published in ``/data`` while leading.
        :param chroot: ZooKeeper path under which all znodes are kept.
        :param zookeeper_hosts: host string handed to ``KazooClient``.
        """
        self.zookeeper = KazooClient(hosts=zookeeper_hosts)

        self.server_name = server_name
        self.server_data = server_data

        # NOTE: the attribute name is kept as-is so existing users of the
        # class keep working.
        self.patch_chroot = chroot
        self.path_nodes = "/nodes"
        self.path_data = "/data"

        self.connect()
        self.chroot()
        self.register()
        self.watch_application_nodes()
        self.watch_application_data()

    def connect(self):
        """Start the ZooKeeper client session."""
        self.zookeeper.start()

    def chroot(self):
        """Create the chroot path if needed and make the client use it."""
        self.zookeeper.ensure_path(self.patch_chroot)
        self.zookeeper.chroot = self.patch_chroot

    def register(self):
        """Create this instance's ephemeral sequential znode in ``/nodes``."""
        self.zookeeper.create("{0}/{1}_".format(self.path_nodes, self.server_name),
                              ephemeral=True, sequence=True, makepath=True)

    def watch_application_data(self):
        """Watch ``/data`` and report every change.

        The path is deliberately not created here: a placeholder znode
        would make ``update_shared_data`` believe a leader had already
        published.  The data watch is expected to report a missing node
        as ``None`` values.
        """
        self.zookeeper.DataWatch(path=self.path_data, func=self.check_application_data)

    def watch_application_nodes(self):
        """Watch the children of ``/nodes`` to follow membership changes."""
        self.zookeeper.ensure_path(self.path_nodes)
        self.zookeeper.ChildrenWatch(path=self.path_nodes, func=self.check_application_nodes)

    @staticmethod
    def _parse_children(children):
        """Split ``<name>_<sequence>`` child names into node dictionaries.

        The name is split on its *last* underscore so that server names
        containing underscores are kept intact.  Children without an
        underscore are skipped.
        """
        parsed = []
        for child in children:
            name, separator, sequence = child.rpartition("_")
            if not separator:
                continue
            parsed.append({"node": name, "sequence": sequence})
        return parsed

    def check_application_nodes(self, children):
        """Children-watch callback: display membership and act as leader.

        :param children: names of the znodes currently below ``/nodes``.
        """
        application_nodes = self._parse_children(children)
        if not application_nodes:
            # Nothing registered (yet): there is no leader to elect.
            self.display_server_information(application_nodes, None)
            return

        # Sequences are compared as strings, as before; this relies on
        # ZooKeeper's fixed-width, zero-padded sequence suffix.
        current_leader = min(application_nodes, key=lambda x: x["sequence"])["node"]

        self.display_server_information(application_nodes, current_leader)
        if current_leader == self.server_name:
            self.update_shared_data()

    def check_application_data(self, data, stat):
        """Data-watch callback: print the new content and stat of ``/data``."""
        print(
            "Data change detected on {0}:\nData: {1}\nStat: {2}".format((datetime.now()).strftime("%B %d, %Y %H:%M:%S"),
                                                                        data, stat))
        print()

    def update_shared_data(self):
        """Publish this server's name and data in ``/data`` if it is free.

        The znode is created directly and an already existing node is
        tolerated, which avoids the race of a separate exists-then-create.
        """
        # Local import: kazoo is already a dependency of this script.
        from kazoo.exceptions import NodeExistsError

        payload = "name: {0}\ndata: {1}".format(self.server_name, self.server_data).encode("utf8")
        try:
            self.zookeeper.create(self.path_data, payload,
                                  ephemeral=True, sequence=False, makepath=True)
        except NodeExistsError:
            # Somebody already published; keep the existing data.
            pass

    def display_server_information(self, application_nodes, current_leader):
        """Print the time, this server's name, the known nodes and its role."""
        print("Datetime: {0}".format((datetime.now()).strftime("%B %d, %Y %H:%M:%S")))
        print("Server name: {0}".format(self.server_name))
        print("Nodes:")
        for i in application_nodes:
            print("  - {0} with sequence {1}".format(i["node"], i["sequence"]))
        print("Role: {0}".format("leader" if current_leader == self.server_name else "follower"))
        print()

    def __del__(self):
        """Stop the session and release the client's resources."""
        # The attribute is missing when construction failed early.
        zookeeper = getattr(self, "zookeeper", None)
        if zookeeper is not None:
            # The client has to be stopped before it can be closed.
            zookeeper.stop()
            zookeeper.close()


if __name__ == '__main__':
    # Command-line entry point: parse the options, start one application
    # node and keep the process alive so the ZooKeeper watches keep firing.
    parser = argparse.ArgumentParser(description='ZooKeeper example application')
    # These three values are passed straight to the node, so a missing one
    # is rejected here with a clear usage message.
    parser.add_argument('--server', required=True,
                        help='name of this application node')
    parser.add_argument('--data',
                        help='data this node publishes while it is the leader')
    parser.add_argument('--chroot', required=True,
                        help='ZooKeeper path under which the application keeps its znodes')
    parser.add_argument('--zookeeper', required=True,
                        help='comma-separated list of ZooKeeper host:port pairs')

    args = parser.parse_args()

    node = ApplicationNode(server_name=args.server, server_data=args.data, chroot=args.chroot,
                           zookeeper_hosts=args.zookeeper)

    try:
        # All work happens in the watch callbacks; the main thread idles.
        while True:
            time.sleep(10)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the example.
        pass
    finally:
        # Ending the session explicitly removes this node's ephemeral
        # znodes right away instead of waiting for the session timeout.
        node.zookeeper.stop()
        node.zookeeper.close()

Use it to play with leader election and watches.

$ python3 leader_example.py --server server1 --data "mariadb://172.16.0.111:3306" --chroot /application --zookeeper 172.16.0.111:2181,172.16.0.112:2181,172.16.0.113:2181
Datetime: June 15, 2021 23:38:38
Server name: server1
Nodes:
  - server1 with sequence 0000000000
Role: leader

Data change detected on June 15, 2021 23:38:38:
Data: b'name: server1\ndata: mariadb://172.16.0.111:3306'
Stat: ZnodeStat(czxid=4294973701, mzxid=4294973701, ctime=1623718347592, mtime=1623718347592, version=0, cversion=0, aversion=0, ephemeralOwner=72057819866923296, dataLength=47, numChildren=0, pzxid=4294973701)

Datetime: June 15, 2021 23:38:50
Server name: server1
Nodes:
  - server2 with sequence 0000000001
  - server1 with sequence 0000000000
Role: leader

Datetime: June 15, 2021 23:38:59
Server name: server1
Nodes:
  - server3 with sequence 0000000002
  - server2 with sequence 0000000001
  - server1 with sequence 0000000000
Role: leader

^C
$ python3 leader_example.py --server server2 --data "mariadb://172.16.0.112:3306" --chroot /application --zookeeper 172.16.0.111:2181,172.16.0.112:2181,172.16.0.113:2181
Datetime: June 15, 2021 23:38:50
Server name: server2
Nodes:
  - server2 with sequence 0000000001
  - server1 with sequence 0000000000
Role: follower

Data change detected on June 15, 2021 23:38:50:
Data: b'name: server1\ndata: mariadb://172.16.0.111:3306'
Stat: ZnodeStat(czxid=4294973701, mzxid=4294973701, ctime=1623718347592, mtime=1623718347592, version=0, cversion=0, aversion=0, ephemeralOwner=72057819866923296, dataLength=47, numChildren=0, pzxid=4294973701)

Datetime: June 15, 2021 23:38:59
Server name: server2
Nodes:
  - server3 with sequence 0000000002
  - server2 with sequence 0000000001
  - server1 with sequence 0000000000
Role: follower

Datetime: June 15, 2021 23:39:33
Server name: server2
Nodes:
  - server3 with sequence 0000000002
  - server2 with sequence 0000000001
Role: leader

Data change detected on June 15, 2021 23:39:33:
Data: b'name: server2\ndata: mariadb://172.16.0.112:3306'
Stat: ZnodeStat(czxid=4294973707, mzxid=4294973707, ctime=1623718407178, mtime=1623718407178, version=0, cversion=0, aversion=0, ephemeralOwner=72057819866923297, dataLength=47, numChildren=0, pzxid=4294973707)

^C
$ python3 leader_example.py --server server3 --data "mariadb://172.16.0.113:3306" --chroot /application --zookeeper 172.16.0.111:2181,172.16.0.112:2181,172.16.0.113:2181
Datetime: June 15, 2021 23:38:59
Server name: server3
Nodes:
  - server3 with sequence 0000000002
  - server2 with sequence 0000000001
  - server1 with sequence 0000000000
Role: follower

Data change detected on June 15, 2021 23:38:59:
Data: b'name: server1\ndata: mariadb://172.16.0.111:3306'
Stat: ZnodeStat(czxid=4294973701, mzxid=4294973701, ctime=1623718347592, mtime=1623718347592, version=0, cversion=0, aversion=0, ephemeralOwner=72057819866923296, dataLength=47, numChildren=0, pzxid=4294973701)

Datetime: June 15, 2021 23:39:33
Server name: server3
Nodes:
  - server3 with sequence 0000000002
  - server2 with sequence 0000000001
Role: follower

Data change detected on June 15, 2021 23:39:33:
Data: None
Stat: None

Data change detected on June 15, 2021 23:39:33:
Data: b'name: server2\ndata: mariadb://172.16.0.112:3306'
Stat: ZnodeStat(czxid=4294973707, mzxid=4294973707, ctime=1623718407178, mtime=1623718407178, version=0, cversion=0, aversion=0, ephemeralOwner=72057819866923297, dataLength=47, numChildren=0, pzxid=4294973707)

Datetime: June 15, 2021 23:39:53
Server name: server3
Nodes:
  - server3 with sequence 0000000002
Role: leader

Data change detected on June 15, 2021 23:39:53:
Data: b'name: server3\ndata: mariadb://172.16.0.113:3306'
Stat: ZnodeStat(czxid=4294973709, mzxid=4294973709, ctime=1623718429178, mtime=1623718429178, version=0, cversion=0, aversion=0, ephemeralOwner=216173555704463436, dataLength=47, numChildren=0, pzxid=4294973709)

^C

Start and stop each application to see how everything changes.

Beware: error handling has been omitted to keep this example concise.