Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework

Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework

Background

Alibaba Cloud Log Service serves as a cloud-native observability and analysis platform. It provides a one-stop solution for data collection, processing, querying, visualization, alerting, consumption, and delivery, significantly enhancing users’ digital capabilities in development, operations, operations, and security scenarios.
The log service platform, as an observability platform, offers functions such as data import, data processing, aggregation, alerting, intelligent inspection, and export. These functions are referred to as tasks within the log service and are widely used. The following sections will mainly introduce the design and practice of the scheduling framework for these tasks.
Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
This introduction is divided into four parts:
  • Task Scheduling Background

  • Design of Billion-Level Task Scheduling Framework for Observability Platforms

  • Large-Scale Application of Task Scheduling Framework in Log Service

  • Outlook
Task Scheduling Background

General Scheduling

Scheduling is a very common technology in computing, from single machines to distributed systems and big data systems, scheduling can be found everywhere. Here, we attempt to summarize some common features of scheduling.
  • Operating System: From the perspective of single-machine operating systems like Linux, the kernel controls the execution time of processes on the processor through time slicing, where the priority of processes is linked to time slices. In simple terms, the execution of processes on a single CPU or a specific CPU is managed by the scheduler; K8s is referred to as the operating system of the distributed era. After a Pod is created, K8s’s control plane scheduler scores and ranks nodes to ultimately select suitable nodes to run the Pods.

  • Big Data Analysis Systems: From the earliest MapReduce using fair schedulers to support job priorities and preemption, to SQL computing engines like Presto that allocate tasks in execution plans to suitable workers via the Coordinator’s scheduler, Spark splits tasks into stages through DAGScheduler, and TaskScheduler finally schedules the TaskSet corresponding to the stages to suitable workers for execution.

  • Task Scheduling Framework: Common ETL processing tasks and scheduled tasks in data processing exhibit multi-modal characteristics: scheduled execution, continuous operation, one-time execution, etc. During task execution, considerations for task orchestration and state consistency are essential.

Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
This is a simple abstraction of scheduling. As shown in the figure, scheduling is responsible for assigning different Tasks to different Resources for execution. Tasks can be processes, Pods, or subtasks; Resources are the specific resources for executing Task tasks, which can be processors, thread pools, nodes, or machines. Through this abstraction, we can see the position of scheduling in the system.
Scheduling covers a wide range, and this article mainly focuses on the design and practice of task scheduling frameworks. Here we will first look at some examples to observe some characteristics of task scheduling, focusing on two types of tasks: scheduled tasks and dependency tasks.

Task Scheduling

Scheduled Tasks

Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
Scheduled execution can be understood as having a temporal sequence between each task, requiring execution at specific time points. For example, monitoring logs every hour requires the midnight monitoring task to execute first, followed by the 1 AM monitoring task executing precisely at 1 AM; similarly, related scheduled scenarios include dashboard subscriptions, scheduled calculations, etc.
Dependency Tasks
Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
In addition to scheduled execution, there is another orchestration form, such as sequential dependency, where tasks have execution dependencies on each other, also known as the Pipeline method. Another common orchestration form is topological dependency, also known as DAG, where Task2/Task3 must wait for Task1 to complete before executing, and Task5 must wait for Task3/Task4 to complete before executing.
Characteristics of Task Scheduling
During execution, task scheduling needs to balance the assignment of tasks to suitable machines or executors as evenly as possible. For example, it should consider the current load of executors and the characteristics of the tasks themselves for assignment execution. During the execution of an executor, it may also crash or exit, necessitating the migration of tasks to other executors. The entire scheduling process needs to consider scheduling strategies, failover, task migration, etc. Next, we will look at a simple application of task scheduling.

