Commit 3bc3deb2 authored by wenmo's avatar wenmo

daemon format

parent 04ef64e0
...@@ -12,7 +12,7 @@ public class TaskQueue<T> { ...@@ -12,7 +12,7 @@ public class TaskQueue<T> {
public void enqueue(T task) { public void enqueue(T task) {
synchronized (lock) { synchronized (lock) {
lock.notifyAll(); lock.notifyAll();
tasks.addLast( task ); tasks.addLast(task);
} }
} }
......
...@@ -37,7 +37,7 @@ public class DefaultThreadPool implements ThreadPool { ...@@ -37,7 +37,7 @@ public class DefaultThreadPool implements ThreadPool {
public static DefaultThreadPool getInstance() { public static DefaultThreadPool getInstance() {
if (defaultThreadPool == null) { if (defaultThreadPool == null) {
synchronized (DefaultThreadPool.class) { synchronized (DefaultThreadPool.class) {
if(defaultThreadPool == null){ if (defaultThreadPool == null) {
defaultThreadPool = new DefaultThreadPool(); defaultThreadPool = new DefaultThreadPool();
} }
} }
......
...@@ -3,13 +3,12 @@ package com.dlink.daemon.pool; ...@@ -3,13 +3,12 @@ package com.dlink.daemon.pool;
import com.dlink.daemon.task.DaemonTask; import com.dlink.daemon.task.DaemonTask;
/** /**
*
* @author lcg * @author lcg
* @operate * @operate
* @date 2022/3/7 10:36 * @date 2022/3/7 10:36
* @return * @return
*/ */
public interface ThreadPool{ public interface ThreadPool {
//执行任务 //执行任务
void execute(DaemonTask daemonTask); void execute(DaemonTask daemonTask);
......
...@@ -7,10 +7,10 @@ import java.util.List; ...@@ -7,10 +7,10 @@ import java.util.List;
public class DaemonFactory { public class DaemonFactory {
public static void start(List<DaemonTaskConfig> configList){ public static void start(List<DaemonTaskConfig> configList) {
Thread thread = new Thread(() -> { Thread thread = new Thread(() -> {
DefaultThreadPool defaultThreadPool = DefaultThreadPool.getInstance(); DefaultThreadPool defaultThreadPool = DefaultThreadPool.getInstance();
for (DaemonTaskConfig config: configList) { for (DaemonTaskConfig config : configList) {
DaemonTask daemonTask = DaemonTask.build(config); DaemonTask daemonTask = DaemonTask.build(config);
defaultThreadPool.execute(daemonTask); defaultThreadPool.execute(daemonTask);
} }
...@@ -24,8 +24,8 @@ public class DaemonFactory { ...@@ -24,8 +24,8 @@ public class DaemonFactory {
int num = taskSize / 100 + 1; int num = taskSize / 100 + 1;
if (defaultThreadPool.getWorkCount() < num) { if (defaultThreadPool.getWorkCount() < num) {
defaultThreadPool.addWorkers(num - defaultThreadPool.getWorkCount() ); defaultThreadPool.addWorkers(num - defaultThreadPool.getWorkCount());
}else if(defaultThreadPool.getWorkCount() > num) { } else if (defaultThreadPool.getWorkCount() > num) {
defaultThreadPool.removeWorker(defaultThreadPool.getWorkCount() - num); defaultThreadPool.removeWorker(defaultThreadPool.getWorkCount() - num);
} }
} }
...@@ -33,7 +33,7 @@ public class DaemonFactory { ...@@ -33,7 +33,7 @@ public class DaemonFactory {
thread.start(); thread.start();
} }
public static void addTask(DaemonTaskConfig config){ public static void addTask(DaemonTaskConfig config) {
DefaultThreadPool.getInstance().execute(DaemonTask.build(config)); DefaultThreadPool.getInstance().execute(DaemonTask.build(config));
} }
} }
...@@ -3,6 +3,7 @@ package com.dlink.daemon.task; ...@@ -3,6 +3,7 @@ package com.dlink.daemon.task;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.daemon.exception.DaemonTaskException; import com.dlink.daemon.exception.DaemonTaskException;
import sun.misc.Service; import sun.misc.Service;
import java.util.Iterator; import java.util.Iterator;
import java.util.Optional; import java.util.Optional;
...@@ -31,8 +32,8 @@ public interface DaemonTask { ...@@ -31,8 +32,8 @@ public interface DaemonTask {
DaemonTask setConfig(DaemonTaskConfig config); DaemonTask setConfig(DaemonTaskConfig config);
default boolean canHandle(String type){ default boolean canHandle(String type) {
return Asserts.isEqualsIgnoreCase(getType(),type); return Asserts.isEqualsIgnoreCase(getType(), type);
} }
String getType(); String getType();
......
...@@ -14,8 +14,8 @@ public class DaemonTaskConfig { ...@@ -14,8 +14,8 @@ public class DaemonTaskConfig {
this.id = id; this.id = id;
} }
public static DaemonTaskConfig build(String type,Integer id){ public static DaemonTaskConfig build(String type, Integer id) {
return new DaemonTaskConfig(type,id); return new DaemonTaskConfig(type, id);
} }
public String getType() { public String getType() {
......
package com.dlink.daemon.task;
import com.dlink.daemon.constant.FlinkTaskConstant;
import com.dlink.model.JobStatus;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
@Data
public class FlinkDaemonTask {
private static final Logger log = LoggerFactory.getLogger(FlinkDaemonTask.class);
public static Random random = new Random(5);
private String id;
private JobStatus status;
private long preDealTime;
private int count;
// @Override
public DaemonTask setConfig(DaemonTaskConfig config) {
return null;
}
// @Override
public String getType() {
return null;
}
// @Override
public void dealTask() {
long gap = 0;
if (this.preDealTime != 0L) {
gap = System.currentTimeMillis() - this.preDealTime;
}
preDealTime = System.currentTimeMillis();
int i = random.nextInt(10);
if(i > 5){
log.info("deal FlinkTask id:" + id + " status: finished count:"+ count + " gap:"+ gap + "ms");
}else {
log.info("deal FlinkTask id:" + id + " status: running count:" +count + " gap:"+ gap + "ms");
//加入等待下次检测
// DefaultThreadPool.getInstance().execute(this);
}
count++;
if(gap < FlinkTaskConstant.TIME_SLEEP){
try {
Thread.sleep(FlinkTaskConstant.TIME_SLEEP);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public FlinkDaemonTask() {
}
public FlinkDaemonTask(String id) {
this.id = id;
}
public FlinkDaemonTask(String id, JobStatus status) {
this.id = id;
this.status = status;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment