使用过的功能〇°
动态生成类
问答GPT
2023/8/1 17:10:25
2023/8/1 17:10:50
CtClass类是Javassist库提供的一个核心类,用于表示已加载的类或待创建的新类。它提供了一系列方法来操作和修改类的结构、属性和方法。
CtClass类的应用场景主要包括以下几个方面:
动态生成类:使用CtClass类可以在运行时动态生成新的类。这对于某些特定的场景非常有用,例如在框架或插件系统中需要动态创建和加载类。通过CtClass,可以添加字段、方法,修改类的继承关系,生成新的字节码,并在运行时加载和使用这些类。
字节码操作和修改:CtClass提供的方法可以直接操作类的字节码。通过CtClass,可以获取和修改类的字段、方法、构造函数、注解等信息。可以新增、删除或修改现有的方法和字段,修改方法体的字节码指令,实现对类的字节码级别的定制化操作。
代码生成和模板扩展:CtClass类可以结合反射和代码生成技术,实现基于模板的代码扩展和自动生成。通过CtClass,可以根据规则和模板生成类的骨架,然后在此基础上生成具体的代码,填充字段、方法、注解等细节,最后动态加载和使用生成的类。
类型分析和元数据提取:CtClass提供了方法来分析类的结构和属性,例如获取类的父类、接口、注解,获取字段和方法的信息,以及访问方法的参数和返回类型等。这些功能对于一些工具和框架来说非常有用,可以基于类的元数据进行各种类型的分析、检查或处理。
总之,CtClass类主要用于在运行时动态生成和修改类的结构和字节码,以实现对类级别的定制化操作、代码生成和扩展。它为开发者提供了更灵活的方式来操作类和字节码,适用于一些特定的场景和需求,如动态代理、AOP框架、ORM工具、代码生成器等。
用例
import javassist.*;
import javassist.bytecode.DuplicateMemberException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
/**
* 虚拟实体构建类
* <br><br><b>作者: 990130556 <a class=b href="https://blog.csdn.net/lingdu_dou">lingdu</a></b><br>
* 创建时间: 2021年11月23日 16:19:48
* @author 99013
*/
public class Lingdu {
private static final Logger log = LoggerFactory.getLogger(Lingdu.class);
/**
* 虚拟实体类编译路径(初始化路径,是从类池里面获取的)
*/
private final String className = "com.lingdu.dimensiondoor.utils.Ling"+UUID.randomUUID();
/**
* 目标类
*/
private Class<?> clazz;
public Class<?> getClazz(){
return clazz;
}
/**
* 可编辑目标类
*/
private CtClass cc;
/**
* 入参信息<参数名,参数值>
* <br><br><b>作者: 990130556 <a class=b href="https://blog.csdn.net/lingdu_dou">lingdu</a></b><br>
* 创建时间: 2022年03月16日 17:52:00
*/
Map<String, Object> param;
public Map<String, Object> getParam() {
return param;
}
public void setParam(Map<String, Object> param){
this.param = param;
}
public Lingdu(){
}
public Lingdu(List<String> names){
if(names!=null){
param=new HashMap<>((int)(names.size()/0.75+1));
for (String name : names) {
param.put(name,null);
}
}
}
public Lingdu(Map<String, Object> param){
this.param = param;
}
/**
* 虚拟实体封装
* @return 返回封装好的实体集合
* <br><br><b>作者: 990130556 <a class=b href="https://blog.csdn.net/lingdu_dou">lingdu</a></b><br>
* 创建时间: 2022年02月18日 09:52:23
*/
public Object sviVirtualEntityEncapsulation() {
try {
/*初始化虚拟实体*/
initializesVirtualEntity();
/*封装数据*/
return encapsulateData();
} catch (Exception e) {
log.error("初始化虚拟实体/虚拟实体封装出错!",e);
}
return null;
}
/**
* 初始化虚拟实体
* <br><br><b>作者: 990130556 <a class=b href="https://blog.csdn.net/lingdu_dou">lingdu</a></b><br>
* 创建时间: 2022年02月18日 09:37:44
*/
public void initializesVirtualEntity() throws Exception {
// 初始化可编辑目标类
getMyClass();
for (String name : param.keySet()) {
try {
changeClass(name);
// 字段重复异常
} catch (DuplicateMemberException e) {
log.error("字段重复!",e);
}
}
/*重新加载修改后的类*/
reload(cc);
}
/**
* 创建属性及get\set方法
* @param fieldName 属性名
* <br><br><b>作者: 990130556 <a class=b href="https://blog.csdn.net/lingdu_dou">lingdu</a></b><br>
* 创建时间: 2022年02月17日 17:19:52
*/
private void changeClass(String fieldName) throws CannotCompileException, NotFoundException, IOException {
String type = "Object";
//为cTclass对象添加一个属性
cc.addField(CtField.make("private "+type+" " + fieldName + ";", cc));
createMethod("public void set" + fieldName + "("+type+" " + fieldName + "){this." + fieldName + " = " + fieldName + ";}");
createMethod("public "+type+" get" + fieldName + "(){return this." + fieldName + ";}");
}
/**
* 封装数据
* @return 返回封装好的实体集合
* <br><br><b>作者: 990130556 <a class=b href="https://blog.csdn.net/lingdu_dou">lingdu</a></b><br>
* 创建时间: 2022年02月18日 09:38:11
*/
private Object encapsulateData() throws Exception {
Object o = clazz.newInstance();
for (String s : param.keySet()) {
clazz.getDeclaredMethod("set" + s, Object.class).invoke(o, param.get(s));
}
clazz=null;
/*类重置*/
// resetClass();
return o;
}
/**
* 获取自定义虚拟类
* <br><br><b>作者: 990130556 <a class=b href="https://blog.csdn.net/lingdu_dou">lingdu</a></b><br>
* 创建时间: 2022年02月17日 16:50:55
*/
private void getMyClass() throws NotFoundException {
// 类池
/*永远获取子级类池,这样类池就不会因为随着方法的调用越来越多*/
ClassPool pool = MyClassLoader.getClassPool();
try {
// 创建虚拟实体类
cc = pool.makeClass(className);
} catch (Exception e) {
// 已经存在了
cc = pool.get(className);
}
cc.defrost();// 解冻
}
/**
* 重新加载修改后的类
* @param cc 修改后的类
* <br><br><b>作者: 990130556 <a class=b href="https://blog.csdn.net/lingdu_dou">lingdu</a></b><br>
* 创建时间: 2022年02月17日 17:00:00
*/
private void reload(CtClass cc) throws IOException, CannotCompileException {
byte[] bytes = cc.toBytecode();
// 使用自定义的ClassLoader
MyClassLoader cl = new MyClassLoader();
// 加载我们生成的 Ling 类
clazz = cl.defineClass(className, bytes);
}
/*
* 重置类 之前由于虚拟类编译路径相同,考虑到相同实体类字段越用越多的问题所以在创建使用完
* 调用本方法清空所有成员变量以及相应的get/set方法.
* 虚拟类2.0在类编译路径后面加上了UUID唯一标识,就不存在字段越用越多的问题.所以可舍弃本方
* 法清空字段.猜测接口调用过于频繁导致的mybatis底层偶尔获取不到字段信息于此有关,所以该方
* 法被弃用
* <br><br><b>作者: 990130556 <a class=b href="https://blog.csdn.net/lingdu_dou">lingdu</a></b><br>
* 创建时间: 2022年02月17日 16:29:04
*/
// @Deprecated
// public void resetClass() throws Exception {
// // 初始化可编辑目标类
// getMyClass();
// CtMethod[] methods = cc.getDeclaredMethods();
// for (CtMethod method : methods) {
// /*删除本类所有方法*/
// cc.removeMethod(method);
// }
// CtField[] fields = cc.getDeclaredFields();
// for (CtField field : fields) {
// /*删除本类所有变量*/
// cc.removeField(field);
// }
// /*重新加载修改后的类*/
// reload(cc);
// }
/**
* 单独创建方法
* @param m 具体方法结构
* @return java.lang.Class<?>
* <br><br><b>作者: 990130556 <a class=b href="https://blog.csdn.net/lingdu_dou">lingdu</a></b><br>
* 创建时间: 2022年02月17日 10:50:05
*/
public Class<?> createMethod(String m) throws NotFoundException, CannotCompileException, IOException {
boolean b=false;
if(cc==null){
b=true;
// 初始化可编辑目标类
getMyClass();
}
cc.addMethod(CtMethod.make(m, cc));
if(b){
reload(cc);
return clazz;
}else {
return null;
}
}
}
自定义类加载器
问答GPT
2023/8/2 09:53:01
2023/8/2 09:53:19
自定义类加载器是Java中的一个特性,它允许开发者根据自己的需求定义自己的类加载过程。通过自定义类加载器,我们可以实现一些灵活的类加载逻辑,例如从非标准的位置加载类文件、动态生成类等。
要创建自定义类加载器,需要继承java.lang.ClassLoader
类,并覆盖其中的一些方法。下面是一个简单的示例:
public class CustomClassLoader extends ClassLoader {
@Override
protected Class<?> findClass(String name) throws ClassNotFoundException {
// 自定义类加载逻辑,根据类名加载对应的字节码,并通过defineClass方法转换为Class对象
byte[] classBytes = loadClassBytes(name);
return defineClass(name, classBytes, 0, classBytes.length);
}
private byte[] loadClassBytes(String className) {
// 加载类字节码的具体实现,可以读取文件、从数据库获取等
// 这里只是简单示例,假设从文件系统加载
String fileName = className.replace('.', '/') + ".class";
try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(fileName)) {
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
int nextValue;
while ((nextValue = inputStream.read()) != -1) {
byteStream.write(nextValue);
}
return byteStream.toByteArray();
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}
上述代码中,findClass
方法是自定义类加载器的核心方法,负责根据类名加载对应的字节码并转换为Class对象。loadClassBytes
方法是一个具体的实现,可以根据自己的需求从不同的来源加载类的字节码。
使用自定义类加载器时,可以通过调用loadClass
方法来加载指定名称的类,如下所示:
public class Main {
public static void main(String[] args) throws ClassNotFoundException {
CustomClassLoader classLoader = new CustomClassLoader();
Class<?> clazz = classLoader.loadClass("com.example.MyClass");
// 使用加载的类进行操作
}
}
需要注意的是,在使用自定义类加载器时,要确保类加载器的双亲委派机制符合预期。根据需要,可以选择覆盖其他相关方法以满足具体的要求。
自定义类加载器在某些特定场景下非常有用,例如实现类似热部署、插件化等功能。但是在普通应用中,一般情况下都可以使用默认的系统类加载器(或其子类)来满足需求。
用例
import javassist.ClassPool;
/**
* @author 99013
* @ClassName MyClassLoader
* @Description 自定义类加载器
* 作者:99013
* 创建时间:2022年02月17日 15:36:54
* @Version 1.0
**/
public class MyClassLoader extends ClassLoader {
// 类池
private static ClassPool pool;
/**
* 利用子类池覆盖默认类池,从而防止ClassPool中类越来越多的情况
* @return javassist.ClassPool
* <br><br><b>作者: 990130556 <a class=b href="https://blog.csdn.net/lingdu_dou">lingdu</a></b><br>
* 创建时间: 2022年05月24日 14:33:11
*/
public static ClassPool getClassPool(){
return new ClassPool(getPool());
}
/**
* 初始默认类池
* @return javassist.ClassPool
* <br><br><b>作者: 990130556 <a class=b href="https://blog.csdn.net/lingdu_dou">lingdu</a></b><br>
* 创建时间: 2022年05月24日 14:35:01
*/
private static ClassPool getPool(){
if(pool==null){
pool = ClassPool.getDefault();
}
return pool;
}
public Class<?> defineClass(String name, byte[] b) {
// ClassLoader是个抽象类,而ClassLoader.defineClass 方法是protected的
// 所以我们需要定义一个子类将这个方法暴露出来
return super.defineClass(name, b, 0, b.length);
}
@Override
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
// 根据类的全类名进行加锁操作,也就是保证了线程安全
synchronized (getClassLoadingLock(name)) {
Class<?> klass = findLoadedClass(name);
// 到已经加载的缓存中查看是否已经被加载了如果是则直接返回,如果没有就需要进行加载
if (klass == null) {
// 如果缓存中没有,则表示这个类是第一次被加载,对于类进行判断操作
if (name.startsWith("java.") || name.startsWith("javax.")) {
try {
klass = getSystemClassLoader().loadClass(name);
} catch (Exception e) {
throw e;
}
// 如果不满足要求则表示使用自定义的类加载器进行加载操作。
} else {
try {
klass = this.findClass(name);
} catch (Exception ignored) {
}
// 如果自定义加载器没有完成则需要交给父类加载器去进行加载操作
if (klass == null) {
if (getParent() != null) {
klass = getParent().loadClass(name);
} else {
klass = getSystemClassLoader().loadClass(name);
}
}
}
}
// 如果加载不成功的话就抛出异常。
if (null == klass) {
throw new ClassNotFoundException("The class " + name + " not found.");
}
if (resolve) {
resolveClass(klass);
}
return klass;
}
}
}
自定义线程池
问答GPT
2023/8/2 11:06:58
2023/8/2 11:07:14
要自定义线程池,可以使用Java的ThreadPoolExecutor
类作为基础,并根据需求进行配置和定制。以下是一个示例:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CustomThreadPool {
public static void main(String[] args) {
// 创建自定义线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
1, // 线程空闲时间
TimeUnit.SECONDS, // 空闲时间单位
new LinkedBlockingQueue<>(100) // 阻塞队列
);
// 提交任务给线程池执行
for (int i = 0; i < 20; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
try {
// 模拟任务执行耗时
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 关闭线程池
executor.shutdown();
}
}
在上面的示例中,创建了一个自定义线程池executor
,通过指定核心线程数、最大线程数、线程空闲时间以及阻塞队列的类型和容量等参数进行配置。
然后,可以通过调用execute
方法将任务提交给线程池执行。每个任务是一个简单的匿名函数,打印任务ID和执行线程的名称,并模拟了一个耗时操作。
最后,通过调用shutdown
方法关闭线程池。
自定义线程池时,可以根据具体需求进行配置,例如设置拒绝策略、添加线程池监控等。可以通过ThreadPoolExecutor
类的其他构造函数或方法进行更多的配置和定制。
用例
import java.util.concurrent.*;
/**
* 自定义线程池
* 描述:TODO
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2022-08-26 11:01
*/
public class MyThreadPool {
public static ExecutorService threadPool;
// public static ExecutorService getThreadPool(){
// if(threadPool==null){
// newThreadPool();
// }
// return threadPool;
// }
static {
newThreadPool();
}
/**
* 1⚫corePoolSize线程池的核心线程数
* 线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会被销毁,
* 除非设置了allowCoreThreadTimeOut。这里的最小线程数量即是corePoolSize。
* 2⚫maximumPoolSize能容纳的最大线程数
* 一个任务被提交到线程池以后,首先会找有没有空闲存活线程,如果有则直接将任务交给这个空闲线程来执行,
* 如果没有则会缓存到工作队列(后面会介绍)中,如果工作队列满了,才会创建一个新线程,
* 然后从工作队列的头部取出一个任务交由新线程来处理,而将刚提交的任务放入工作队列尾部。
* 线程池不会无限制的去创建新线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize指定。
* 3⚫keepAliveTime空闲线程存活时间
* 一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,
* 这里的指定时间由keepAliveTime来设定
* 4⚫unit 空闲线程存活时间单位
* 5⚫workQueue 存放提交但未执行任务的队列
* 新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk中提供了四种工作队列:
* ArrayBlockingQueue
* 基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。
* 当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。
* 如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。
* LinkedBlockingQuene
* 基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,
* 当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,
* 因此使用该工作队列时,参数maxPoolSize其实是不起作用的。
* SynchronousQuene
* 一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,
* 而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。
* PriorityBlockingQueue
* 具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
* 6⚫threadFactory 创建线程的工厂类
* 创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等
* 7⚫handler 等待队列满后的拒绝策略
* 当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,
* 该如何处理呢。这里的拒绝策略,就是解决这个问题的,jdk中提供了4中拒绝策略:
* CallerRunsPolicy
* 该策略下,在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,则直接抛弃任务
* AbortPolicy
* 该策略下,直接丢弃任务,并抛出RejectedExecutionException异常
* DiscardPolicy
* 该策略下,直接丢弃任务,什么都不做
* DiscardOldestPolicy
* 该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列
*/
private static void newThreadPool(){
threadPool = new ThreadPoolExecutor(10,250,
1L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(250),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
}
定时执行任务线程池
动态创建数据源连接
动态切换数据源
问答GPT
2023/8/3 11:42:42
2023/8/3 11:43:12
ScheduledThreadPoolExecutor是Java中的一个线程池实现,它继承自ThreadPoolExecutor,并添加了一些调度功能。它可以在指定时间执行任务,也可以周期性地执行任务。
ScheduledThreadPoolExecutor的主要特点包括:
- 调度执行任务:可以在指定时间或者延迟一定时间后执行任务。可以使用schedule()方法来指定任务的执行时间,也可以使用scheduleAtFixedRate()或scheduleWithFixedDelay()方法来周期性地执行任务。
- 线程池管理:ScheduledThreadPoolExecutor内部使用线程池来管理线程的创建和销毁,可以通过参数来配置线程池的大小、队列容量等属性。
- 异常处理:ScheduledThreadPoolExecutor提供了一些异常处理机制,可以处理任务执行过程中的异常情况,确保线程池的稳定运行。
- 取消任务:可以通过调用ScheduledFuture对象的cancel()方法来取消已经调度的任务。
使用ScheduledThreadPoolExecutor可以方便地实现定时任务和周期性任务的调度执行,非常适用于需要在后台执行一些定时任务的场景,比如定时刷新缓存、定时发送邮件等。
用例
package com.lingdu.common.datasource.utils;
import com.baomidou.dynamic.datasource.DynamicRoutingDataSource;
import com.baomidou.dynamic.datasource.creator.DefaultDataSourceCreator;
import com.baomidou.dynamic.datasource.spring.boot.autoconfigure.DataSourceProperty;
import com.baomidou.dynamic.datasource.spring.boot.autoconfigure.druid.DruidConfig;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import com.lingdu.common.core.constant.Constants;
import com.lingdu.common.core.exception.ServiceException;
import com.lingdu.common.core.lingdu.MyThreadPool;
import com.lingdu.common.core.lingdu.domain.SysDatabase;
import com.lingdu.common.core.lingdu.utils.JabotUtil;
import com.lingdu.common.core.utils.uuid.IdUtils;
import com.lingdu.common.datasource.domain.DataSourceCategory;
import com.lingdu.common.datasource.enums.RedisDataVersion;
import com.lingdu.common.datasource.enums.Usage;
import com.lingdu.common.redis.service.RedisService;
import com.lingdu.system.api.RemoteDatabaseService;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 描述:动态数据源相关操作
* 项目启动后运行此方法:CommandLineRunner实现
*
* @author 99013
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2022-08-31 15:48
*/
//@Aspect
@Component
//@Order(0) /* 请注意:这里order一定要小于tx:annotation-driven的order,即先执行DynamicDataSourceAspectAdvice切面,再执行事务切面,才能获取到最终的数据源 */
//@EnableAspectJAutoProxy(proxyTargetClass = true)
public class DynamicDataSourceAspect implements CommandLineRunner {
/**
* 项目启动后立马执行此方法
* 项目启动后运行此方法:CommandLineRunner实现
*/
@Override
public void run(String... var1) {
this.init();
}
private static final Logger log = LoggerFactory.getLogger(DynamicDataSourceAspect.class);
/**
* 数据源工厂
*/
private final DataSource dataSource;
/**
* 数据源创建器
*/
private final DefaultDataSourceCreator dataSourceCreator;
/**
* 数据源服务
*/
private final RemoteDatabaseService remoteDatabaseService;
/**
* redis工具类
*/
private final RedisService redisService;
/**
* 定时执行任务 服务
*/
private final static ScheduledExecutorService SCHEDULER = new ScheduledThreadPoolExecutor(3,
new BasicThreadFactory.Builder().namingPattern("lingdu-schedule-pool-%d").daemon(true).build());
/**
* 构造方法方式注入依赖
*/
DynamicDataSourceAspect(DataSource dataSource, DefaultDataSourceCreator dataSourceCreator, RemoteDatabaseService remoteDatabaseService, RedisService redisService) {
this.dataSource = dataSource;
this.dataSourceCreator = dataSourceCreator;
this.remoteDatabaseService = remoteDatabaseService;
this.redisService = redisService;
}
/**
* 更新标识
*/
private static String version = "";
/**
* 本地缓存分组数据源信息
*/
public static Map<String, DataSourceCategory> DATE_SOURCE_GROUP = new HashMap<>();
/**
* 本地缓存数据源信息
*/
public static Map<String, SysDatabase> DATE_SOURCE = new HashMap<>();
/**
* 记录线程
* 存在并发修改的可能,所以此处采用线程安全的方式创建对象
*/
public static List<Thread> threadList = Collections.synchronizedList(new ArrayList<>());
/**
* 数据过期时间 (天)
*/
public static Long timeout = 300L;
/**
* 初始化数据源信息
*
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2022-08-23 11:13
*/
private void init() {
updateDataSourceToRedis();
// 等待3秒开始循环调用检测版本方法(间隔1秒执行一次)
SCHEDULER.scheduleWithFixedDelay(this::syncDs, 3, 1, TimeUnit.SECONDS);
}
/**
* 推迟更新
*
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-03-31 10:35
*/
public void delayUpdateDataSourceToRedis() {
SCHEDULER.schedule(this::updateDataSourceToRedis, 3, TimeUnit.SECONDS);
}
/**
* 查询数据库中当前环境状态为正常的所有数据源信息与redis中缓存信息作比较
* 不同则更新到redis
*
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-03-29 9:21
*/
private void updateDataSourceToRedis() {
// 查询出数据库最新数据源信息(状态为正常的数据)
List<SysDatabase> sysDatabases = remoteDatabaseService.getAvailableDataSource().getData();
// 数据源同步信息
List<SysDatabase> lsd = redisService.getCacheObject(RedisDataVersion.DIMENSION_DOOR_DATA_SOURCE.getName());
// 如果redis中的信息不一致则更新到redis
if (!sysDatabases.equals(lsd)) {
// redis缓存更新标识
redisService.setCacheObject(RedisDataVersion.DIMENSION_DOOR_DATA_SOURCE.getVersionName(), IdUtils.fastSimpleUUID(), timeout, TimeUnit.DAYS);
// 更新数据源信息
redisService.setCacheObject(RedisDataVersion.DIMENSION_DOOR_DATA_SOURCE.getName(), sysDatabases, timeout, TimeUnit.DAYS);
}
}
/**
* 同步数据源
*
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2022-08-23 11:01
*/
private void syncDs() {
/*变动标记值*/
String v = redisService.getCacheObject(RedisDataVersion.DIMENSION_DOOR_DATA_SOURCE.getVersionName());
// redis缓存数据添加了过期时间,如果未获取到版本信息则重新初始化
if (v == null) {
updateDataSourceToRedis();
}
// 版本号不一致
if (!version.equals(v)) {
// 数据源同步信息
List<SysDatabase> sysDatabases = redisService.getCacheObject(RedisDataVersion.DIMENSION_DOOR_DATA_SOURCE.getName());
// 分组记录数据源
Map<String, DataSourceCategory> group = new HashMap<>(sysDatabases == null ? 6 : (int) ((sysDatabases.size() / 0.75) + 1));
// 记录所有数据源
Map<String, SysDatabase> msd = new HashMap<>(sysDatabases == null ? 6 : (int) ((sysDatabases.size() / 0.75) + 1));
/*当前已生效的数据源*/
Map<String, DataSource> dataSources = getDataSources();
// 不能直接拿 引用变量在循环后会被还原所以要new
Set<String> dskey = new HashSet<>(dataSources.keySet());
try {
// 将数据源信息转换成map形式
assert sysDatabases != null;
for (SysDatabase sysDatabase : sysDatabases) {
// 转map存储
msd.put(getDsKey(sysDatabase), sysDatabase);
// 数据源分组
classification(sysDatabase, group);
// 数据源差异比较
contrast(sysDatabase, dataSources, dskey);
}
for (String s : dskey) {
// 删除多余的数据源
removeDataSource(s);
}
} catch (Exception e) {
// 防止出现异常导致线程崩溃
e.printStackTrace();
}
// 更新本地缓存
DATE_SOURCE_GROUP = group;
DATE_SOURCE = msd;
// 更新本地标记
version = v;
long l = System.currentTimeMillis();
long l2;
long timeout = 10000;
// 死循环检测线程完成情况
do {
// 清除已执行完的线程标记
threadList.removeIf(thread -> thread.getState().equals(Thread.State.WAITING));
l2 = System.currentTimeMillis();
// 10秒超时时间
if ((l2 - l) > timeout) {
// 清除所有数据 防止循环中未清除干净导致脏数据
threadList = null;
threadList = new ArrayList<>();
break;
}
// 延迟100毫秒左右执行下一轮循环
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
} while (!threadList.isEmpty());
}
}
/**
* 数据源分组
*
* @param sysDatabase 数据源信息
* @param mds 分类集合
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-03-30 15:10
*/
private void classification(SysDatabase sysDatabase, Map<String, DataSourceCategory> mds) {
String usage = sysDatabase.getUsage();
// 数据源分类集
DataSourceCategory dataSourceCategory = mds.get(sysDatabase.getDsKey());
if (dataSourceCategory == null) {
dataSourceCategory = new DataSourceCategory();
}
if (Usage.R.getCode().equals(usage)) {
dataSourceCategory.getReads().add(sysDatabase);
} else if (Usage.W.getCode().equals(usage)) {
dataSourceCategory.getWrites().add(sysDatabase);
} else {
dataSourceCategory.getRw().add(sysDatabase);
}
mds.put(sysDatabase.getDsKey(), dataSourceCategory);
}
/**
* 处理差异
*
* @param sysDatabase 数据源分类集合
* @param dataSources 当前已生效的数据源
* @param oldkey 生效数据源标识集合
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-03-30 14:45
*/
private void contrast(SysDatabase sysDatabase, Map<String, DataSource> dataSources, Set<String> oldkey) {
String dsKey = getDsKey(sysDatabase);
// 当前已生效的数据源
DataSource dataSource = dataSources.get(dsKey);
// 本地数据源记录
SysDatabase sd = DATE_SOURCE.get(dsKey);
// 未连接数据源
if (dataSource == null) {
// 添加新数据源
addDataSource(sysDatabase);
// 已连接的数据源
} else {
// 删除相同标记的数据项
oldkey.remove(dsKey);
// 本地缓存与redis缓存不一致
if (!sysDatabase.equals(sd)) {
// 覆盖数据源
addDataSource(sysDatabase);
}
}
}
/**
* 使用线程执行
*
* @param d 数据源连接信息
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2022-08-30 13:51
*/
private void addDataSource(SysDatabase d) {
MyThreadPool.threadPool.execute(() -> addDataSource(d, false));
}
/**
* 添加数据源
*
* @param d 数据源连接信息
* @param test 是否是测试
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2022-08-17 15:27
*/
public boolean addDataSource(SysDatabase d, boolean test) {
// 当前线程
Thread t = Thread.currentThread();
threadList.add(t);
// 数据源唯一标识
String dsKey = getDsKey(d);
String dsKeyTest = test ? IdUtils.simpleUUID() : null;
try {
String url = JabotUtil.jiemi(d.getUrl());
DataSourceProperty property = new DataSourceProperty();
// 设置数据库连接地址
property.setUrl(url);
// 设置登录用户名
property.setUsername(JabotUtil.jiemi(d.getUsername()));
// 设置登录密码
property.setPassword(JabotUtil.jiemi(d.getPsw()));
// 设置数据库连接驱动类名
property.setDriverClassName(d.getDriverClassName());
// 德鲁伊配置
DruidConfig druidConfig = new DruidConfig();
// 失败后重连的次数
druidConfig.setConnectionErrorRetryAttempts(3);
// 请求失败之后中断 默认false
// druidConfig.setBreakAfterAcquireFailure(true);
// 设置验证连接语句
druidConfig.setValidationQuery(d.getValidationQuery());
// keepAlive配置即可使得数据库连接保持活性
druidConfig.setKeepAlive(true);
// 连接超时时间1.2.12新增参数
druidConfig.setConnectTimeout(60000);
// 会话超时时间1.2.12新增参数
druidConfig.setSocketTimeout(60000);
// 设置德鲁伊配置
property.setDruid(druidConfig);
// 开启分布式事务
property.setSeata(true);
DynamicRoutingDataSource ds = (DynamicRoutingDataSource) dataSource;
DataSource source = dataSourceCreator.createDataSource(property);
Connection connection = source.getConnection();
// 关闭自动提交事务
connection.setAutoCommit(false);
ds.addDataSource(test ? dsKeyTest : dsKey, source);
log.info("连接数据源{}成功", dsKey);
// 连接成功后设置当前运行环境标识
if (Constants.deployedEnvironment == null) {
Constants.deployedEnvironment = d.getDeployedEnvironment();
}
} catch (Exception e) {
log.error(e.getMessage() + " 尝试连接数据库[{}]失败,请检查连接信息", dsKey);
if (test) {
throw new ServiceException("尝试连接数据库[" + dsKey + "]失败,请检查连接信息");
}
return false;
}
if (test) {
removeDataSource(dsKeyTest);
}
return true;
}
/**
* 根据数据源名称断开数据源 线上删除或者禁用数据源时调用
*
* @param dsName 数据源名称
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2022-08-17 15:50
*/
public void removeDataSource(String dsName) {
try {
DynamicRoutingDataSource ds = (DynamicRoutingDataSource) dataSource;
ds.removeDataSource(dsName);
} catch (Exception e) {
log.warn(e.getMessage());
}
}
/**
* 获取所有已加载到连接池的数据源
*
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2022-08-17 18:02
*/
public Map<String, DataSource> getDataSources() {
return ((DynamicRoutingDataSource) dataSource).getDataSources();
}
/**
* 获取数据源唯一标识
*
* @param sysDatabase 数据源信息
* @return 返回唯一标识
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-03-30 14:41
*/
public String getDsKey(SysDatabase sysDatabase) {
return sysDatabase.getDsKey() + "_" + sysDatabase.getId();
}
/**
* 均衡记录值
*/
private static Map<Integer, Integer> balance;
/**
* 根据数据源用途切换数据源(简单均衡切换)
*
* @param usage 数据源用途枚举类
* @param systemDsn 数据源分组标识
* @param autoCommit 是否自动提交事务 true自动提交 false手动提交
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-03-30 16:08
*/
public SysDatabase switchingDataSource(Usage usage, String systemDsn, boolean autoCommit) {
// 获取到分组数据源信息
DataSourceCategory dsc = DATE_SOURCE_GROUP.get(systemDsn);
List<SysDatabase> lsd;
// 均衡选取到的数据源信息
SysDatabase sd = null;
if (usage.equals(Usage.R)) {
// 读
lsd = dsc.getReads();
} else {
// 写
lsd = dsc.getWrites();
}
if (!lsd.isEmpty()) {
int i = lsd.hashCode();
if (balance == null) {
balance = new HashMap<>();
}
Integer count = balance.get(i);
count = count == null ? 1 : count > 10000 ? 1 : ++count;
balance.put(i, count);
sd = lsd.get(count % lsd.size());
}
if (sd == null) {
throw new ServiceException("本地缓存未发现数据源" + systemDsn + "信息");
}
String dsKey = getDsKey(sd);
/*当前已生效的数据源*/
Map<String, DataSource> dataSources = getDataSources();
DataSource source = dataSources.get(dsKey);
if (source == null) {
throw new ServiceException("目标数据源[" + systemDsn + "]未找到");
}
try {
Connection connection = source.getConnection();
// 设置是否自动提交事务
connection.setAutoCommit(autoCommit);
} catch (SQLException e) {
throw new RuntimeException(e);
}
// 切换数据源
String push = DynamicDataSourceContextHolder.push(dsKey);
if (!push.equals(dsKey)) {
throw new ServiceException("切换数据源失败");
}
return sd;
}
}
自定义注解
切面
问答GPT
2023/8/2 11:13:52
2023/8/2 11:15:57
自定义注解是一种在Java中用于为程序元素(类、方法、字段等)添加额外元数据的方式。通过自定义注解,可以在编译时或运行时读取和处理这些元数据,从而实现相应的功能。
以下是一个示例,展示如何定义和使用一个自定义注解:
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME) // 注解保留到运行时
@Target(ElementType.METHOD) // 作用于方法
public @interface CustomAnnotation {
String value() default ""; // 声明一个名为value的成员,默认值为空字符串
int count() default 1; // 声明一个名为count的成员,默认值为1
}
在上述示例中,我们定义了一个CustomAnnotation
的自定义注解。通过@interface
关键字进行声明,并使用了@Retention
和@Target
元注解来指定注解的保留策略和作用目标。
CustomAnnotation
注解定义了两个成员:value
和count
。其中,value
成员使用了默认值为空字符串,count
成员使用了默认值为1。你可以根据需要在自定义注解中定义不同类型的成员,并为其设置默认值。
接下来,我们来看一个使用自定义注解的示例:
public class MyClass {
@CustomAnnotation(value = "Hello", count = 3)
public void myMethod() {
System.out.println("Executing myMethod");
}
在上述示例中,myMethod
方法使用了@CustomAnnotation
注解,并为其中的value
和count
成员提供了具体的值。
在运行时,可以通过反射机制来获取注解信息并进行处理,例如:
import java.lang.reflect.Method;
public class Main {
public static void main(String[] args) throws NoSuchMethodException {
MyClass obj = new MyClass();
Method method = obj.getClass().getMethod("myMethod");
CustomAnnotation annotation = method.getAnnotation(CustomAnnotation.class);
if (annotation != null) {
String value = annotation.value();
int count = annotation.count();
System.out.println("Value: " + value);
System.out.println("Count: " + count);
}
}
}
在上述示例中,我们通过反射获取了myMethod
方法上的CustomAnnotation
注解,并获取了其中的成员值。然后,我们可以根据这些注解信息执行相应的处理逻辑。
自定义注解为我们提供了一种扩展Java语言功能的方式,可以通过在代码中添加自定义注解来传递额外的元数据信息,以便在编译时或运行时进行相应的处理。
2023/8/2 14:12:16
2023/8/2 14:12:39
要使用 @Aspect
注解来处理自定义注解,需要结合使用 AspectJ 切点表达式和通知(advice)来实现。
首先,定义一个自定义注解:
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface CustomAnnotation {
// 定义注解成员
}
然后,在切面类中使用 @Aspect
注解,并结合切点表达式和通知来处理标记了自定义注解的方法:
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
@Aspect
public class CustomAnnotationAspect {
@Pointcut("@annotation(com.example.CustomAnnotation)")
public void annotatedMethod() {
// 定义切点表达式,匹配标记了自定义注解的方法
}
@After("annotatedMethod()")
public void afterAnnotatedMethod() {
// 在标记了自定义注解的方法执行之后执行的逻辑
}
}
在上述示例中,我们定义了一个切面类 CustomAnnotationAspect
,并使用 @Aspect
注解标记它。在该切面类中,我们使用 @Pointcut
注解定义了一个切点表达式 annotatedMethod()
,通过 "@annotation(com.example.CustomAnnotation)"
来匹配标记了自定义注解的方法。
接下来,我们使用 @After
注解定义了一个增强方法 afterAnnotatedMethod()
,它会在标记了自定义注解的方法执行之后执行。在这个方法中,可以编写相应的逻辑来处理被标记了自定义注解的方法。
最后,在 Spring 配置文件中启用切面自动代理,并将切面类配置为 bean:
<aop:aspectj-autoproxy/>
<bean id="customAnnotationAspect" class="com.example.CustomAnnotationAspect"/>
通过以上配置,切面类 CustomAnnotationAspect
将会拦截并处理标记了 @CustomAnnotation
注解的方法。
需要注意的是,切面类和被切方法所在的类都需要由 Spring 管理,以便切面生效。同时,切点表达式需要根据实际情况进行适当的调整,以匹配到需要处理的方法。
使用 @Aspect
和自定义注解可以方便地实现切面功能,根据注解标记点对方法进行增强处理,从而实现更灵活和可扩展的业务逻辑。
用例
package com.lingdu.common.datasource.annotation;
import com.lingdu.common.datasource.enums.Usage;
import java.lang.annotation.*;
/**
* 数据源切换
*
* @author 99013
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-04-11 9:12
*/
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DSHandover {
/**
* 数据源标识
*
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-04-11 9:12
*/
String value();
/**
* 数据源用途
*
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-04-11 9:12
*/
Usage usage() default Usage.RW;
/**
* 是否开启事务
*
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-04-11 9:39
*/
boolean transactional() default false;
/**
* 是否自动提交事务 true自动提交 false手动提交
*
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-04-11 9:13
*/
// boolean autoCommit() default true;
}
切面
package com.lingdu.common.datasource.aspect;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import com.lingdu.common.core.context.SecurityContextHolder;
import com.lingdu.common.datasource.annotation.DSHandover;
import com.lingdu.common.datasource.utils.DynamicDataSourceAspect;
import com.lingdu.common.datasource.utils.TransactionUtil;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
* 切换数据源处理
*
* @author 99013
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-04-11 9:17
*/
@Aspect
@Order(3)// Order值越小,那么切面越先执行(越后结束).
@Component
public class DsAspect {
private static final Logger log = LoggerFactory.getLogger(DsAspect.class);
private final DynamicDataSourceAspect dynamicDataSourceAspect;
public DsAspect(DynamicDataSourceAspect dynamicDataSourceAspect) {
this.dynamicDataSourceAspect = dynamicDataSourceAspect;
}
/**
* 处理方法前执行
*/
@Before(value = "@annotation(dsHandover)")
public void boBefore(DSHandover dsHandover) {
// 切换数据源
dynamicDataSourceAspect.switchingDataSource(dsHandover.usage(), dsHandover.value(), !dsHandover.transactional());
if (dsHandover.transactional()) {
// 手动创建事务
TransactionUtil.creadeTransaction(dsHandover.value());
}
log.info("切换数据源" + dsHandover.value() + "成功");
}
/**
* 处理完后执行
*/
@AfterReturning(pointcut = "@annotation(dsHandover)")
public void doAfterReturning(DSHandover dsHandover) {
// 最后清理掉此数据源 还原默认数据源
DynamicDataSourceContextHolder.clear();
log.info("还原默认数据源成功");
if (dsHandover.transactional()) {
// 判断是否存在总的事务控制注解
Boolean flag = SecurityContextHolder.get(MultilateralAffairAspect.MULTILATERAL_AFFAIR_FLAG, Boolean.class);
if (flag!=null&&flag){
return;
}
// 事务提交
TransactionUtil.commit();
}
}
/**
* 拦截异常操作
*
* @param e 异常
*/
@AfterThrowing(value = "@annotation(dsHandover)", throwing = "e")
public void doAfterThrowing(DSHandover dsHandover, Exception e) {
log.error(e.getMessage());
if (dsHandover.transactional()) {
// 判断是否存在总的事务控制注解
Boolean flag = SecurityContextHolder.get(MultilateralAffairAspect.MULTILATERAL_AFFAIR_FLAG, Boolean.class);
if (flag!=null&&flag){
return;
}
// 事务回滚
TransactionUtil.rollback();
}
}
}
java模拟HTTP请求
问答GPT
2023/8/3 14:22:54
2023/8/3 14:23:18
在Java中,可以使用多种方式来模拟HTTP请求。以下是两种常见的方法:
- 使用HttpURLConnection类:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
public class HTTPRequestExample {
public static void main(String[] args) {
try {
// 创建URL对象
URL url = new URL("http://example.com");
// 打开连接
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
// 设置请求方法
conn.setRequestMethod("GET");
// 发起请求
int responseCode = conn.getResponseCode();
// 读取响应内容
BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
String line;
StringBuilder response = new StringBuilder();
while ((line = reader.readLine()) != null) {
response.append(line);
}
reader.close();
// 输出响应结果
System.out.println("Response Code: " + responseCode);
System.out.println("Response Body: " + response.toString());
// 关闭连接
conn.disconnect();
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 使用第三方库如Apache HttpClient: 首先需要在项目中添加Apache HttpClient的依赖,例如使用Maven:
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
然后可以使用以下代码发起HTTP请求:
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
public class HTTPRequestExample {
public static void main(String[] args) {
try {
// 创建HttpClient对象
CloseableHttpClient httpClient = HttpClients.createDefault();
// 创建HttpGet请求
HttpGet httpGet = new HttpGet("http://example.com");
// 发起请求并获取响应
HttpResponse response = httpClient.execute(httpGet);
// 获取响应内容
HttpEntity entity = response.getEntity();
String responseBody = EntityUtils.toString(entity);
// 输出响应结果
System.out.println("Response Code: " + response.getStatusLine().getStatusCode());
System.out.println("Response Body: " + responseBody);
// 关闭连接
httpClient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
以上代码演示了如何使用Java模拟HTTP请求,并获取服务器的响应结果。具体的请求方式、参数设置等可以根据实际需求进行调整。另外,还可以使用其他第三方库如OkHttp、RestTemplate等来简化HTTP请求的操作。
用例
package com.lingdu.common.core.lingdu.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Map;
/**
* java模拟HTTP请求工具
*
* @author 99013
* @ClassName HttpRequestUtil
* @Description TODO
* 作者:99013
* 创建时间:2020年12月03日 14:07:35
* @Version 2.0
**/
public class HttpRequestUtil {
private static final Logger log = LoggerFactory.getLogger(HttpRequestUtil.class);
/**
* POST请求
*/
public static final String POST = "POST";
/**
* PUT请求
*/
public static final String PUT = "PUT";
/**
* GET请求
*/
public static final String GET = "GET";
/**
* DELETE请求
*/
public static final String DELETE = "DELETE";
/**
* utf-8
*/
public static final String CHARSET_UTF8 = "UTF-8";
/**
* 请求内容类型
*/
public static final String CONTENT_TYPE = "Content-type";
/**
* 请求通用格式
*/
public static final String CONTENT_TYPE_JSON = "application/json";
/**
* multipart/form-data类型
*/
public static final String CONTENT_TYPE_FORMDATA = "multipart/form-data";
public static String CONTENT_TYPE_WAY;
/**
* text/plain类型
*/
public static final String CONTENT_TYPE_TEXT_PLAIN = "text/plain";
/**
* image/jpeg类型
*/
public static final String MIME_TYPE_IMAGE_JPEG = "image/jpeg";
/**
* 边界
*/
private static final String BOUNDARY = "----WebKitFormBoundary07I8UIuBx6LN2KyY";
/**
* Get请求
*
* @param url 请求路径
* @param params 请求参数
* @param cookie Cookie
* @param headerMap 请求头补充参数
* @return java.lang.String
* <br><br><b>作者: 990130556 <a class=b href="https://blog.csdn.net/lingdu_dou">lingdu</a></b><br>
* 创建时间: 2021年08月26日 18:46:43
*/
public static String sendGet(String url, Map<String, Object> params, String cookie, Map<String, Object> headerMap) {
return send(GET, url, params,null, cookie, headerMap, null, null);
}
/**
* Delete请求
*
* @param url 请求路径
* @param params 请求参数
* @param cookie cookie
* @param headerMap 请求头补充参数
* @return java.lang.String
* <br><br><b>作者: 990130556 <a class=b href="https://blog.csdn.net/lingdu_dou">lingdu</a></b><br>
* 创建时间: 2021年08月26日 18:48:02
*/
public static String sendDelete(String url, Map<String, Object> params, String cookie, Map<String, Object> headerMap) {
return send(DELETE, url, params,null, cookie, headerMap, null, null);
}
/**
* Post请求
*
* @param url 第三方接口地址
* @param params 请求参数
* @param pjson json方式传参
* @param cookie cookie
* @param headerMap 请求头补充参数
* @return java.lang.String 返回信息
* <br><br><b>作者: 990130556 <a class=b href="https://blog.csdn.net/lingdu_dou">lingdu</a></b><br>
* 创建时间: 2020年12月03日 17:45:21
*/
public static String sendPost(String url, Map<String, Object> params,String pjson, String cookie, Map<String, Object> headerMap) {
return sendPostUplodFile(url, params,pjson, cookie, headerMap, null, null);
}
/**
* 模拟http请求上传文件及相关参数
*
* @param url 第三方上传文件地址
* @param fileName 文件名
* @param params 请求参数
* @param bytes 文件字节码
* @param cookie Cookie
* @return java.lang.String
*/
public static String sendPostUplodFile(String url, Map<String, Object> params,String pjson, String cookie, Map<String, Object> headerMap, String fileName, byte[] bytes) {
return send(POST, url, params,pjson, cookie, headerMap, fileName, bytes);
}
/**
* put请求方式
*
* @param url 请求路径
* @param params 请求参数
* @param cookie cookie
* @return java.lang.String
* 创建时间: 2021年03月07日 09:16:34
*/
public static String sendPut(String url, Map<String, Object> params,String pjson, String cookie, Map<String, Object> headerMap) {
return send(PUT, url, params,pjson, cookie, headerMap, null, null);
}
/**
* put请求方式 上传文件
*
* @param url 请求路径
* @param fileName 文件名
* @param params 请求参数
* @param pjson json传参数
* @param bytes 文件字节码
* @param cookie cookie
* @return java.lang.String
* 创建时间: 2021年03月07日 09:16:34
*/
public static String sendPutUplodFile(String url, Map<String, Object> params,String pjson, String cookie, Map<String, Object> headerMap, String fileName, byte[] bytes) {
return send(PUT, url, params,pjson, cookie, headerMap, fileName, bytes);
}
/**
* 执行请求
*
* @param type 请求类型
* @param url 第三方请求地址
* @param params 请求参数
* @param pjson json参数
* @param cookie Cookie
* @param headerMap 补充请求头参数
* @param fileName 文件名
* @param bytes 文件字节码
* @return java.lang.String
* <br><br><b>作者: 990130556 <a class=b href="https://blog.csdn.net/lingdu_dou">lingdu</a></b><br>
* 创建时间: 2021年08月26日 18:21:34
*/
public static String send(String type, String url, Map<String, Object> params,String pjson, String cookie, Map<String, Object> headerMap, String fileName, byte[] bytes) {
DataOutputStream out = null;
BufferedReader in = null;
StringBuilder result = new StringBuilder();
try {
// GET 和 DELETE 请求
if (type.equals(GET) || type.equals(DELETE)) {
if (params != null) {
StringBuilder urlBuilder = new StringBuilder(url + "?");
for (String s : params.keySet()) {
// 防 止 中 文 乱 码 ↓
urlBuilder.append(s).append("=").append(URLEncoder.encode(params.get(s).toString(), CHARSET_UTF8)).append("&");
}
url = urlBuilder.toString();
}
}
// 默认表单提交
CONTENT_TYPE_WAY=CONTENT_TYPE_FORMDATA;
// 如果pjson不为空则使用json方式提交
if(pjson!=null){
CONTENT_TYPE_WAY=CONTENT_TYPE_JSON;
}
//打开和URL之间的连接
HttpURLConnection conn = getHttpUrlConnection(url, type, cookie, headerMap);
// POST 和 PUT 请求
if (type.equals(POST) || type.equals(PUT)) {
out = new DataOutputStream(conn.getOutputStream());
//添加参数
if (params != null) {
for (String key : params.keySet()) {
String sb = "--" +
BOUNDARY +
"\r\n" +
"Content-Disposition: form-data;name=\"" + key + "\"" +
"\r\n" +
"\r\n" +
params.get(key) +
// URLEncoder.encode(params.get(key).toString(), "utf-8")+
"\r\n";
out.write(sb.getBytes());
}
}
// 传文件
if (null != fileName && null != bytes) {
//添加参数file
String sb = "--" +
BOUNDARY +
"\r\n" +
"Content-Disposition: form-data;name=\"file\";filename=\"" + fileName + "\"" +
"\r\n" +
"Content-Type: " + MIME_TYPE_IMAGE_JPEG +
"\r\n" +
"\r\n";
out.write(sb.getBytes());
// TODO 写文件
out.write(bytes);
out.write("\r\n".getBytes());
}
if(pjson!=null){
out.write(pjson.getBytes());
}else {
byte[] endData = ("\r\n--" + BOUNDARY + "--\r\n").getBytes();
out.write(endData);
}
//flush输出流的缓冲
out.flush();
}
//定义BufferedReader输入流来读取URL的响应
in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
String line;
while ((line = in.readLine()) != null) {
result.append(line);
}
} catch (Exception e) {
log.error("发送请求出现异常:", e);
e.printStackTrace();
} finally {
try {
if (out != null) {
out.close();
}
if (in != null) {
in.close();
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
return result.toString();
}
/**
* 和URL之间建立连接
*
* @param url 请求路径
* @param type 请求类型
* @param cookie cookie
* @param headerMap 请求头补充参数
* @return java.net.HttpURLConnection 返回连接信息
* <br><br><b>作者: 990130556 <a class=b href="https://blog.csdn.net/lingdu_dou">lingdu</a></b><br>
* 创建时间: 2021年09月02日 15:51:28
*/
private static HttpURLConnection getHttpUrlConnection(String url, String type, String cookie, Map<String, Object> headerMap) throws IOException {
URL realUrl = new URL(url);
//打开和URL之间的连接
HttpURLConnection conn = (HttpURLConnection) realUrl.openConnection();
if (POST.equals(type) || PUT.equals(type)) {
//发送POST请求必须设置如下两行
// 默认值为:false,当向远程服务器传送数据/写数据时,需要设置为true
conn.setDoOutput(true);
// 默认值为:true,当前向远程服务读取数据时,设置为true,该参数可有可无
conn.setDoInput(true);
}
conn.setUseCaches(false);
conn.setRequestMethod(type);
// 设置连接主机服务器超时时间:15000毫秒
conn.setConnectTimeout(15000);
// 设置读取主机服务器返回数据超时时间:60000毫秒
conn.setReadTimeout(60000);
conn.setRequestProperty("connection", "Keep-Alive");
conn.setRequestProperty("user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36");
conn.setRequestProperty("Charsert", CHARSET_UTF8);
if (cookie != null) {
conn.setRequestProperty("Cookie", cookie);
}
conn.setRequestProperty(CONTENT_TYPE, CONTENT_TYPE_WAY + "; boundary=" + BOUNDARY);
//填充请求头
if (headerMap != null && headerMap.keySet().size() != 0) {
for (String s : headerMap.keySet()) {
conn.setRequestProperty(s, headerMap.get(s).toString());
}
}
conn.connect();
return conn;
}
}
webservice
问答GPT
2023/8/3 14:36:25
2023/8/3 14:36:44
Web服务(Web Service)是一种通过网络进行通信和交互的软件系统,它通过标准的HTTP(例如使用SOAP协议)或者其他传输协议来实现不同平台上的应用程序之间的互操作性。
我们可以将Web服务看作是一个提供某种功能或服务的应用程序,其他应用程序可以通过网络调用该服务并获取结果。Web服务通常基于以下几个核心技术:
SOAP(Simple Object Access Protocol):一种基于XML的通信协议,用于在网络上交换结构化的信息。SOAP消息通常使用HTTP作为传输协议,可以传递复杂的数据结构和执行远程过程调用(RPC)。
WSDL(Web Services Description Language):一种用于描述Web服务的接口和交互方式的XML格式规范。WSDL文件定义了Web服务的方法、参数、返回值以及如何访问和使用该服务。
UDDI(Universal Description, Discovery, and Integration):一种用于发布、发现和管理Web服务的目录服务。UDDI允许开发人员注册和搜索Web服务,以便找到满足特定需求的服务。
使用Web服务时,客户端应用程序通常会通过WSDL文件了解服务的接口和使用方式,然后根据WSDL生成客户端代码进行调用。客户端和服务端之间的通信遵循一定的协议(例如SOAP),通过请求和响应消息进行交互。
Web服务的优点包括跨平台、松耦合、标准化和可扩展性等。它可以使不同平台和技术栈的应用程序相互交互,并能够实现分布式系统的构建和集成。常见的Web服务框架有Apache CXF、Axis、JAX-WS等,它们提供了对Web服务的开发和部署的支持。
2023/8/3 14:41:07
2023/8/3 14:41:29
Apache CXF(Apache Common XML Framework)是一个开源的、全功能的Web服务框架,提供了用于构建和部署Web服务的一系列工具和库。它是Apache Software Foundation的一个顶级项目,提供了Java语言的实现。
Apache CXF支持多种Web服务标准和协议,包括SOAP、RESTful、WS-*等。它可以用于构建客户端和服务端的Web服务应用程序,同时也提供了对各种传输协议(如HTTP、JMS、SMTP等)的支持。
以下是一些Apache CXF的特性和功能:
支持各种Web服务标准和规范:Apache CXF符合SOAP、WSDL、WS-*等标准,可以创建基于这些标准的Web服务应用程序。
JAX-WS和JAX-RS支持:Apache CXF兼容Java的JAX-WS和JAX-RS标准,可以通过使用注解或配置文件来定义Web服务接口和实现。
基于Spring框架的集成:Apache CXF可以与Spring框架无缝集成,通过Spring配置文件管理和配置Web服务。
数据绑定和转换:Apache CXF提供了灵活的数据绑定和转换功能,可以将Java对象映射为XML或JSON格式,并处理请求和响应之间的转换。
安全性支持:Apache CXF提供了可扩展的安全性支持,包括认证、授权、加密、数字签名等功能,保护Web服务的安全性。
扩展性和可插拔性:Apache CXF的设计具有高度的扩展性和可插拔性,可以通过添加插件和扩展来增强其功能。
支持多种传输协议:除了HTTP传输协议,Apache CXF还支持JMS、SMTP等多种协议,可以根据需要选择适合的传输方式。
总的来说,Apache CXF是一个强大而灵活的Web服务框架,适用于构建各种类型的Web服务应用程序。它具有丰富的功能和良好的可扩展性,为开发人员提供了方便和便捷的工具和库。
2023/8/3 14:42:03
2023/8/3 14:42:25
wsdl2java是Apache CXF框架提供的一个命令行工具,用于根据WSDL文件生成Java客户端代码。
使用wsdl2java可以将WSDL文件中定义的Web服务接口和数据类型转换为Java类,并生成用于访问和调用Web服务的客户端代码。生成的客户端代码包括用于发送请求和处理响应的方法、参数和返回值的Java类。
以下是使用wsdl2java的基本步骤:
下载和安装Apache CXF框架:首先需要下载并安装Apache CXF框架,可以从官方网站(https://cxf.apache.org/)获取最新版本的安装包。
执行wsdl2java命令:打开命令行终端,进入到安装目录的bin文件夹下,执行以下命令:
wsdl2java -d <输出路径> -p <包名> <WSDL文件路径>
其中,
<输出路径>
是生成的Java代码存放的目录,<包名>
是生成的Java类所在的包名,<WSDL文件路径>
是WSDL文件的路径。生成Java客户端代码:执行上述命令后,wsdl2java将会解析WSDL文件并生成相应的Java客户端代码。生成的代码包括接口、数据类型和辅助类等。
使用生成的Java客户端:将生成的Java代码导入到项目中,并在业务代码中使用生成的类调用Web服务。可以使用生成的接口来发送请求和处理响应,与Web服务进行交互。
需要注意的是,在执行wsdl2java命令之前,确保已经安装了Java Development Kit(JDK)并添加到系统的环境变量中。
除了基本的使用方式外,wsdl2java还提供了一些选项和参数,可以根据需要进行配置。例如,可以通过-p
选项指定生成的Java类所在的包名,通过-impl
选项生成服务端的代码等。
总之,wsdl2java是一个方便的工具,可以快速生成Java客户端代码,以便与Web服务进行交互和集成。它使开发人员能够更加高效地开发和使用Web服务。
用例
websocket
问答GPT
2023/8/3 14:57:12
2023/8/3 14:57:38
WebSocket是一种在Web浏览器和服务器之间实现全双工通信的协议。相比传统的HTTP请求响应模式,WebSocket允许建立持久连接,并在客户端和服务器之间进行实时的双向通信。
以下是WebSocket的一些特点和使用方式:
长连接:WebSocket使用长连接而不是短连接,通过在客户端和服务器之间建立持久连接,可以实现实时的双向通信。这种长连接的特性使得服务器能够主动向客户端发送数据,而不需要客户端发起请求。
实时性:由于WebSocket是全双工通信,服务器可以及时将数据推送给客户端,在实时性要求较高的应用中非常有用,如聊天室、实时更新的股票报价等。
简化的API:WebSocket提供了一套简化的API,使得开发者可以轻松地实现WebSocket通信。在JavaScript中,可以使用WebSocket API来建立连接、发送和接收消息,而在服务器端,也有一些成熟的库和框架可以用来处理WebSocket请求。
跨域支持:WebSocket支持跨域通信,即客户端和服务器可以位于不同的域名下,只要服务器允许。这使得开发人员可以更自由地构建分布式系统和跨平台应用。
使用WebSocket的基本步骤如下:
建立连接:在客户端,通过创建一个WebSocket对象,指定要连接的服务器地址和协议(通常是"ws://"或"wss://")。客户端会向服务器发送连接请求,服务器收到请求后可以接受或拒绝连接。
发送和接收消息:一旦连接建立成功,客户端和服务器可以相互发送消息。客户端可以使用WebSocket对象的send()方法发送消息,而服务器可以在收到消息后,通过WebSocket对象的onMessage事件处理程序进行处理。
关闭连接:当通信完成或不再需要时,客户端或服务器可以关闭连接。可以调用WebSocket对象的close()方法来关闭连接,并可以选择指定关闭的状态码和原因。
需要注意的是,WebSocket需要服务器端的支持。服务器需要实现WebSocket协议,接收和处理WebSocket的连接和消息。许多主流的Web开发框架都提供了对WebSocket的支持,可以方便地创建WebSocket服务器。
总之,WebSocket是一种强大的通信协议,使得Web应用可以实现实时的双向通信。它在实时性要求较高的应用场景中非常有用,为开发者提供了更多的灵活性和交互性。
用例
websocket启动类
package com.lingdu.kafka.utils.lingdu.websocket;
import com.lingdu.common.core.lingdu.MyThreadPool;
import com.lingdu.common.core.utils.SpringUtils;
import com.lingdu.kafka.domain.KafkaConfig;
import com.lingdu.kafka.service.IKafkaConfigService;
import com.lingdu.kafka.utils.lingdu.KafkaUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* websocket启动类
* @author 99013
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-02-27 15:19
*/
@Component
@ServerEndpoint(value = "/push/websocket",subprotocols = {"protocol"})
@Slf4j
public class WebSocketServer {
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
private static int onlineCount = 0;
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
private static final CopyOnWriteArraySet<WebSocketServer> WEB_SOCKET_SERVERS = new CopyOnWriteArraySet<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收sid
*/
private String sid = "";
Map<String, String> params = new HashMap<>();
public WebSocketServer() {
}
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("sid") String sid) {
this.session = session;
//加入set中
WEB_SOCKET_SERVERS.add(this);
//在线数加1
addOnlineCount();
log.info("有新窗口开始监听:" + sid + ",当前在线人数为" + getOnlineCount());
this.sid = sid;
String queryString = session.getQueryString();
log.info(queryString);
String[] array = queryString.split("&");
for (String p : array) {
String[] split = p.split("=");
params.put(split[0], split[1]);
}
Integer kafkaId = Optional.ofNullable(params.get("kafkaId")).map(Integer::parseInt)
.orElseThrow(() -> new RuntimeException("缺乏参数 sourceId 无法建立链接"));
KafkaConfig kafkaConfig = SpringUtils.getBean(IKafkaConfigService.class).selectKafkaConfigById(Long.parseLong(kafkaId+""));
// 使用线程池运行
MyThreadPool.threadPool.execute(()->consume(this.session, kafkaConfig, params.get("topic"), params.get("group"), params.get("offset")));
}
public void consume(Session session, KafkaConfig kafkaConfig, String topic, String group, String offset) {
KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer(kafkaConfig, topic, group, offset);
while (session.isOpen()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
session.getBasicRemote().sendText(record.value());
// 手动提交
consumer.commitSync();
} catch (IOException e) {
e.printStackTrace();
}
}
}
consumer.close();
log.info("kafka consumer closed");
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
//从set中删除
WEB_SOCKET_SERVERS.remove(this);
//在线数减1
subOnlineCount();
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
if ("heart".equals(message)) {
try {
sendMessage("heartOk");
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* @param session 当前会话
* @param error 异常信息
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 群发自定义消息
*/
public static void sendInfo(String message) throws IOException {
for (WebSocketServer item : WEB_SOCKET_SERVERS) {
try {
//这里可以设定只推送给这个sid的,为null则全部推送
// if(sid==null) {
item.sendMessage(message);
log.info("推送消息到窗口" + item.sid + ",推送内容:" + message);
// }else if(item.sid.equals(sid)){
// item.sendMessage(message);
// }
} catch (IOException e) {
continue;
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WebSocketServer that = (WebSocketServer) o;
if (!Objects.equals(session, that.session)) {
return false;
}
if (!Objects.equals(sid, that.sid)) {
return false;
}
return Objects.equals(params, that.params);
}
@Override
public int hashCode() {
int result = session != null ? session.hashCode() : 0;
result = 31 * result + (sid != null ? sid.hashCode() : 0);
result = 31 * result + (params != null ? params.hashCode() : 0);
return result;
}
}
开启websocket支持
package com.lingdu.kafka.utils.lingdu.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* 开启websocket支持
* 描述:TODO
* @author 99013
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-02-23 16:01
*/
@Configuration
public class WebSocketConfig {
/**
* 扫描并注册带有@ServerEndpoint注解的所有服务端
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-02-23 16:01 */
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
SSE长链接
问答GPT
2023/8/3 15:12:56
2023/8/3 15:13:21
SSE(Server-Sent Events)是一种基于HTTP的长连接技术,用于服务器向客户端实时推送数据。与WebSocket不同,SSE使用了标准的HTTP协议,而不需要额外的协议升级。
以下是SSE的一些特点和使用方式:
单向通信:SSE是一种单向通信机制,即只有服务器可以向客户端发送数据。客户端通过与服务器建立持久的HTTP连接,在连接保持打开的情况下,服务器可以随时向客户端发送数据。
实时性:SSE适用于需要实时更新数据的场景。服务器可以主动向客户端推送事件和数据,客户端通过监听这些事件来接收数据。这使得SSE非常适合实时通知、实时监控以及即时消息等应用。
简单易用:SSE使用简单的文本数据格式,以事件流(Event Stream)的形式传输数据。服务器将数据打包为一系列的事件,每个事件由一个固定的格式组成,客户端通过解析这些事件来获取数据。
支持重连:SSE支持自动重连机制,当连接中断或出现错误时,客户端可以自动尝试重新建立连接。这样可以提高连接的可靠性和稳定性。
使用SSE的基本步骤如下:
建立连接:在客户端,通过简单的HTML标签或JavaScript API创建一个EventSource对象,并指定要连接的服务器端点。
接收数据:一旦连接建立成功,客户端可以通过addEventListener()方法来监听事件。服务器发送的数据将被封装为事件,客户端可以通过事件处理程序来处理接收到的数据。
关闭连接:当不再需要接收数据或发生错误时,可以调用EventSource对象的close()方法来关闭连接。
在服务器端,需要实现对SSE请求的处理。具体实现方式取决于所使用的开发语言和框架。一般来说,服务器需要设置响应头的Content-Type为"text/event-stream",并按照SSE的规范来格式化并发送事件流给客户端。
需要注意的是,SSE仅适用于服务器向客户端的单向通信,如果需要双向通信或更复杂的交互,可以考虑使用WebSocket等其他技术。
总之,SSE是一种基于HTTP的长连接技术,用于服务器向客户端实时推送数据。它具有实时性、简单易用和支持重连等特点,适用于实时通知和数据更新的场景。
用例
支持序列化
package com.lingdu.kafka.utils.lingdu.sse;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.Serializable;
/**
*
* 描述:为了支持序列化
* @author 99013
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-04-21 14:07
*/
public class SerializableSSE extends SseEmitter implements Serializable {
public SerializableSSE() {
}
public SerializableSSE(Long timeout) {
super(timeout);
}
}
返回类
package com.lingdu.kafka.utils.lingdu.sse;
/**
*
* 描述:
* @author 99013
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-04-28 9:57
*/
public class SseEmitterResultVO {
private Object msg;
/**
* 消息类型
*/
private String type;
/**
* 显示方式
*/
private String displayMode;
/**
* 毫秒级时间戳
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2022-09-06 10:17 */
private long now;
public Object getMsg() {
return msg;
}
public void setMsg(Object msg) {
this.msg = msg;
}
public long getNow() {
return now;
}
public void setNow(long now) {
this.now = now;
}
}
业务层接口
package com.lingdu.kafka.utils.lingdu.sse;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.List;
public interface SseEmitterService {
SseEmitter createSseConnect(String clientId);
void closeSseConnect(String clientId);
/**
* 根据客户端id获取SseEmitter对象
* @param clientId
* @return
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-04-18 10:34 */
SseEmitter getSseEmitterByClientId(String clientId);
/**
* 推送消息到客户端,此处结合业务代码,业务中需要推送消息处调用即可向客户端主动推送消息
* @param sseEmitterResultVOList
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-04-18 10:34 */
void sendMsgToClient(List<SseEmitterResultVO> sseEmitterResultVOList);
}
业务层实现类
package com.lingdu.kafka.utils.lingdu.sse;
import com.lingdu.common.core.exception.ServiceException;
import com.lingdu.common.core.lingdu.MyThreadPool;
import com.lingdu.common.core.utils.DateUtils;
import com.lingdu.common.core.utils.SpringUtils;
import com.lingdu.common.core.utils.StringUtils;
import com.lingdu.common.core.utils.uuid.IdUtils;
import com.lingdu.common.redis.service.RedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
//import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
@Service
public class SseEmitterServiceImpl implements SseEmitterService {
private static final Logger log = LoggerFactory.getLogger(SseEmitterServiceImpl.class);
private static final String CLIENT_ID = "CLIENT_ID";
private static final String TASK_RESULT = "TASK_RESULT";
/**
* 容器,保存连接,用于输出返回
*/
private static Map<String, SerializableSSE> sseCache = new ConcurrentHashMap<>();
/**
* redis工具类
*/
private RedisService redisService = SpringUtils.getBean(RedisService.class);
private static final String SSE_KEY = "SSE_KEY";
/*
测试
*/
static {
SseEmitterServiceImpl bean = SpringUtils.getBean(SseEmitterServiceImpl.class);
MyThreadPool.threadPool.execute(() -> {
while (true) {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
SseEmitterResultVO sseEmitterResultVO = new SseEmitterResultVO();
sseEmitterResultVO.setMsg(DateUtils.getTime());
sseEmitterResultVO.setNow(System.currentTimeMillis());
List<SseEmitterResultVO> list = new ArrayList<>();
list.add(sseEmitterResultVO);
bean.sendMsgToClient(list);
}
});
}
public void a(){
}
@Override
public SerializableSSE createSseConnect(String clientId) {
// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
// 一天的毫秒值
SerializableSSE sseEmitter = new SerializableSSE(86400000L);
// 是否需要给客户端推送ID
if (StringUtils.isBlank(clientId)) {
clientId = IdUtils.simpleUUID();
}
// 注册回调
sseEmitter.onCompletion(completionCallBack(clientId));
sseCache.put(clientId, sseEmitter);
log.info("创建新的sse连接,当前用户:{}", clientId);
redisService.setCacheObject(SSE_KEY,sseCache);
try {
sseEmitter.send(SerializableSSE.event().id(CLIENT_ID).data(clientId));
} catch (Exception e) {
e.printStackTrace();
log.error("SseEmitterServiceImpl[createSseConnect]: 创建长链接异常,客户端ID:{}", clientId, e);
throw new ServiceException("创建连接异常!");
}
return sseEmitter;
}
@Override
public void closeSseConnect(String clientId) {
SerializableSSE sseEmitter = sseCache.get(clientId);
if (sseEmitter != null) {
sseEmitter.complete();
removeUser(clientId);
}
}
// 根据客户端id获取SseEmitter对象
@Override
public SerializableSSE getSseEmitterByClientId(String clientId) {
return sseCache.get(clientId);
}
// 推送消息到客户端,此处结合业务代码,业务中需要推送消息处调用即可向客户端主动推送消息
@Override
public void sendMsgToClient(List<SseEmitterResultVO> sseEmitterResultVOList) {
// if (CollectionUtil.isEmpty(sseCache)) {
if (sseCache.isEmpty()) {
return;
}
for (Map.Entry<String, SerializableSSE> entry : sseCache.entrySet()) {
sendMsgToClientByClientId(entry.getKey(), sseEmitterResultVOList, entry.getValue());
}
}
/**
* 推送消息到客户端
* 此处做了推送失败后,重试推送机制,可根据自己业务进行修改
*
* @param clientId 客户端ID
* @param sseEmitterResultVOList 推送信息,此处结合具体业务,定义自己的返回值即可
**/
private void sendMsgToClientByClientId(String clientId, List<SseEmitterResultVO> sseEmitterResultVOList, SerializableSSE sseEmitter) {
if (sseEmitter == null) {
log.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:客户端{}未创建长链接,失败消息:{}",
clientId, sseEmitterResultVOList.toString());
return;
}
SerializableSSE.SseEventBuilder sendData = SerializableSSE.event().id(TASK_RESULT).data(sseEmitterResultVOList, MediaType.APPLICATION_JSON);
try {
sseEmitter.send(sendData);
} catch (IOException e) {
// // 推送消息失败,记录错误日志,进行重推
// log.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:{},尝试进行重推", sseEmitterResultVOList, e);5555
// boolean isSuccess = true;
// // 推送消息失败后,每隔10s推送一次,推送5次
// for (int i = 0; i < 5; i++) {
// try {
// Thread.sleep(10000);
// sseEmitter = sseCache.get(clientId);
// if (sseEmitter == null) {
// log.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);
// continue;
// }
// sseEmitter.send(sendData);
// } catch (Exception ex) {
// log.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败", clientId, i + 1, ex);
// continue;
// }
// log.info("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推成功,{}", clientId, i + 1, sseEmitterResultVOList);
// return;
// }
// 多线程方式处理异常
MyThreadPool.threadPool.execute(() -> {
errorRetry(clientId, sendData, sseEmitterResultVOList, e);
});
}
}
/**
* 长链接完成后回调接口(即关闭连接时调用)
*
* @param clientId 客户端ID
* @return java.lang.Runnable
**/
private Runnable completionCallBack(String clientId) {
return () -> {
log.info("结束连接:{}", clientId);
removeUser(clientId);
};
}
/**
* 连接超时时调用
*
* @param clientId 客户端ID
* @return java.lang.Runnable
**/
private Runnable timeoutCallBack(String clientId) {
return () -> {
log.info("连接超时:{}", clientId);
removeUser(clientId);
};
}
/**
* 出错重试
*
* @param clientId 客户端id
* @param sendData 消息内容
* @param sseEmitterResultVOList 消息集合
* @param e 异常信息
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-04-24 14:07
*/
private void errorRetry(String clientId, SerializableSSE.SseEventBuilder sendData,
List<SseEmitterResultVO> sseEmitterResultVOList, IOException e) {
MyThreadPool.threadPool.execute(() -> {
// 推送消息失败,记录错误日志,进行重推
log.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:{},尝试进行重推", sseEmitterResultVOList, e);
boolean isSuccess = false;
// 推送消息失败后,每隔10s推送一次,推送5次
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(10000);
SerializableSSE sseEmitter = sseCache.get(clientId);
if (sseEmitter == null) {
log.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);
continue;
}
sseEmitter.send(sendData);
} catch (Exception ex) {
log.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败", clientId, i + 1, ex);
continue;
}
isSuccess = true;
log.info("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推成功,{}", clientId, i + 1, sseEmitterResultVOList);
break;
}
if (!isSuccess) {
removeUser(clientId);
}
});
}
/**
* 推送消息异常时,回调方法
*
* @param clientId 客户端ID
* @return java.util.function.Consumer<java.lang.Throwable>
**/
private Consumer<Throwable> errorCallBack(String clientId) {
return throwable -> {
log.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);
// 推送消息失败后,每隔10s推送一次,推送5次
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(10000);
SerializableSSE sseEmitter = sseCache.get(clientId);
if (sseEmitter == null) {
log.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);
continue;
}
sseEmitter.send("失败后重新推送");
} catch (Exception e) {
e.printStackTrace();
}
}
};
}
/**
* 移除用户连接
*
* @param clientId 客户端ID
**/
private void removeUser(String clientId) {
sseCache.remove(clientId);
log.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);
}
}
控制器层
package com.lingdu.kafka.utils.lingdu.sse;
import com.lingdu.common.core.web.domain.AjaxResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.WebAsyncTask;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
/**
* 系统消息推送控制器
* 描述:TODO
* @author 99013
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-04-27 17:18
*/
@RestController
@RequestMapping(value = "/sse")
public class SSEController {
@Autowired
private SseEmitterService sseEmitterService;
/**
* 创建SSE长链接
*
* @param clientId 客户端唯一ID(如果为空,则由后端生成并返回给前端)
* @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter
**/
@CrossOrigin //如果nginx做了跨域处理,此处可去掉
@GetMapping("/CreateSseConnect")
public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) String clientId) {
return sseEmitterService.createSseConnect(clientId);
}
/**
* 关闭SSE连接
*
* @param clientId 客户端ID
**/
@GetMapping("/CloseSseConnect")
public AjaxResult closeSseConnect(String clientId) {
sseEmitterService. closeSseConnect(clientId);
return AjaxResult.success();
}
}
Vue
<template>
<div style="padding-top: 100px">
<el-form ref="elForm" :model="formData" :rules="rules" size="medium" label-width="100px">
<el-row :gutter="15">
<el-col :span="22">
<el-row type="flex" justify="space-between" align="top" :gutter="15">
<el-col :span="22">
<el-form-item label="消息" prop="msg">
<el-input v-model="formData.msg" type="textarea" placeholder=""
:autosize="{minRows: 10, maxRows: 10}" :style="{width: '90%'}">
</el-input>
<el-link :underline="false" icon="el-icon-document-copy" v-clipboard:copy="formData.msg"
v-clipboard:success="clipboardSuccess" style="float:right; width:10%">复制
</el-link>
</el-form-item>
</el-col>
</el-row>
</el-col>
</el-row>
<div id="message"></div>
</el-form>
</div>
</template>
<script>
// 将消息显示在网页上
function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
let es = null;
let time = new Date().getTime();
let outNumber = 0;
import {EventSourcePolyfill} from 'event-source-polyfill';
import {getToken} from '@/utils/auth'
export default {
data() {
return {
formData: {
msg: null,
},
rules: {},
}
},
mounted () {
// 页面加载完毕执行
this.$nextTick(function () {
//调用需要执行的方法
this.connect();
})
document.addEventListener('visibilitychange', this.handleVisiable)
document.body.onblur = () => {outNumber++
console.log(outNumber)
// alert('请回到考试页面,您已离开${this.outNumber}次')
}
},
// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
window:onbeforeunload = function () {
this.closeSse();
},
// mounted() {
// document.addEventListener('visibilitychange', this.handleVisiable)
// },
destroyed() {
document.removeEventListener('visibilitychange', this.handleVisiable)
},
methods: {
connect() {
if (!!window.EventSource) {
console.log(time)
// let url = 'http://192.168.0.230/prod-api/kafka/sse/subscribe?appId=' + time
let url = 'http://192.168.0.230/prod-api/kafka/sse/CreateSseConnect?clientId=' + time
// es = new EventSource(url);
es = new EventSourcePolyfill(url, {
headers: {
'Authorization': 'Bearer ' + getToken(),
// 'X-Custom-Header': 'value'
}
});
/**
* 连接一旦建立,就会触发open事件
* 另一种写法:source.onopen = function (event) {}
*/
es.addEventListener('open', function (e) {
setMessageInnerHTML("建立连接。。。");
}, false);
/**
* 客户端收到服务器发来的数据
* 另一种写法:source.onmessage = function (event) {}
*/
// es.addEventListener('message', function (e) {
// setMessageInnerHTML(e.data);
// });
es.addEventListener('message', event => {
console.log(event.data);
this.formData.msg = event.data;
});
/**
* 如果发生通信错误(比如连接中断),就会触发error事件
* 或者:
* 另一种写法:source.onerror = function (event) {}
*/
// es.addEventListener('error', function (e) {
// if (e.readyState === EventSource.CLOSED) {
// setMessageInnerHTML("连接关闭");
// } else {
// console.log(e);
// }
// }, false);
es.addEventListener('error', event => {
if (event.readyState === EventSource.CLOSED) {
console.log('event was closed');
this.formData.msg = "连接关闭";
}
});
es.addEventListener('close', event => {
console.log(event.type);
es.close();
});
} else {
setMessageInnerHTML("你的浏览器不支持SSE");
}
},
// 关闭Sse连接
closeSse() {
es.close();
const httpRequest = new XMLHttpRequest();
httpRequest.open('GET', 'http://192.168.0.230/prod-api/kafka/sse/CloseConnect/?clientId='+time, true);
httpRequest.send();
console.log("close");
},
/** 复制成功 */
clipboardSuccess() {
this.$modal.msgSuccess("复制成功");
},
handleVisiable(e) {
switch(e.target.visibilityState) {
case 'hidden':
console.log('内容不可见,处理后台、最小化、锁屏状态')
alert('警告!你已经离开当前答题页面');
break;
case 'visible':
console.log('处于正常打开')
break;
}
},
}
}
</script>
<style>
</style>
枚举类
集群信息同步
问答GPT
2023/8/3 16:21:45
2023/8/3 16:22:06
枚举类是一种特殊的数据类型,用于定义一组具名的常量。在很多编程语言中,枚举类提供了一种更优雅、可读性更高的方式来表示固定数量的取值。每个枚举常量都有一个唯一的名称,并且可以指定它们的值。
以下是一些关于枚举类的常见特点和用法:
- 定义枚举类:可以使用关键字 enum 来定义一个枚举类。在枚举类中,列出所有的枚举常量,并用逗号分隔。例如,在Java中定义一个表示星期的枚举类可以是:
enum Weekday {
MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY
}
枚举常量的访问:可以通过枚举类名和枚举常量名来访问枚举常量。例如,在上面的示例中,可以使用 Weekday.MONDAY 来引用星期一的枚举常量。
枚举常量的属性和方法:枚举常量可以具有属性和方法。可以为每个枚举常量添加需要的属性,也可以为整个枚举类定义方法。例如,在上面的示例中,可以为 Weekday 枚举常量添加一个属性表示星期几的序号。
值的比较:可以使用相等运算符(==)来比较枚举常量之间的值。由于枚举常量是唯一的,因此可以使用相等运算符进行简单的相等性比较。
枚举类的优点包括提供了更好的可读性、类型安全性和代码的维护性。使用枚举类可以清晰地表示一组相关的常量,并且编译器可以在编译时检查错误。
不同的编程语言对枚举类的实现方式可能略有不同,但基本概念和用法相似。在实际的开发中,可以根据具体需要选择合适的枚举类使用。
2023/8/3 15:38:14
2023/8/3 15:38:37
在微服务架构中,由于每个微服务都是独立部署和运行的,可能存在多个实例同时运行的情况。因此,需要确保微服务集群之间的信息同步,以保持数据的一致性和可靠性。
以下是一些常见的方法和技术用于微服务集群信息同步:
发布-订阅模式:使用消息队列或事件总线作为中介,微服务发布数据变更的事件,其他订阅了相应事件的微服务接收并处理。这种模式能够实现解耦和异步通信,每个微服务只关心自己需要的事件,而不需要直接依赖其他微服务。
数据复制和同步:将关键数据存储在共享的数据库或存储系统中,并定期将数据复制到其他微服务所在的数据库中。可以使用数据同步工具或技术来实现,如数据库复制、数据同步服务等。
RESTful API调用:微服务可以通过RESTful API来调用其他微服务提供的接口,实现数据的查询和更新。当一个微服务更新了数据后,可以通过调用相关微服务的API来通知和更新其他微服务的数据。
基于事件源的架构(Event Sourcing):将所有的事件和状态变更以事件流的方式保存,并使用事件日志作为系统的可信源。每个微服务可以订阅感兴趣的事件,并根据事件进行状态更新和处理。
无论采用哪种方法,关键是确保数据同步的准确性和及时性。在实施微服务集群信息同步时,还要考虑以下因素:
- 可靠性:确保在网络中断或其他故障情况下,信息同步依然可靠进行。
- 一致性:保证不同微服务之间的数据一致性,避免脏数据或冲突。
- 性能:注意信息同步过程的性能开销,避免对整体系统性能造成负面影响。
- 安全性:保护敏感数据的安全,并采取必要的安全措施,如身份验证、授权等。
需要根据具体的业务需求和系统特点选择适合的信息同步方法和技术,以满足系统的实时性、可靠性和稳定性要求。
用例
在微服务架构中,由于每个微服务都是独立部署和运行的,可能存在多个实例同时运行的情况。因此,需要确保微服务集群之间的信息同步,以保持数据的一致性和可靠性。
比如 对于动态数据源的增删改操作要在每个实例中同时生效。我之前的做法是在增删改的操作中同步信息到redis
,在数据源处理工具中启动一个异步线程监测redis
中信息的变化,为了 方便快速地知道是否有变化,我设置一个版本号来判断,版本号不一样再进行进一步同步差异连接信息。其中redis
存储信息和版本的key
做了一个枚举类的约定绑定:
package com.lingdu.common.datasource.enums;
/**
* redis中数据版本枚举类
* 描述:TODO
* @author 99013
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2023-03-22 9:19
*/
public enum RedisDataVersion {
/**
* redis中存的多数据源信息键名
*/
DIMENSION_DOOR_DATA_SOURCE("DIMENSION_DOOR_DATA_SOURCE");
/**
* 数据名称
*/
private final String name;
/**
* 数据版本名称
*/
private final String versionName;
RedisDataVersion(String name) {
this.name = name;
this.versionName = name+"_VERSION";
}
public String getName() {
return name;
}
public String getVersionName() {
return versionName;
}
}
同步数据源的方法中就比较好获取redis
中的版本和数据信息了。完整代码在上面的动态切换数据源中
/**
* 同步数据源
*
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2022-08-23 11:01
*/
private void syncDs() {
/*变动标记值*/
String v = redisService.getCacheObject(RedisDataVersion.DIMENSION_DOOR_DATA_SOURCE.getVersionName());
// redis缓存数据添加了过期时间,如果未获取到版本信息则重新初始化
if (v == null) {
updateDataSourceToRedis();
}
// 版本号不一致
if (!version.equals(v)) {
// 数据源同步信息
List<SysDatabase> sysDatabases = redisService.getCacheObject(RedisDataVersion.DIMENSION_DOOR_DATA_SOURCE.getName());
// 分组记录数据源
Map<String, DataSourceCategory> group = new HashMap<>(sysDatabases == null ? 6 : (int) ((sysDatabases.size() / 0.75) + 1));
// 记录所有数据源
Map<String, SysDatabase> msd = new HashMap<>(sysDatabases == null ? 6 : (int) ((sysDatabases.size() / 0.75) + 1));
/*当前已生效的数据源*/
Map<String, DataSource> dataSources = getDataSources();
// 不能直接拿 引用变量在循环后会被还原所以要new
Set<String> dskey = new HashSet<>(dataSources.keySet());
try {
// 将数据源信息转换成map形式
assert sysDatabases != null;
for (SysDatabase sysDatabase : sysDatabases) {
// 转map存储
msd.put(getDsKey(sysDatabase), sysDatabase);
// 数据源分组
classification(sysDatabase, group);
// 数据源差异比较
contrast(sysDatabase, dataSources, dskey);
}
for (String s : dskey) {
// 删除多余的数据源
removeDataSource(s);
}
} catch (Exception e) {
// 防止出现异常导致线程崩溃
e.printStackTrace();
}
// 更新本地缓存
DATE_SOURCE_GROUP = group;
DATE_SOURCE = msd;
// 更新本地标记
version = v;
long l = System.currentTimeMillis();
long l2;
long timeout = 10000;
// 死循环检测线程完成情况
do {
// 清除已执行完的线程标记
threadList.removeIf(thread -> thread.getState().equals(Thread.State.WAITING));
l2 = System.currentTimeMillis();
// 10秒超时时间
if ((l2 - l) > timeout) {
// 清除所有数据 防止循环中未清除干净导致脏数据
threadList = null;
threadList = new ArrayList<>();
break;
}
// 延迟100毫秒左右执行下一轮循环
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
} while (!threadList.isEmpty());
}
}
分布式锁
用例
package com.lingdu.system.service.impl;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import com.alibaba.fastjson2.JSONObject;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import com.lingdu.common.core.constant.Constants;
import com.lingdu.common.core.lingdu.MyThreadPool;
import com.lingdu.common.core.lingdu.domain.SysDatabase;
import com.lingdu.common.core.lingdu.utils.IpTest;
import com.lingdu.common.core.lingdu.utils.JabotUtil;
import com.lingdu.common.core.utils.DateUtils;
import com.lingdu.common.core.utils.SpringUtils;
import com.lingdu.common.security.utils.SecurityUtils;
import com.lingdu.job.api.RemoteSysJobService;
import com.lingdu.system.api.RemoteDatabaseService;
import com.lingdu.system.api.domain.SysUserKeys;
import org.apache.commons.collections.map.LinkedMap;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;
import com.lingdu.system.mapper.SysCommunicationMapper;
import com.lingdu.common.core.lingdu.domain.SysCommunication;
import com.lingdu.system.service.ISysCommunicationService;
import org.springframework.transaction.annotation.Transactional;
/**
* 系统内外网信息交互Service业务层处理
* 项目启动后运行此方法:CommandLineRunner实现
*
* @author lingdu
* @date 2022-10-14
*/
@Service
public class SysCommunicationServiceImpl implements ISysCommunicationService, CommandLineRunner {
private static final Logger log = LoggerFactory.getLogger(SysCommunicationServiceImpl.class);
private final SysCommunicationMapper sysCommunicationMapper;
public SysCommunicationServiceImpl(SysCommunicationMapper sysCommunicationMapper) {
this.sysCommunicationMapper = sysCommunicationMapper;
}
/**
* 项目启动后立马执行
* 项目启动后运行此方法:CommandLineRunner实现
*/
@Override
public void run(String... var1) {
this.start();
}
/**
* 查询系统内外网信息交互
*
* @param id 系统内外网信息交互主键
* @return 系统内外网信息交互
*/
@Override
public SysCommunication selectSysCommunicationById(String id) {
return sysCommunicationMapper.selectSysCommunicationById(id);
}
/**
* 查询系统内外网信息交互列表
*
* @param sysCommunication 系统内外网信息交互
* @return 系统内外网信息交互
*/
@Override
public List<SysCommunication> selectSysCommunicationList(SysCommunication sysCommunication) {
return sysCommunicationMapper.selectSysCommunicationList(sysCommunication);
}
/**
* 新增系统内外网信息交互
*
* @param sysCommunication 系统内外网信息交互
* @return 结果
*/
@Override
public int insertSysCommunication(SysCommunication sysCommunication) {
sysCommunication.setCreateBy(SecurityUtils.getUsername());
sysCommunication.setCreateTime(DateUtils.getNowDate());
sysCommunication.setUpdateBy(SecurityUtils.getUsername());
sysCommunication.setUpdateTime(DateUtils.getNowDate());
return sysCommunicationMapper.insertSysCommunication(sysCommunication);
}
/**
* 修改系统内外网信息交互
*
* @param sysCommunication 系统内外网信息交互
* @return 结果
*/
@Override
public int updateSysCommunication(SysCommunication sysCommunication) {
sysCommunication.setUpdateBy(SecurityUtils.getUsername());
sysCommunication.setUpdateTime(DateUtils.getNowDate());
return sysCommunicationMapper.updateSysCommunication(sysCommunication);
}
/**
* 批量删除系统内外网信息交互
*
* @param ids 需要删除的系统内外网信息交互主键集合
* @return 结果
*/
@Override
public int deleteSysCommunicationByIds(String[] ids) {
return sysCommunicationMapper.deleteSysCommunicationByIds(ids);
}
/**
* 删除系统内外网信息交互信息
*
* @param id 系统内外网信息交互主键
* @return 结果
*/
@Override
public int deleteSysCommunicationById(String id) {
return sysCommunicationMapper.deleteSysCommunicationById(id);
}
/*👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇内外网协同逻辑👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇*/
// 内外网协同逻辑 //
/*👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇内外网协同逻辑👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇*/
/**
* 方法入口
*
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2022-11-09 17:10
*/
public void start() {
// 内网生产环境才启动此监控逻辑
if (Constants.D_E_I.equals(Constants.deployedEnvironment)) {
log.info("正在启动MySQL日志监控。。。");
// 这里调用线程池触发
MyThreadPool.threadPool.execute(this::init);
}
}
public void init() {
// 列信息
List<LinkedMap> columns = sysCommunicationMapper.sysCommunicationInfo();
// 主数据源配置信息
List<SysDatabase> master = sysCommunicationMapper.selectSysDatabaseListByDsKey("master", Constants.deployedEnvironment);
MonitoringMySql mms = new MonitoringMySql();
mms.columns = columns;
mms.sysDatabase = master.get(0);
mms.init();
}
}
/**
* 内网MySQL日志监听
* 描述: ***********************************************************************************************************
*
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2022-10-24 15:47
*/
class MonitoringMySql {
private static final Logger log = LoggerFactory.getLogger(MonitoringMySql.class);
/**
* 表名与表id对应关系
*/
public Map<String, Long> tableId = new HashMap<>();
/**
* 字段信息
*/
public List<LinkedMap> columns;
/**
* 数据库连接信息
*/
public SysDatabase sysDatabase;
private final RedissonClient redisson = SpringUtils.getBean(RedissonClient.class);
public void init() {
String url = sysDatabase.getUrl();
List<IpTest> ipTests = IpTest.ipAndPort(url);
BinaryLogClient client = new BinaryLogClient(ipTests.get(0).getIp(), Integer.parseInt(ipTests.get(0).getPort()), sysDatabase.getUsername(), JabotUtil.jiemi(sysDatabase.getPsw()));
// 这里由于内网可能存在集群服务所以设置不同的id(相同id经过测试不能同时存在)
client.setServerId(System.currentTimeMillis());
client.registerEventListener(event -> {
EventData data = event.getData();
if (data instanceof TableMapEventData) {
TableMapEventData tableMapEventData = (TableMapEventData) data;
// 保存对应关系(数据库名.表名,数据表在数据库中的id)
tableId.put(tableMapEventData.getDatabase() + "." + tableMapEventData.getTable(), tableMapEventData.getTableId());
} else {
if (data instanceof UpdateRowsEventData) {
updateRowsEventDataDispose((UpdateRowsEventData) data);
} else if (data instanceof WriteRowsEventData) {
writeRowsEventDataDispose((WriteRowsEventData) data);
} else if (data instanceof DeleteRowsEventData) {
deleteRowsEventDataDispose((DeleteRowsEventData) data);
}
}
});
try {
client.connect();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 新增相关
*
* @param data 日志信息
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2022-10-24 14:38
*/
@Transactional(rollbackFor = Exception.class)
public void writeRowsEventDataDispose(WriteRowsEventData data) {
// 获取字段顺序 SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'sys_communication'
long tableId1 = data.getTableId();
Long tid = tableId.get("sjzx_cloud.sys_communication");
if (tid == null) {
return;
}
// 针对某一张表的处理
if (tableId1 == tid) {
// 包含列(从0开始)
BitSet includedColumns = data.getIncludedColumns();
Serializable[] serializables = data.getRows().get(0);
int i = 0;
// 字段名与字段值对应关系
// Map<String, Object> map = new HashMap<>((int) ((columns.size() / 0.75) + 1));
String id = "";
for (LinkedMap column : columns) {
boolean b = includedColumns.get(i);
// 包含列
if (b) {
String columnName = column.get("COLUMN_NAME").toString();
// 字段名,字段值
// map.put(columnName,serializables[i]);
if ("id".equals(columnName)) {
id = serializables[i].toString();
break;
}
}
i++;
}
log.info("****************\n*内外网协同识别标记*\n****************");
// 1.获取分布式锁,只要锁的名字一样,获取到的锁就是同一把锁。
RLock lock = redisson.getLock(id);
if (lock.isLocked()) {
return;
}
// 上锁
lock.lock();
try {
// 获取到id后根据id查询到相关记录再执行下一步操作 map.get("id").toString()
SysCommunication sysCommunication = SpringUtils.getBean(SysCommunicationMapper.class).selectSysCommunicationById(id);
if (sysCommunication == null) {
return;
}
// 如果状态不是0则代表已被其他微服务模块识别到了
if (sysCommunication.getTriggerState() != 0) {
return;
}
// 触发状态(0已触发等待识别,1已识别等待完成,2已完成,3异常,4检测超时)
sysCommunication.setTriggerState(1);
// 更新内外网同步记录
SpringUtils.getBean(ISysCommunicationService.class).updateSysCommunication(sysCommunication);
try {
execute(sysCommunication);
// 触发状态(0已触发等待识别,1已识别等待完成,2已完成,3异常,4检测超时)
sysCommunication.setTriggerState(2);
// 更新内外网同步记录
SpringUtils.getBean(ISysCommunicationService.class).updateSysCommunication(sysCommunication);
} catch (Exception e) {
// 触发状态(0已触发等待识别,1已识别等待完成,2已完成,3异常,4检测超时)
sysCommunication.setTriggerState(3);
sysCommunication.setMessage(e.getMessage());
// 更新内外网同步记录
SpringUtils.getBean(ISysCommunicationService.class).updateSysCommunication(sysCommunication);
e.printStackTrace();
}
} finally {
// 解锁
lock.unlock();
}
}
}
/**
* 更新相关
*
* @param data 日志信息
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2022-10-24 14:37
*/
public void updateRowsEventDataDispose(UpdateRowsEventData data) {
data.getRows();
}
/**
* 删除相关
*
* @param data 日志信息
* @作者 <b><a class=b href="https://blog.csdn.net/lingdu_dou" color="red">⭕°</a></b>
* @创建时间 2022-10-24 14:39
*/
public void deleteRowsEventDataDispose(DeleteRowsEventData data) {
data.getRows();
}
public void execute(SysCommunication sysCommunication) {
// 触发类型(1更新redis中用户信息权限信息,2更新redis中的虚拟接口信息(删除单个接口缓存),3更新redis中的数据源信息,4同步定时任务信息,5同步单个用户密钥,6系统参数缓存同步
Integer triggerType = sysCommunication.getTriggerType();
String message = sysCommunication.getMessage();
JSONObject jsonObject = JSONObject.parseObject(message);
switch (triggerType) {
// 1更新redis中用户信息权限信息
case 1:
// 执行更新权限逻辑
// 更新redis中的用户权限
SpringUtils.getBean(SysPermissionServiceImpl.class).updatePermission(jsonObject.getLong("roleId"), jsonObject.getObject("userIds", Long[].class));
break;
// 2更新redis中的虚拟接口信息
case 2:
SpringUtils.getBean(SysVirtualInterfaceServiceImpl.class).deleteRedis(jsonObject.getObject("key", String.class));
break;
// 3更新redis中的数据源信息
case 3:
SpringUtils.getBean(RemoteDatabaseService.class).reload();
break;
// 4同步定时任务信息
case 4:
SpringUtils.getBean(RemoteSysJobService.class).analysis(JSONObject.toJSONString(jsonObject));
break;
// 5同步单个用户密钥
case 5:
// 同步单个用户密钥
SpringUtils.getBean(SysUserKeysServiceImpl.class).syncKey(jsonObject.getObject("userKey", SysUserKeys.class), jsonObject.getBoolean("isUpdate"));
break;
// 6系统参数缓存同步
case 6:
// 内网执行同步逻辑
SpringUtils.getBean(SysConfigServiceImpl.class).execute(JSONObject.toJSONString(jsonObject));
break;
default:
log.error("监测到未能识别的同步操作类型[" + triggerType + "]");
break;
}
}
}