Task Scheduling Application: The Adventure of a Log

Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
The original log is an Nginx access log that includes information such as IP, time, method, URL, and UserAgent. Such raw logs are not conducive to analysis. For example, if we want to count the top 10 URLs with the highest access, the command processing would be as follows:
cat nginx_access.log |awk '{print $7}'| sort|uniq -c| sort -rn| head -10 | more
Setting aside the complexity of the command and the volume of raw logs, even if the requirements change slightly, the command would require significant modifications, making maintenance very difficult. The correct way to analyze logs is to use a distributed logging platform for log analysis. Raw logs contain a wealth of “information,” but extracting this information requires a series of processes.
First, data collection is needed, which requires an Agent to centrally collect data distributed across various machines to the log platform. After the logs are collected, they need to be cleaned. For example, for Nginx access logs, we use regular expressions to extract important information such as time, method, and URL, storing them as fields and building indexes. Through the index, we can use SQL-like analysis syntax to analyze the logs. For instance, to view the top 10 URLs accessed, expressing it in SQL would be very concise and clear:
select url, count(1) as cnt from log group by url order by cnt desc limit 10
As long as the business system is operational, logs will continuously be generated. By inspecting streaming logs, we can achieve the detection of system anomalies. When anomalies occur, we can notify system operations personnel through alerts.
General Process Extraction
From such a log analysis system, we can extract some general processes, which can be summarized as data ingestion, data processing, data monitoring, and data export.
Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework

In addition to logs, the system also has Trace data and Metric data, which are the three pillars of the observability system. This process is also applicable to observability service platforms. Next, we will look at the composition of a typical observability service platform’s data flow.

Typical Data Flow of Observability Service Platforms
Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
  • Data Ingestion: In the observability service platform, it is necessary to expand data sources first. Data sources may include various logs, message queues like Kafka, storage OSS, cloud monitoring data, etc., as well as various database data. By enriching the data sources, we can achieve comprehensive observability of the system.

  • Data Processing: After data is ingested into the platform, it needs to be cleaned and processed. This process is collectively referred to as data processing. Data processing can be understood as various transformations and enrichments of the data. Aggregation processing supports timed rolling up operations, such as calculating summary data for the past day every day, providing data with higher information density.

  • Data Monitoring: Observability data itself reflects the operational state of the system. The system exposes specific metrics for each component to reveal the health level of the component. Intelligent inspection algorithms can monitor abnormal metrics, such as a sharp increase or decrease in QPS or Latency. When anomalies occur, alerts can notify relevant operations personnel. Based on the metrics, various operational or operational dashboards can be created. Sending dashboards to groups daily is also a scenario requirement.

  • Data Export: The value of observability data often diminishes over time. Therefore, for long-term log data retention, it can be exported to other platforms.

From the above four processes, we can abstract various tasks responsible for ingestion, processing, detection, etc. For example, data processing is a type of persistent task that continuously processes data streams, while dashboard subscriptions are scheduled tasks that need to regularly send dashboards to emails or workgroups. Next, we will introduce the scheduling framework for various tasks.
Design of the Billion-Level Task Scheduling Framework for Observability Platforms
Based on the introduction to observability platform tasks, we can summarize the characteristics of a typical observability platform task:
  • Business complexity and diverse task types: Data ingestion alone may involve dozens or even hundreds of data sources.

  • Large user base and numerous tasks: As a cloud business, each customer has a substantial demand for task creation.

  • High SLA requirements: High service availability is required, and backend service upgrades and migrations must not affect the execution of users’ existing tasks.

  • Multi-tenancy: In cloud business, customers must not directly affect each other.

