一、时序库的基本操作
CREATE DATABASE mydb
SHOW databases show MEASUREMENTS
INSERT cpu,host=serverA,region=us_west value=0.64
SELECT host, region, value FROM cpu where host=‘serverA’
curl -i -XPOST http://localhost:8086/query –data-urlencode “q=CREATE DATABASE mydb”
CREATE RETENTION POLICY “a_year” ON “food_data” DURATION 52w REPLICATION 1
/**
*name--名称,此示例名称为 default
*duration--持续时间,0代表无限制
*shardGroupDuration--shardGroup的存储时间,shardGroup是InfluxDB的一个基本储存结构,应该大于这个时间的数据在查询效率上应该有所降低。
*replicaN--全称是REPLICATION,副本个数
*default--是否是默认策略
*/
InfluxDbConfig influxDbConfig=new InfluxDbConfig();BatchPoints batchPoints=BatchPoints .database("mydb") .consistency(InfluxDB.ConsistencyLevel.ALL) .retentionPolicy("30d")//数据保存30条进行删除 .build();
二、引入时序库依赖包
org.influxdb influxdb-java 2.15
三、配置时序库的地址,数据库名字,保存时间,用户名,密码
spring.influx.url=http://xxx.xxx.xxx.xxx:8086spring.influx.database= mydbspring.influx.retentionPolicy=90d
四、实现配置文件,构建连接时序库
@Slf4j@Configurationpublic class InfluxDbConfig { @Value("${spring.influx.url}") private String influxDBUrl; @Value("${spring.influx.database}") private String database; @Value("${spring.influx.retentionPolicy}") private String policy; @Value("${thread.maxConnection}") private int maxConnection; @Bean public InfluxDbUtils influxDbUtils() { //自行配置连接时序库的用户名,密码 return new InfluxDbUtils("用户名","密码",influxDBUrl,database,policy,maxConnection); }}
五、对时序库的断开重连
/** * 关于时序库连接的重试拦截器 */@Slf4jpublic class RetryIntercepter implements Interceptor { public int maxRetry;//最大重试次数 private int retryNum = 0;//假如设置为3次重试的话,则最大可能请求4次(默认1次+3次重试) public RetryIntercepter(int maxRetry) { this.maxRetry = maxRetry; } @Override public Response intercept(Chain chain) throws IOException { Request request = chain.request().newBuilder() .addHeader("Connection", "close") .build(); Response response = getProceed(chain, request); while (!response.isSuccessful() && retryNum < maxRetry) { retryNum++; log.info("retryNum=" + retryNum); response = getProceed(chain, request); } return response; } private Response getProceed(Chain chain, Request request) throws IOException { Response response=null; try{ response=chain.proceed(request); }catch (Exception e){ log.error("异常输出:{}",e); } return response; }}
六、时序库工具类
@Data@Slf4jpublic class InfluxDbUtils { private String userName; private String password; private String url; public String database; private String retentionPolicy; // InfluxDB实例 private InfluxDB influxDB; // 数据保存策略 public static String policyNamePix = "logmonitor"; //连接池数 //private int maxConnection; public InfluxDbUtils(String userName, String password, String url, String database, String retentionPolicy,int maxConnection) { this.userName = userName; this.password = password; this.url = url; this.database = database; this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? "autogen" : retentionPolicy; //this.maxConnection=maxConnection; this.influxDB = influxDbBuild(); } /** * 连接数据库 ,若不存在则创建 * * @return influxDb实例 * name--名称,此示例名称为 default * duration--持续时间,0代表无限制 * shardGroupDuration--shardGroup的存储时间,shardGroup是InfluxDB的一个基本储存结构,应该大于这个时间的数据在查询效率上应该有所降低。 * replicaN--全称是REPLICATION,副本个数 * default--是否是默认策略 */ static OkHttpClient.Builder client = new OkHttpClient.Builder() .connectionPool(new ConnectionPool(4,10,TimeUnit.SECONDS)) .connectTimeout(800,TimeUnit.SECONDS) .readTimeout(800,TimeUnit.SECONDS).writeTimeout(800,TimeUnit.SECONDS); private InfluxDB influxDbBuild() { RetryIntercepter retryIntercepter; if (influxDB == null) { //重试重连的允许次数为3 retryIntercepter =new RetryIntercepter(3); client.addInterceptor(retryIntercepter); influxDB = InfluxDBFactory.connect(url, userName, password,client); } try { createDB(database); influxDB.setDatabase(database); influxDB.createRetentionPolicy(policyNamePix, database, retentionPolicy, "2d", 1, true); //rpName:保留策略,dbName:数据库 ,30d:数据保留30天, 1:副本个数, true:设为当前数据库默认的保留策略 //控制不进行打印日志 influxDB.setLogLevel(InfluxDB.LogLevel.NONE); //如果满5000条或者10000毫秒,满足任何一个条件就会发送一次写的请求 influxDB.enableBatch(5000, 10000, TimeUnit.MILLISECONDS); } catch (Exception e) { log.error("create influx db failed, error: {}", e); } finally { //将创建的保留策略设置到当前数据库 influxDB.setRetentionPolicy(policyNamePix); } return influxDB; } /**** * 创建数据库 * @param database */ private void createDB(String database) { influxDB.query(new Query("CREATE DATABASE " + database)); } /** * 删除时序库的表 * @param measurement */ public void deleteMeasurement(String measurement){ influxDB.query(new Query("drop measurement "+measurement,"mydb")); } /**获取时序库的查询的集合*/ public List> getObjList(String command) { QueryResult queryResult=influxDB.query(new Query(command,"mydb")); if(queryResult.getResults().size()>0){ if(queryResult.getResults().get(0).getSeries()!=null&&queryResult.getResults().get(0).getSeries().size()>0){ List> objectList=queryResult.getResults().get(0).getSeries().get(0).getValues(); return objectList; } } influxDB.close(); return new ArrayList<>(); }}
留言与评论(共有 0 条评论) “” |