Distributed Dynamic Real-Time Scheduling Solution

Author Introduction

Chen Junjie, Senior Technical Manager of the Backend Service Development Team at the R&D Center of the Cultural and Creative Group, is mainly responsible for the development of application service systems. He is passionate about application architecture design and optimization and keeps a close eye on new technologies.


A requirement recently came up in a project: the system needs to continuously push various messages to users watching live broadcasts (concerts, performances, and so on). A careful analysis of this requirement reveals the following characteristics:

· The live broadcast content (start and end times) can be dynamically edited and managed.

· The real-time requirement for live broadcasts is very high; the pushed messages must reach the user as quickly as possible.

· Only one server (Tomcat Server) can perform the push task for the same live broadcast program. If multiple servers execute the push simultaneously for the same live broadcast, it will cause duplicate messages on the client side.

· The number of live broadcast programs will not be large; a single service can fully bear the push tasks of all programs.

The system’s deployment diagram is as follows:

[Figure: system deployment diagram]

The first solution that comes to mind is to have each Tomcat server periodically (every N seconds) start the push task, with the following processing logic:

· The server first competes for the distributed lock (which can be coordinated by Redis).

· The server that acquires the lock queries the currently live content and then starts the push task.
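To make this first approach concrete, here is a minimal sketch assuming Spring's @Scheduled tasks and Spring Data Redis 2.1+ for the atomic SET NX EX lock. The lock key, expiry, and job class are illustrative names, not from the original project; Content, ContentService, and IpUtils are the article's own business classes.

import java.util.List;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class PeriodicPushJob {

    private static final String LOCK_KEY = "push:leader:lock"; // hypothetical key

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private ContentService contentService; // business service from the article

    // Runs on every server (requires @EnableScheduling); only the instance
    // that wins the Redis lock pushes in this round.
    @Scheduled(fixedDelay = 5000)
    public void pushLiveMessages() {
        // Acquire the distributed lock for this round; the 10-second expiry
        // keeps a crashed holder from blocking the others forever.
        Boolean locked = redisTemplate.opsForValue()
                .setIfAbsent(LOCK_KEY, IpUtils.getRealIp(), 10, TimeUnit.SECONDS);
        if (!Boolean.TRUE.equals(locked)) {
            return; // another server won this round
        }
        // Query the broadcasts that are currently live and push to their viewers
        List<Content> living = contentService.getLiving();
        for (Content content : living) {
            // push messages for this broadcast (business logic omitted)
        }
    }
}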

Frameworks like Spring and Quartz provide excellent support for scheduled tasks. However, careful analysis reveals the following shortcomings of this implementation:

· The server needs to actively compete for the distributed lock. (There is a lock competition every N seconds, which incurs high system overhead.)

· Starting a scheduled task every N seconds (usually set to 5 seconds) to query the database and create push threads consumes too many system resources, and the solution still cannot achieve the desired real-time behavior (there can be a delay of up to 5 seconds).

So how can we solve this?

First, we introduced a distributed coordination system (Zookeeper). Here is a brief introduction to Zookeeper:

Zookeeper is an open-source distributed coordination service for applications, an open-source implementation of Google's Chubby, and an important component of Hadoop and HBase. It provides consistency services for distributed applications, including configuration maintenance, naming, distributed synchronization, and group services. The goal of Zookeeper is to encapsulate complex and error-prone key services, offering users simple, easy-to-use interfaces and an efficient, stable system.

We also introduced a corresponding library (Curator). Here is a brief introduction to Curator:

Curator is an open-source Zookeeper client framework developed by Netflix. While using Zookeeper, Netflix found the built-in client too low-level, leaving too much for users to handle themselves, so it wrapped the client and provided a more user-friendly library.

With the introduction of Zookeeper, the system’s deployment diagram becomes as follows:

[Figure: deployment diagram after introducing Zookeeper]

Through Zookeeper's leader election feature, we can eliminate the need for servers to actively compete for distributed locks. The election happens only when a server starts, which avoids the overhead of repeated lock competition every N seconds. Zookeeper also addresses the availability problem caused by server crashes: if the server currently holding leadership crashes, a new leader is elected.
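To show the mechanism in isolation before the full business code later in the article, here is a minimal sketch of leader election with Curator's LeaderSelector; the connection string and paths are placeholders.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderElectionDemo extends LeaderSelectorListenerAdapter {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "zk1:2181,zk2:2181,zk3:2181",          // placeholder connection string
                new ExponentialBackoffRetry(1000, 10));
        client.start();

        LeaderSelector selector =
                new LeaderSelector(client, "/demo/leader", new LeaderElectionDemo());
        selector.autoRequeue(); // re-enter the election after leadership is relinquished
        selector.start();

        Thread.sleep(Long.MAX_VALUE); // keep the demo process alive
    }

    // Called only on the instance elected leader; leadership is held while this method runs.
    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        System.out.println("I am the leader; push scheduling would start here");
        Thread.sleep(Long.MAX_VALUE); // hold leadership until the process exits
    }
}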

Another challenge is how to get the leader server to execute the corresponding push logic when a user modifies the live broadcast information (most commonly, the start time). To make the description concrete, assume four servers are deployed: user A's edit lands on server 1 (the edit request may be routed to any server), while the leader of the push service happens to be server 3. The edited data is stored in a MySQL database. One option is to have the leader periodically pull the live broadcast data from the database and create push tasks according to the business logic, but, as discussed above, this consumes a lot of resources (a database query every N seconds) and still leaves an N-second push delay. This is where the observer pattern comes to mind; however, the conventional observer pattern is typically applied within a single JVM, so we designed a distributed observer pattern based on Zookeeper, as shown in the class diagram below:

[Figure: class diagram of the distributed observer pattern]

The system first defines two interfaces with the following functions:

DistributedSubject: the abstract subject role, i.e., the abstract "message"; concrete message objects need to implement this interface. It defines a single method, notifyObservers, used to notify the corresponding observer objects.

DistributedObserve: the abstract observer role; every observer object must implement this interface. It has a single method, register, used to register the observer. Because the mechanism is built on Zookeeper, register takes a CuratorFramework object as its parameter.
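The article only shows the implementing classes, so for readability here is a plausible reconstruction of the two interfaces based on the descriptions above:

import org.apache.curator.framework.CuratorFramework;

// Abstract subject role: concrete subjects notify observers that something changed.
// (Each interface would live in its own source file.)
public interface DistributedSubject {
    void notifyObservers();
}

// Abstract observer role: observers register themselves against Zookeeper.
public interface DistributedObserve {
    void register(CuratorFramework client);
}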

The system has two classes implementing these two interfaces, described as follows:

ContentDistributedSubject implements the DistributedSubject interface. The core of its notifyObservers method writes an always-changing value (the current timestamp plus the machine's IP) to a Zookeeper znode.

Dispatcher implements the DistributedObserve interface. The core of its register method uses the TreeCache class provided by Curator, which registers a listener on a specific znode; once the monitored value changes, the listener code is executed. In addition, the Dispatcher class extends LeaderSelectorListenerAdapter, which ensures that only the server elected as leader runs the registered listener code.

The specific scenario is illustrated in the following diagram:

[Figure: scenario diagram]

This article is based on the actual problems encountered in the project, focusing on describing the problem-solving ideas while not elaborating too much on implementation details. The author’s implementation may not necessarily be optimal, but it is meant to inspire and hopefully be helpful to you. The core class code is as follows:

Dispatcher: control logic, the observer class

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

public class Dispatcher extends LeaderSelectorListenerAdapter implements DistributedObserve {

    private static final Logger log = LoggerFactory.getLogger(Dispatcher.class);

    private final ExecutorService pushThreadPool = Executors.newCachedThreadPool();
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final Object obj = new Object();
    private Thread thread;

    private final String basePath;
    private final String path;
    private final LeaderSelector leaderSelector;

    @Autowired
    private ContentService contentService; // concrete business service

    // The constructor runs at system startup and joins the Zookeeper leader election
    public Dispatcher(String host, String basePath, String leaderPath) {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(host).sessionTimeoutMs(30000)
                .connectionTimeoutMs(30000).canBeReadOnly(false)
                .retryPolicy(new ExponentialBackoffRetry(1000, 10))
                .defaultData(null).build();
        this.basePath = basePath;
        this.path = leaderPath;
        leaderSelector = new LeaderSelector(client, basePath + path, this);
        leaderSelector.autoRequeue();
        client.start();
        leaderSelector.start();
        log.info("client & leaderSelector started!");
    }

    // Executed only by the server that wins the election; Zookeeper guarantees
    // that a single server runs this method at a time
    @Override
    public void takeLeadership(CuratorFramework client) {
        String leaderValue = IpUtils.getHostName() + "@" + IpUtils.getRealIp();
        try {
            client.setData().forPath(basePath + path, leaderValue.getBytes());
        } catch (Exception e) {
            log.error("failed to record leader value", e);
        }
        log.info("I become the leader! leader value is: " + leaderValue);
        // The leader registers itself as a distributed observer
        register(client);
        while (true) {
            try {
                log.info("start new business!!!");
                // If no broadcast is currently live, block this thread; a MonitorTask
                // wakes it up when the next broadcast is due to start

                // Get the broadcasts that are live right now
                List<Content> contents = contentService.getLiving();
                if (contents != null && !contents.isEmpty()) {
                    log.info("start deal living content! living content length is: " + contents.size());
                    // Handle the broadcasts that are live right now
                    for (Content content : contents) {
                        // Check whether this broadcast is already being pushed
                        String contentPath = basePath + "/content_" + content.getId();
                        Stat stat = client.checkExists().forPath(contentPath);
                        if (stat == null) {
                            client.create().creatingParentsIfNeeded()
                                    .withMode(CreateMode.EPHEMERAL)
                                    .withACL(Ids.OPEN_ACL_UNSAFE)
                                    .forPath(contentPath, leaderValue.getBytes());
                            pushThreadPool.execute(new PushTask(content, client, contentPath));
                        }
                    }
                }
                // Get the broadcasts that have not started yet
                List<Content> unstartContents = contentService.getUnstart();
                if (unstartContents != null && !unstartContents.isEmpty()) {
                    log.info("start deal unstart live content! unstart live content length is: " + unstartContents.size());
                    // Schedule a wake-up for the next upcoming broadcast
                    long waitTime = unstartContents.get(0).getStartTime().getTime() - System.currentTimeMillis();
                    MonitorTask task = new MonitorTask(obj, waitTime);
                    if (thread != null && thread.isAlive()) {
                        // thread.interrupt();
                        thread.stop();
                    }
                    thread = new Thread(task);
                    thread.setDaemon(true);
                    thread.start();
                }
                synchronized (obj) {
                    log.info("I am waiting!");
                    obj.wait();
                    log.info("obj notify...");
                }
            } catch (Exception e) {
                log.error("dispatch loop error", e);
            }
        }
    }

    /**
     * Register a distributed observer that listens (indefinitely) for data
     * changes on the monitored node
     */
    @Override
    public void register(CuratorFramework client) {
        final String monitorPath = basePath + ContentDistributedSubject.CONTENT_LISTENER_NODE;
        final TreeCache treeCache = new TreeCache(client, monitorPath);
        try {
            treeCache.getListenable().addListener(new TreeCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                    synchronized (obj) {
                        obj.notify();
                    }
                    ChildData data = treeCache.getCurrentData(monitorPath);
                    if (data != null) {
                        log.info("obj notify!!! event type: " + event.getType()
                                + ", node data is changed, new data: " + new String(data.getData()));
                    } else {
                        log.info("obj notify!!! event type: " + event.getType() + ", node data is changed");
                    }
                }
            }, pool);
            treeCache.start();
        } catch (Exception e) {
            log.error("failed to register tree cache listener", e);
        }
    }
}
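The article does not show MonitorTask. Based solely on how it is used above (it receives the shared lock object and a wait time, and runs on a daemon thread), a minimal compatible sketch might look like this:

// Hypothetical reconstruction of MonitorTask: sleeps until the next broadcast
// is due to start, then wakes up the leader's dispatch loop.
public class MonitorTask implements Runnable {

    private final Object obj;
    private final long waitTime;

    public MonitorTask(Object obj, long waitTime) {
        this.obj = obj;
        this.waitTime = waitTime;
    }

    @Override
    public void run() {
        try {
            if (waitTime > 0) {
                Thread.sleep(waitTime);
            }
        } catch (InterruptedException e) {
            return; // cancelled: a newer MonitorTask has replaced this one
        }
        synchronized (obj) {
            obj.notify(); // wake the takeLeadership loop so it re-reads the schedule
        }
    }
}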

ContentDistributedSubject: the subject class

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ContentDistributedSubject implements DistributedSubject {

    public static final String CONTENT_LISTENER_NODE = "/contentListener";

    private static final Logger log = LoggerFactory.getLogger(ContentDistributedSubject.class);

    private final String basePath;
    private final String host;

    public ContentDistributedSubject(String host, String basePath) {
        this.basePath = basePath;
        this.host = host;
    }

    // Notify observers by changing the value of the monitored Zookeeper node
    @Override
    public void notifyObservers() {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(host)
                .sessionTimeoutMs(30000)
                .connectionTimeoutMs(30000)
                .canBeReadOnly(false)
                .retryPolicy(new ExponentialBackoffRetry(1000, 10))
                .defaultData(null)
                .build();
        client.start();
        String path = basePath + CONTENT_LISTENER_NODE + "/contentMonitor";
        try {
            Stat stat = client.checkExists().forPath(path);
            byte[] value = (System.currentTimeMillis() + "@" + IpUtils.getRealIp()).getBytes();
            if (stat == null) {
                // Create a persistent node on the first notification
                client.create().creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .withACL(Ids.OPEN_ACL_UNSAFE)
                        .forPath(path, value);
            } else {
                client.setData().forPath(path, value);
            }
            log.info(path + " data changed!");
        } catch (Exception e) {
            log.error("failed to notify observers", e);
        } finally {
            client.close();
        }
    }
}
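For completeness, here is a hedged sketch of how the edit flow could trigger the subject after persisting a change to MySQL; ContentEditService, ContentDao, and the constructor arguments are hypothetical and not from the original article:

import org.springframework.stereotype.Service;

@Service
public class ContentEditService {

    // Hypothetical DAO that saves the edited broadcast to MySQL
    private final ContentDao contentDao;

    // Points at the same Zookeeper ensemble and base path as the Dispatcher
    private final ContentDistributedSubject subject =
            new ContentDistributedSubject("zk1:2181,zk2:2181,zk3:2181", "/livePush");

    public ContentEditService(ContentDao contentDao) {
        this.contentDao = contentDao;
    }

    public void updateContent(Content content) {
        // 1. Persist the edited start/end times to MySQL
        contentDao.update(content);
        // 2. Notify the leader (whichever server that is) via Zookeeper so it
        //    re-reads the schedule immediately instead of waiting for a poll
        subject.notifyObservers();
    }
}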