Design Goals for Task Scheduling in Observability Platforms

Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
Based on the characteristics of platform tasks, we need to achieve the goals shown in the figure for the scheduling framework.
  • Support heterogeneous tasks: Alerts, dashboard subscriptions, data processing, and aggregation processing each have different characteristics. For example, alerts are scheduled tasks, data processing is persistent tasks, and dashboard preview subscriptions are one-time tasks.

  • Massive task scheduling: For a single alert task, if it executes once per minute, there will be 1,440 scheduling instances in a day. This number multiplied by the number of users and tasks will result in massive task scheduling. Our goal is to ensure that an increase in the number of tasks does not overwhelm machine performance, especially achieving horizontal scaling, where an increase in task count or scheduling instances only requires a linear increase in machines.

  • High availability: As a cloud business, we need to ensure that backend service upgrades, restarts, or even crashes do not affect user task execution. Both the user side and the backend service side need to have monitoring capabilities for task execution.

  • Simple and efficient operations: The backend service should provide a visual operation dashboard that intuitively displays service issues. Additionally, it should allow for alert configuration, enabling as much automation as possible during service upgrades and releases.

  • Multi-tenancy: The cloud environment naturally has multi-tenancy scenarios, and resources between tenants must be strictly isolated, with no resource or performance dependencies.

  • Scalability: In response to customers’ new requirements, future support for more task types is needed. For example, if there are already import tasks for MySQL and SQL Server, more database imports will be needed in the future. In this case, we need to modify only the plugin without changing the task scheduling framework.

  • API-driven: In addition to the above requirements, we also need to achieve API-driven management of tasks. Many overseas customers use APIs and Terraform to manage cloud resources, so task management needs to be API-driven.

Overall Overview of the Task Scheduling Framework for Observability Platforms

Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
Based on the aforementioned scheduling design goals, we have designed the observability task scheduling framework, as shown in the figure below. We will introduce it from the bottom up.
  • Storage Layer: This mainly includes the storage of task metadata and the runtime state and snapshot storage of tasks. Task metadata mainly includes task types, task configurations, and task scheduling information, all stored in a relational database; the running state and snapshots of tasks are stored in a distributed file system.

  • Service Layer: This provides the core functionalities of task scheduling, mainly including task scheduling and task execution, corresponding to the previously mentioned task orchestration and execution modules. Task scheduling mainly targets three types of tasks, including persistent tasks, scheduled tasks, and on-demand tasks. Task execution supports multiple execution engines, including Presto, RESTful interfaces, K8s engines, and an internally developed ETL 2.0 system.

  • Business Layer: The business layer includes functionalities that users can directly use in the console, such as alert monitoring, data processing, index rebuilding, dashboard subscriptions, aggregation processing, various data source imports, intelligent inspection tasks, and log delivery.

  • Access Layer: The access layer uses Nginx and CGI to provide services externally, featuring high availability and regional deployment.

  • API/SDK/Terraform/Console: On the user side, the console can be used to manage various tasks, providing customized interfaces and monitoring for different tasks. Additionally, APIs, SDKs, and Terraform can be used to perform CRUD operations on tasks.

  • Task Visualization: In the console, we provide visualizations of task execution and monitoring. Through the console, users can see the execution status and history of tasks, and also enable built-in alerts to monitor tasks.

Key Design Points of the Task Scheduling Framework

Next, we will introduce the key design points of the task scheduling framework, mainly covering the following aspects:
  • Heterogeneous Task Model Abstraction

  • Scheduling Service Framework

  • Support for Large-Scale Tasks

  • High Availability Design for Services

  • Stability Construction

Task Model Abstraction

Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
Next, let’s look at the abstraction of the task model:
  • For tasks that require scheduled execution, such as alert monitoring, dashboard subscriptions, and aggregation processing, they are abstracted as scheduled tasks, supporting scheduled and Cron expression settings.

  • For tasks that require continuous operation, such as data processing, index rebuilding, and data imports, they are abstracted as persistent tasks. These tasks often only need to run once and may or may not have an end state.

  • For preview features of data processing and dashboard subscriptions, tasks are created only when users click, and they exit after execution without needing to save the task state. These tasks are abstracted as DryRun types or on-demand tasks.

Scheduling Service Framework

Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
The service framework uses a Master-Worker architecture, with the Master responsible for task dispatching and Worker management. The Master abstracts data into several partitions and dispatches these partitions to different Workers, achieving a divide-and-conquer approach to tasks. During Worker execution, the Master can also dynamically migrate partitions based on Worker load. Additionally, during Worker restarts or upgrades, the Master will remove and add partitions accordingly.

