Fred Feng

A passionate Java programmer


Introduction to Chaconne: Building a Highly Available Distributed Job Scheduling System

Chaconne is a distributed job scheduling framework built on the SpringBoot framework. It helps you build a distributed task cluster quickly, with little extra to learn.

Chaconne Features:


  1. Full support for the SpringBoot framework (2.2.0+)
  2. Multiple ways to configure a job (cron expressions, parameter settings, etc.)
  3. Jobs can be saved and deleted dynamically
  4. Scheduled jobs can be configured with annotations
  5. Two scheduling methods in cluster mode (master-slave mode and load-balancing mode)
  6. A variety of built-in load-balancing algorithms, plus support for custom ones
  7. Failure retry and failover policies
  8. Job tracking through logging
  9. Slice processing of a job's initial parameter
  10. Dependencies between jobs (serial dependency and parallel dependency)
  11. DAGs for simulating business workflows
  12. Custom job termination policies
  13. Timeout handling for running jobs, with cooling and resetting
  14. Email alerts on job failure
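
Feature 6 mentions custom load-balancing algorithms. As a rough illustration of the idea only, here is a minimal round-robin selector in plain Java. The names `RoundRobinSketch` and `select`, and the executor addresses, are hypothetical and are not Chaconne's API; the interface the framework actually expects should be taken from its source code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a load-balancing strategy; not part of Chaconne's API.
final class RoundRobinSketch {

	private final AtomicInteger counter = new AtomicInteger();

	// Picks the next candidate in turn, wrapping around at the end of the list.
	String select(List<String> candidates) {
		if (candidates.isEmpty()) {
			throw new IllegalArgumentException("no candidates available");
		}
		int index = Math.floorMod(counter.getAndIncrement(), candidates.size());
		return candidates.get(index);
	}

	public static void main(String[] args) {
		RoundRobinSketch balancer = new RoundRobinSketch();
		// Made-up executor addresses, used only for the demo.
		List<String> executors = Arrays.asList("app-1:8088", "app-2:8088", "app-3:8088");
		for (int i = 0; i < 4; i++) {
			System.out.println(balancer.select(executors));
		}
	}
}
```

The atomic counter keeps the selection safe when several scheduling threads call `select` at once, and `Math.floorMod` keeps the index valid even after the counter overflows.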

Two deployment modes of a Chaconne application cluster:


  1. Decentralized deployment mode
    • There is no fixed scheduling center role; the chaconne cluster elects one of the applications as the leader for job scheduling
    • Applications that take part in scheduling and execution communicate over TCP
  2. Centralized deployment mode
    • There are two roles, scheduling center and job executor, and both support cluster mode
    • The scheduling center communicates with the job executors over HTTP

Note: The cluster here means the cluster of applications that take part in job execution (the chaconne cluster). It is a separate concept from a cluster built with the SpringCloud framework.

If the chaconne cluster is small, the decentralized deployment mode is recommended. If the cluster is large, both modes can be used according to the actual situation.

Structure of the Chaconne Framework:


  1. chaconne-spring-boot-starter
    The core jar of Chaconne, which contains all of its core functions (including the external API of the Web UI)
  2. chaconne-console
    The Chaconne Web UI, used for managing jobs and querying their running status
  3. chaconne-manager
    A demo scheduling center that you can refer to if you adopt centralized deployment

Install:


<dependency>
    <groupId>com.github.paganini2008.atlantis</groupId>
    <artifactId>chaconne-spring-boot-starter</artifactId>
    <version>1.0-RC3</version>
</dependency>

(Please use the latest version)

Compatibility:


  • JDK 1.8 (or later)
  • SpringBoot 2.2.0 (or later)
  • Redis 3.0 (or later)
  • MySQL 5.0 (or later)

Description:

  • Redis is used to store and retrieve cluster information and to broadcast messages
  • MySQL is used to save job definitions and runtime data. At present, only MySQL is supported. The required tables are created automatically when the application starts

Required Settings:


# set chaconne cluster name
spring.application.cluster.name=jobtester-cluster
spring.application.name=jobtester

#Jdbc Configuration
atlantis.framework.chaconne.datasource.jdbcUrl=jdbc:mysql://localhost:3306/demo?useUnicode=true&serverTimezone=Asia/Shanghai&characterEncoding=UTF8&useSSL=false&autoReconnect=true&zeroDateTimeBehavior=convertToNull
atlantis.framework.chaconne.datasource.username=fengy
atlantis.framework.chaconne.datasource.password=123456
atlantis.framework.chaconne.datasource.driverClassName=com.mysql.cj.jdbc.Driver

#Redis Configuration
atlantis.framework.redis.host=localhost
atlantis.framework.redis.port=6379
atlantis.framework.redis.password=123456
atlantis.framework.redis.database=0

spring.redis.messager.pubsub.channel=chaconne-management-messager-pubsub
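
A general point about `.properties` files that applies to settings like these: `#` starts a comment only at the beginning of a line, so a comment written after a value becomes part of that value. The small check below demonstrates this with the standard `java.util.Properties` parser. Treat it as a sketch: Spring Boot uses its own loader, which is assumed here to behave the same way in this case, and the class name `InlineCommentCheck` is made up for the demo.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Demo helper (hypothetical name) that parses properties text with the JDK parser.
final class InlineCommentCheck {

	// Parses the given properties text and returns the value stored under the key.
	static String valueOf(String text, String key) {
		Properties props = new Properties();
		try {
			props.load(new StringReader(text));
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
		return props.getProperty(key);
	}

	public static void main(String[] args) {
		String key = "spring.application.cluster.name";
		// The trailing "# ..." is not stripped; it is kept as part of the value.
		System.out.println("[" + valueOf(key + "=jobtester-cluster  # set chaconne cluster name", key) + "]");
		// With the comment on its own line, the value is clean.
		System.out.println("[" + valueOf("# set chaconne cluster name\n" + key + "=jobtester-cluster", key) + "]");
	}
}
```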

A brief introduction to how Chaconne works


Under the hood, Chaconne relies on the tridenter-spring-boot-starter component to implement its task cluster modes (active-standby mode and load-balancing mode). It uses a message unicast mechanism (simulated with Redis PubSub) to implement task distribution, load balancing, slice processing and other advanced features.

The definition of a cluster in the Chaconne framework is the same as in tridenter-spring-boot-starter: a cluster is a way of separating different product groups or companies. Chaconne also supports the concept of a task group, which is an optional setting. By default, the group name is the current application name (${spring.application.name}), so multiple applications with the same application name form one task group. Chaconne supports not only cross-group task calls, but also cross-cluster task calls.
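
The article does not describe how unicast is simulated on top of a broadcast channel, so the following is only one plausible reading, sketched in memory without Redis: every subscriber sees every message, each message carries a target id, and only the subscriber whose id matches acts on it. All names here (`UnicastOverPubSubSketch`, `Subscriber`, the application ids) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for a pub/sub channel; an assumption, not tridenter's or Chaconne's code.
final class UnicastOverPubSubSketch {

	// A subscriber records only the payloads addressed to its own id.
	static final class Subscriber {
		final String id;
		final List<String> handled = new ArrayList<>();

		Subscriber(String id) {
			this.id = id;
		}

		void onMessage(String targetId, String payload) {
			if (id.equals(targetId)) {
				handled.add(payload);
			}
		}
	}

	private final List<Subscriber> subscribers = new ArrayList<>();

	void subscribe(Subscriber subscriber) {
		subscribers.add(subscriber);
	}

	// Delivers to every subscriber; the target id is what turns the broadcast into a unicast.
	void publish(String targetId, String payload) {
		for (Subscriber subscriber : subscribers) {
			subscriber.onMessage(targetId, payload);
		}
	}

	public static void main(String[] args) {
		UnicastOverPubSubSketch channel = new UnicastOverPubSubSketch();
		Subscriber first = new Subscriber("app-1");
		Subscriber second = new Subscriber("app-2");
		channel.subscribe(first);
		channel.subscribe(second);
		channel.publish("app-2", "run demoJob");
		System.out.println(first.id + " handled " + first.handled);
		System.out.println(second.id + " handled " + second.handled);
	}
}
```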

How to define a Job?


  1. Use the @ChacJob annotation
  2. Extend ManagedJob
  3. Implement Job
  4. Implement NotManagedJob

Description:

  • The first three ways are declarative (programmatic) definitions. In other words, the task is defined in code and loaded automatically when the Spring context starts
  • The last way is used to define dynamic tasks. Users can create jobs on the Web UI (Chaconne Console) or directly by calling the HTTP API or the SDK. Note that job objects created this way are not beans managed by the Spring application context

Examples:

  • Creating a Job by annotation
@ChacJob
@ChacTrigger(cron = "*/5 * * * * ?")
public class DemoCronJob {

	@Run
	public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
		log.info("DemoCronJob is running at: {}", DateUtils.format(System.currentTimeMillis()));
		return RandomUtils.randomLong(1000000L, 1000000000L);
	}

	@OnSuccess
	public void onSuccess(JobKey jobKey, Object result, Logger log) {
		log.info("DemoCronJob's return value is: {}", result);
	}

	@OnFailure
	public void onFailure(JobKey jobKey, Throwable e, Logger log) {
		log.error("DemoCronJob is failed by cause: {}", e.getMessage(), e);
	}

}
  • Creating a Job by implementing Job
@Component
public class HelloWorldJob implements Job {

	@Override
	public String getClusterName() {
		return "your_cluster_name";
	}

	@Override
	public String getGroupName() {
		return "your_group_name";
	}

	@Override
	public int getRetries() {
		return 3;
	}

	@Override
	public long getTimeout() {
		return 60 * 1000L;
	}

	@Override
	public String getEmail() {
		return "your_email@helloworld.com";
	}

	@Override
	public Trigger getTrigger() {
		return GenericTrigger.Builder.newTrigger(1L, SchedulingUnit.MINUTES, false).build();
	}

	@Override
	public Object execute(JobKey jobKey, Object attachment, Logger log) {
		return "Hello World!";
	}

	@Override
	public void onSuccess(JobKey jobKey, Object result, Logger log) {
		if (log.isInfoEnabled()) {
			log.info(result.toString());
		}
	}

}
  • Creating a Job by extending ManagedJob
@Component
public class HealthCheckJob extends ManagedJob {

	@Override
	public long getTimeout() {
		return 60L * 1000;
	}

	@Override
	public Trigger getTrigger() {
		return GenericTrigger.Builder.newTrigger("*/5 * * * * ?").setStartDate(DateUtils.addSeconds(new Date(), 30)).build();
	}

	@Override
	public Object execute(JobKey jobKey, Object arg, Logger log) {
		if (log.isInfoEnabled()) {
			log.info(info());
		}
		return UUID.randomUUID().toString();
	}

	@Override
	public void onSuccess(JobKey jobKey, Object result, Logger log) {
		if (log.isInfoEnabled()) {
			log.info(result.toString());
		}
	}

	private String info() {
		long totalMemory = Runtime.getRuntime().totalMemory();
		long usedMemory = totalMemory - Runtime.getRuntime().freeMemory();
		return FileUtils.formatSize(usedMemory) + "/" + FileUtils.formatSize(totalMemory);
	}

}
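
The examples above return a retry count from getRetries(), but the article does not spell out how retries are counted. The sketch below assumes one reading: the job runs once and, if it fails, is run up to `retries` more times. `RetrySketch` and `runWithRetries` are hypothetical names and do not come from Chaconne.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration of a retry policy; not Chaconne's implementation.
final class RetrySketch {

	// Runs the task once and, on failure, up to `retries` more times; rethrows the last failure.
	static <T> T runWithRetries(Callable<T> task, int retries) throws Exception {
		if (retries < 0) {
			throw new IllegalArgumentException("retries must not be negative");
		}
		Exception last = null;
		for (int attempt = 0; attempt <= retries; attempt++) {
			try {
				return task.call();
			} catch (Exception e) {
				last = e; // remember the failure and try again if attempts remain
			}
		}
		throw last;
	}

	public static void main(String[] args) throws Exception {
		int[] calls = { 0 };
		// Fails on the first two attempts, then succeeds on the third.
		String result = runWithRetries(() -> {
			if (++calls[0] < 3) {
				throw new IllegalStateException("attempt " + calls[0] + " failed");
			}
			return "Hello World!";
		}, 3);
		System.out.println(result + " after " + calls[0] + " attempts");
	}
}
```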

How to create a dynamic task?

  1. Create on the Web UI (Described Later)

  2. Create by API

    • Creating a Job by implementing NotManagedJob
public class EtlJob implements NotManagedJob {

	@Override
	public Object execute(JobKey jobKey, Object attachment, Logger log) {
		log.info("JobKey:{}, Parameter: {}", jobKey, attachment);
		return null;
	}

}
    • Using HTTP API
POST http://localhost:6543/job/admin/persistJob

{
    "jobKey": {
        "clusterName": "yourCluster",
        "groupName": "yourGroup",
        "jobName": "yourJob",
        "jobClassName": "com.yourcompany.yourapp.YourJob"
    },
    "description": "Describe your job shortly",
    "email": "YourEmail@yourcompany.com",
    "retries": 0,
    "timeout": -1,
    "weight": 100,
    "dependentKeys": null,
    "forkKeys": null,
    "completionRate": -1,
    "trigger": {
        "triggerType": 1,
        "triggerDescription": {
            "cron": {
                "expression": "*/5 * * * * ?"
            }
        },
        "startDate": null,
        "endDate": null,
        "repeatCount": -1
    },
    "attachment": "{\"initialParameter\": \"test\"}"
}
    • Using SDK
@Component
public class TestService {
	
	@Autowired
	private JobManager jobManager;

	public void createJob() throws Exception {
		final JobKey jobKey = JobKey.by("yourCluster", "yourGroup", "yourJob", "com.yourcompany.yourapp.YourJob");
		GenericJobDefinition.Builder builder = GenericJobDefinition.newJob(jobKey)
				.setDescription("Describe your job shortly")
				.setEmail("YourEmail@yourcompany.com")
				.setRetries(3)
				.setTimeout(60000L);
		GenericTrigger.Builder triggerBuilder = GenericTrigger.Builder.newTrigger("*/5 * * * * ?");
		builder.setTrigger(triggerBuilder.build());
		GenericJobDefinition jobDefinition = builder.build();
		
		jobManager.persistJob(jobDefinition, "{\"initialParameter\": \"test\"}");
	}

}

Note: JSON is the recommended format for a job's initial parameter.
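
For completeness, here is a hedged sketch of calling the HTTP API above from plain Java. It assumes Java 11 or later (for `java.net.http`), a scheduling application listening on localhost:6543 as in the example URL, and a `Content-Type: application/json` header, which the article does not state. The class name is hypothetical, and the job definition is read from a file you pass on the command line.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical client sketch for the persistJob endpoint shown in the article.
final class PersistJobRequestSketch {

	// Builds the POST request; the content type is an assumption, not taken from the article.
	static HttpRequest buildRequest(String baseUrl, String jobJson) {
		return HttpRequest.newBuilder(URI.create(baseUrl + "/job/admin/persistJob"))
				.header("Content-Type", "application/json")
				.POST(HttpRequest.BodyPublishers.ofString(jobJson))
				.build();
	}

	public static void main(String[] args) throws Exception {
		if (args.length < 1) {
			System.err.println("usage: PersistJobRequestSketch <job-definition.json>");
			return;
		}
		String jobJson = Files.readString(Path.of(args[0]));
		HttpResponse<String> response = HttpClient.newHttpClient()
				.send(buildRequest("http://localhost:6543", jobJson), HttpResponse.BodyHandlers.ofString());
		// Only the status code is printed; the response body format is not documented in the article.
		System.out.println(response.statusCode());
	}
}
```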

Job dependency


Job dependency is one of the key features of the Chaconne framework. There are two kinds: serial dependency and parallel dependency. Serial dependency means that Job B is executed only after Job A has completed; in other words, Job B depends on Job A. Parallel dependency involves several upstream jobs. For example, given three jobs, Job A, Job B and Job C, Job C can be executed only after Job A and Job B have both completed, much like a document that needs every countersignature before it takes effect. With both kinds of dependency, jobs can share their initial parameters and results during execution, and user-defined strategies can decide whether downstream tasks are triggered.
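
Setting Chaconne's own API aside for a moment, the two dependency shapes can be sketched with the JDK's `CompletableFuture`. This is only an analogy to make the semantics concrete: `DependencySketch` and its methods are hypothetical, and the "jobs" are trivial lambdas running in one JVM rather than scheduled tasks in a cluster.

```java
import java.util.concurrent.CompletableFuture;

// Analogy only: models job dependencies with in-process futures, not with Chaconne.
final class DependencySketch {

	// Serial dependency: "Job B" runs only after "Job A" completes and receives A's result.
	static long serial(long jobAResult) {
		CompletableFuture<Long> jobA = CompletableFuture.supplyAsync(() -> jobAResult);
		CompletableFuture<Long> jobB = jobA.thenApply(a -> a + 1);
		return jobB.join();
	}

	// Parallel dependency: "Job C" runs only after both A and B complete and sees both results.
	static long parallel(long jobAResult, long jobBResult) {
		CompletableFuture<Long> jobA = CompletableFuture.supplyAsync(() -> jobAResult);
		CompletableFuture<Long> jobB = CompletableFuture.supplyAsync(() -> jobBResult);
		CompletableFuture<Long> jobC = jobA.thenCombine(jobB, Long::max);
		return jobC.join();
	}

	public static void main(String[] args) {
		System.out.println("serial result: " + serial(41L));
		System.out.println("parallel result: " + parallel(3L, 7L));
	}
}
```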

DAG (Directed Acyclic Graph)

By combining serial and parallel dependency, the Chaconne framework provides a DAG feature with a friendly API for modelling workflow-like business scenarios, which broadens the ways task dependency can be used. (For convenience, the tasks in the examples below are configured with annotations.)

  • Serial Dependency
@ChacJob
@ChacTrigger(triggerType = TriggerType.DEPENDENT)
@ChacDependency({ @ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoSchedJob", name = "demoSchedJob") })
public class DemoDependentJob {

	@Run
	public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
		log.info("DemoDependentJob is running at: {}", DateUtils.format(System.currentTimeMillis()));
		return RandomUtils.randomLong(1000000L, 1000000000L);
	}

	@OnSuccess
	public void onSuccess(JobKey jobKey, Object result, Logger log) {
		log.info("DemoDependentJob's return value is: {}", result);
	}

	@OnFailure
	public void onFailure(JobKey jobKey, Throwable e, Logger log) {
		log.error("DemoDependentJob is failed by cause: {}", e.getMessage(), e);
	}

}
  • Parallel dependency: there are three jobs here: DemoTask, DemoTaskOne and DemoTaskTwo.

DemoTaskOne and DemoTaskTwo must finish before DemoTask is executed, and DemoTask can then read the results of DemoTaskOne and DemoTaskTwo.

DemoTaskOne.java

@ChacJob
@ChacTrigger(triggerType = TriggerType.SIMPLE)
public class DemoTaskOne {

	@Run
	public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
		log.info("DemoTaskOne is running at: {}", DateUtils.format(System.currentTimeMillis()));
		return RandomUtils.randomLong(1000000L, 1000000000L);
	}

	@OnSuccess
	public void onSuccess(JobKey jobKey, Object result, Logger log) {
		log.info("DemoTaskOne return value is: {}", result);
	}

	@OnFailure
	public void onFailure(JobKey jobKey, Throwable e, Logger log) {
		log.error("DemoTaskOne is failed by cause: {}", e.getMessage(), e);
	}

}

DemoTaskTwo.java

@ChacJob
@ChacTrigger(triggerType = TriggerType.SIMPLE)
public class DemoTaskTwo {

	@Run
	public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
		log.info("DemoTaskTwo is running at: {}", DateUtils.format(System.currentTimeMillis()));
		return RandomUtils.randomLong(1000000L, 1000000000L);
	}

	@OnSuccess
	public void onSuccess(JobKey jobKey, Object result, Logger log) {
		log.info("DemoTaskTwo return value is: {}", result);
	}

	@OnFailure
	public void onFailure(JobKey jobKey, Throwable e, Logger log) {
		log.error("DemoTaskTwo is failed by cause: {}", e.getMessage(), e);
	}
	
}

DemoTask.java

@ChacJob
@ChacTrigger(cron = "0 0/1 * * * ?", triggerType = TriggerType.CRON)
@ChacFork({ @ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoTaskOne", name = "demoTaskOne"),
		@ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoTaskTwo", name = "demoTaskTwo") })
public class DemoTask {

	@Run
	public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
		log.info("DemoTask is running at: {}", DateUtils.format(System.currentTimeMillis()));
		TaskJoinResult joinResult = (TaskJoinResult) attachment;
		TaskForkResult[] forkResults = joinResult.getTaskForkResults();
		long max = 0;
		for (TaskForkResult forkResult : forkResults) {
			max = Long.max(max, (Long) forkResult.getResult());
		}
		return max;
	}

	@OnSuccess
	public void onSuccess(JobKey jobKey, Object result, Logger log) {
		log.info("DemoTask return max value is: {}", result);
	}

	@OnFailure
	public void onFailure(JobKey jobKey, Throwable e, Logger log) {
		log.error("DemoTask is failed by cause: {}", e.getMessage(), e);
	}

}
  • Create a DAG (at present, DAG jobs can only be created through the API)
@RequestMapping("/dag")
@RestController
public class DagJobController {

	@Value("${spring.application.cluster.name}")
	private String clusterName;

	@Value("${spring.application.name}")
	private String applicationName;

	@Autowired
	private JobManager jobManager;

	@GetMapping("/create")
	public Map<String, Object> createDagTask() throws Exception {
		Dag dag = new Dag(clusterName, applicationName, "testDag");
		dag.setTrigger(new CronTrigger("0 0/1 * * * ?"));
		dag.setDescription("This is only a demo of dag job");
		DagFlow first = dag.startWith(clusterName, applicationName, "demoDagStart", DemoDagStart.class.getName());
		DagFlow second = first.flow(clusterName, applicationName, "demoDag", DemoDag.class.getName());
		second.fork(clusterName, applicationName, "demoDagOne", DemoDagOne.class.getName());
		second.fork(clusterName, applicationName, "demoDagTwo", DemoDagTwo.class.getName());
		jobManager.persistJob(dag, "123");
		return Collections.singletonMap("ok", 1);
	}

}

The DAG example above shows that the DAG model provided by the Chaconne framework supports serial flow (flow mode) as well as parallel processing (fork mode). In the example, the task demoDag forks two sub-tasks ("demoDagOne" and "demoDagTwo"): demoDagOne and demoDagTwo are processed at the same time, and then the demoDag task is triggered.
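
To make the "acyclic" part of a DAG concrete, the sketch below computes a valid execution order for a small graph with Kahn's algorithm and rejects graphs that contain a cycle. It is a generic illustration, not Chaconne's scheduler: the class name `DagOrderSketch` and the job names `start`, `left`, `right` and `join` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Generic topological ordering (Kahn's algorithm); not taken from Chaconne.
final class DagOrderSketch {

	// `edges` maps each job to the jobs that must wait for it.
	static List<String> executionOrder(Map<String, List<String>> edges) {
		// Count how many upstream jobs each job is still waiting for.
		Map<String, Integer> pending = new LinkedHashMap<>();
		for (String job : edges.keySet()) {
			pending.put(job, 0);
		}
		for (List<String> downstream : edges.values()) {
			for (String job : downstream) {
				pending.merge(job, 1, Integer::sum);
			}
		}
		// Jobs with no upstream dependency can start immediately.
		Deque<String> ready = new ArrayDeque<>();
		for (Map.Entry<String, Integer> entry : pending.entrySet()) {
			if (entry.getValue() == 0) {
				ready.add(entry.getKey());
			}
		}
		List<String> order = new ArrayList<>();
		while (!ready.isEmpty()) {
			String job = ready.poll();
			order.add(job);
			for (String next : edges.getOrDefault(job, Collections.<String>emptyList())) {
				if (pending.merge(next, -1, Integer::sum) == 0) {
					ready.add(next);
				}
			}
		}
		if (order.size() != pending.size()) {
			throw new IllegalStateException("cycle detected: not a DAG");
		}
		return order;
	}

	public static void main(String[] args) {
		Map<String, List<String>> edges = new LinkedHashMap<>();
		edges.put("start", Arrays.asList("left", "right"));
		edges.put("left", Arrays.asList("join"));
		edges.put("right", Arrays.asList("join"));
		edges.put("join", Collections.<String>emptyList());
		System.out.println(executionOrder(edges));
	}
}
```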

Deployment description


  • Decentralized deployment: add the @EnableChaconneEmbeddedMode annotation to the main class of your Spring application, and then start the application. Example:
@EnableChaconneEmbeddedMode
@SpringBootApplication
@ComponentScan
public class YourApplicationMain {

	public static void main(String[] args) {
		final int port = 8088;
		System.setProperty("server.port", String.valueOf(port));
		SpringApplication.run(YourApplicationMain.class, args);
	}

}
  • Centralized deployment
    • To start the scheduling center, create a new SpringBoot project, add the @EnableChaconneDetachedMode annotation to its main class and set the mode to producer. Example:
@EnableChaconneDetachedMode(DetachedMode.PRODUCER)
@SpringBootApplication
public class ChaconneManagementMain {

	public static void main(String[] args) {
		SpringApplication.run(ChaconneManagementMain.class, args);
	}
}

(A DataSource and a RedisConnectionFactory need to be configured.)

Alternatively, use the @ChaconneAdmin annotation directly. Example:

@ChaconneAdmin
@SpringBootApplication
public class ChaconneManagerApplication {

	static {
		System.setProperty("spring.devtools.restart.enabled", "false");
		File logDir = FileUtils.getFile(FileUtils.getUserDirectory(), "logs", "indi", "atlantis", "framework", "chaconne", "management");
		if (!logDir.exists()) {
			logDir.mkdirs();
		}
		System.setProperty("LOG_BASE", logDir.getAbsolutePath());
	}

	public static void main(String[] args) {
		SpringApplication.run(ChaconneManagerApplication.class, args);
		System.out.println(Env.getPid());
	}
}
    • To start a job executor, add the @EnableChaconneDetachedMode annotation to the main class of your Spring application (the default mode is consumer), and then start the application. Example:
@EnableChaconneDetachedMode
@SpringBootApplication
@ComponentScan
public class YourApplicationMain {

	public static void main(String[] args) {
		final int port = 8088;
		System.setProperty("server.port", String.valueOf(port));
		SpringApplication.run(YourApplicationMain.class, args);
	}

}

How to use Chaconne Console?


Chaconne Console is a web project provided by the Chaconne framework for managing and viewing tasks. It supports both the decentralized and the centralized deployment modes. The default port is 6140.

Provides the following functions:

  1. Save tasks and view task information
  2. Pause and resume tasks
  3. Delete task
  4. Run the task manually
  5. View task statistics (by day)
  6. View task runtime log

The Chaconne Console project is still under active maintenance: some functions are rough around the edges, and some are not yet available. Chaconne Console is itself a SpringBoot project. Source code:

@EnableChaconneClientMode
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
public class ChaconneConsoleMain {

	static {
		System.setProperty("spring.devtools.restart.enabled", "false");
		File logDir = FileUtils.getFile(FileUtils.getUserDirectory(), "logs", "indi", "atlantis", "framework", "chaconne", "console");
		if (!logDir.exists()) {
			logDir.mkdirs();
		}
		System.setProperty("DEFAULT_LOG_BASE", logDir.getAbsolutePath());
	}

	public static void main(String[] args) {
		SpringApplication.run(ChaconneConsoleMain.class, args);
		System.out.println(Env.getPid());
	}

}

The annotation @EnableChaconneClientMode enables a task management client. After startup, open the home page at http://localhost:6140/chaconne/index. The console provides the following views (screenshots omitted):

  • Home page
  • Job List
  • Create a Job
  • Job JSON Data
  • Job Detail
  • Job Trace
  • Job Log (info and error)
  • Job Statistics: you can view the statistics of each job (by day)
  • Documentation

Git repository: https://github.com/paganini2008/chaconne.git