Task scheduling is mainly implemented at the Worker level, where each Worker is responsible for pulling tasks corresponding to its partitions. It then loads tasks through JobLoader. Note: It only loads the task list corresponding to the current Worker’s partitions. The Scheduler orchestrates tasks, involving the scheduling of persistent tasks, scheduled tasks, and on-demand tasks. The Scheduler sends the orchestrated tasks to the JobExecutor for execution, where the JobExecutor needs to persistently save the task status in the RedoLog during execution. When the Worker restarts or upgrades, it loads the task status from the RedoLog to ensure the accuracy of the task status.

Support for Large-Scale Tasks

Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
Through the introduction of the task service framework, we know that partitions serve as the bridge between the Master and Worker and are the medium for dividing large-scale tasks. As shown in the figure, assuming there are N tasks, they are mapped to corresponding partitions according to a certain hash algorithm. Since Workers are associated with specific partitions, they can be linked to tasks. For example, tasks j1 and j2 correspond to partition p1, while p1 corresponds to Worker worker1, allowing j1 and j2 to execute on worker1. It should be noted:
  • The correspondence between Workers and partitions is not fixed; it is a dynamic mapping. When a Worker restarts or is under high load, its corresponding partitions may migrate to other Workers, so Workers need to implement partition migration operations.

  • When the number of tasks increases, the presence of partitions allows for the addition of Workers to meet the growing task demand, achieving horizontal scaling. After adding new Workers, they can share more partitions.

High Availability Design for Services

High availability mainly refers to the service’s uptime. As a backend service, there will inevitably be demands for restarts and upgrades. High availability scenarios mainly involve handling partition migrations. When a Worker restarts, is under high load, or experiences an anomaly, there will be a need for partition migration. During partition migration, tasks also need to migrate, which involves retaining their status, similar to context switching of processes on a CPU.
Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
For task switching, we use the RedoLog method to save the status of tasks. A task can be divided into multiple stages corresponding to the task execution state machine. During each stage, memory checkpoints and RedoLog updates are performed. The RedoLog is persisted to the previously mentioned distributed file system, using a high-performance append method for sequential writing. When partitions migrate to new Workers, the new Workers can restore task status by loading the RedoLog.
Here, an optimization is involved. If the RedoLog continues to be written using the append method, it will inevitably lead to the RedoLog growing larger, which can slow down the loading speed of Workers when loading partitions. To address this, we use the Snapshot method to merge RedoLogs from the past period, allowing the loading of only the Snapshot and the RedoLog after the Snapshot, thus reducing file read times and costs, improving loading speed.
Stability Construction
Stability construction mainly involves the following aspects:
  • Release Process:

  • From compilation to release, all web-side operations are white-screened, with template-based releases. Each version can be tracked and rolled back.

  • Support for gray control at the cluster and task type levels, allowing for small-scale verification before full release.

  • Operations Process:

  • Providing internal operation APIs and web-side operations for repairing and handling abnormal jobs, reducing manual intervention in operations.

  • On-Call:

  • Internally, we have developed an inspection function to find abnormal tasks, such as those with excessively long start or stop times, which will log exceptions for tracking and monitoring.

  • Using alerting from log services based on exception logs, we can timely notify operations personnel when issues arise.

  • Task Monitoring:

  • User Side: In the console, we provide monitoring dashboards and built-in alert configurations for various tasks.

  • Service Side: In the backend, we can see the running status of cluster-level tasks, facilitating monitoring by backend operations personnel.

  • Additionally, the execution status and history of tasks are stored in a specific log database for tracing and diagnosis in case of issues.

Below are some examples of service-side dashboards displaying the execution status of alerts.
Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
Below are the user-side task monitoring status and alert displays.
Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework

Large-Scale Applications

In the log service, task scheduling has already been applied on a large scale. Below is the running status of tasks in a single cluster in a certain region. Because alerts are scheduled and widely used, their daily scheduling instances have reached tens of millions, while aggregation processing has high application scenarios in rolling up, reaching millions. Data processing tasks, being persistent, have lower scheduling frequencies compared to scheduled tasks like alerts.
Next, let’s take aggregation processing as an example to look at the task scheduling scenarios.

Typical Task: Aggregation Processing

Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
Aggregation processing involves periodically aggregating and querying data over a period and storing the results in another database, thus extracting high-density information with lower dimensions and storage requirements compared to raw data. It is suitable for scheduled analyses and global aggregation scenarios.
Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
Here is an example of the execution status of aggregation processing, showing the execution situation for each time interval, including the number of rows processed, data volume processed, and result status. For tasks that fail to execute, manual retries can also be performed.
However, aggregation processing is not simply a scheduled execution logic. During the process, it needs to handle scenarios such as timeouts, failures, and delays. Next, we will briefly introduce each scenario.
Scheduling Scenario 1: Instance Delay Execution
Regardless of whether instances are delayed, the scheduling time of instances is pre-generated based on scheduling rules. Although earlier instances may cause subsequent instances to be delayed, execution progress can gradually catch up to reduce delays until timely execution is restored.Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
Scheduling Scenario 2: Starting Aggregation Processing Jobs from a Historical Time Point
When creating an aggregation processing job at the current time point, historical data will be processed according to the scheduling rules, creating compensatory instances from the start time of scheduling, which will execute sequentially until they catch up with the data processing progress, after which new instances will be executed according to the planned schedule.
Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
Scheduling Scenario 3: Executing Aggregation Processing Jobs Within a Fixed Time
If it is necessary to schedule logs for a specified time period, the scheduling time range can be set. If an end time is set, the last instance (with a scheduling time less than the end time) will not generate new instances after it is completed.
Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
Scheduling Scenario 4: Impact of Modifying Scheduling Configuration on Generated Instances
After modifying the scheduling configuration, the next instance will be generated according to the new configuration. It is generally recommended to synchronously modify SQL time windows, scheduling frequencies, and other configurations to ensure the SQL time range between instances is continuous.
Design and Practice of Billion-Level Heterogeneous Task Scheduling Framework
Scheduling Scenario 5: Retrying Failed Instances
  • Automatic Retry

  • If an instance fails (e.g., due to insufficient permissions, nonexistent source or target databases, or invalid SQL syntax), the system supports automatic retries.

  • Manual Retry

  • If the retry count exceeds the maximum configured retry count or the retry time exceeds the maximum configured runtime, the retry ends, and the instance status is marked as failed, allowing the system to continue executing the next instance.
Outlook
  • Dynamic Task Types: Increase support for dynamic task types, such as more complex task scheduling with inter-task dependencies.

  • Multi-tenancy Optimization: Currently, task usage is limited to simple quota restrictions. In the future, further refinement of QoS for multi-tenancy to support larger quota settings is needed.

  • API Optimization and Improvement: The current task types are rapidly evolving, and the iteration speed of task APIs needs enhancement to allow for the addition of new task types without modifying or with minimal updates to the APIs.

Recommended Reading

1.How to Write a Good Technical Proposal?

2.Ten Years of Experience at Alibaba: Architectural Design Methods in Technical Practice

3.How to Implement Defensive Coding?

The GTS Cloud Smart Wind Rider Essay Contest is Live!

Cloud Smart is the embodiment of the “modular application” concept, helping everyone improve delivery speed, enhance delivery quality, and reduce labor costs. Participating in the Cloud Smart Essay Contest not only allows you to be evaluated by a professional mentor team but also gives you a chance to win an 888 RMB Cat Supermarket Card and a Tmall Genie Sound! The first 100 participants will receive a 38 RMB Tmall Supermarket Card, available on a first-come, first-served basis!

Click to read the original text for details.

Leave a Comment