MySQL支持, 其他优化

This commit is contained in:
QAIU
2025-02-06 16:52:06 +08:00
parent e85215fca1
commit 5052fea9ef
14 changed files with 465 additions and 62 deletions

View File

@@ -1,6 +1,7 @@
package cn.qaiu.lz.common.cache;
import cn.qaiu.db.pool.JDBCPoolInit;
import cn.qaiu.db.pool.JDBCType;
import cn.qaiu.lz.web.model.CacheLinkInfo;
import io.vertx.core.Future;
import io.vertx.core.Promise;
@@ -8,13 +9,17 @@ import io.vertx.core.json.JsonObject;
import io.vertx.jdbcclient.JDBCPool;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.templates.SqlTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
public class CacheManager {
private final JDBCPool jdbcPool = JDBCPoolInit.instance().getPool();
private final JDBCType jdbcType = JDBCPoolInit.instance().getType();
private static final Logger LOGGER = LoggerFactory.getLogger(CacheManager.class);
public Future<CacheLinkInfo> get(String cacheKey) {
String sql = "SELECT share_key as shareKey, direct_link as directLink, expiration FROM cache_link_info WHERE share_key = #{share_key}";
@@ -33,16 +38,31 @@ public class CacheManager {
cacheHit = new CacheLinkInfo(JsonObject.of("cacheHit", false, "shareKey", cacheKey));
}
promise.complete(cacheHit);
}).onFailure(Throwable::printStackTrace);
}).onFailure(e->{
promise.fail(e);
LOGGER.error("cache get:", e);
});
return promise.future();
}
// 插入或更新缓存数据
public Future<Void> cacheShareLink(CacheLinkInfo cacheLinkInfo) {
String sql = "MERGE INTO cache_link_info (share_key, direct_link, expiration) " +
"KEY (share_key) " +
"VALUES (#{shareKey}, #{directLink}, #{expiration})";
String sql;
if (jdbcType == JDBCType.MySQL) {
sql = """
INSERT INTO cache_link_info (share_key, direct_link, expiration)
VALUES (#{shareKey}, #{directLink}, #{expiration})
ON DUPLICATE KEY UPDATE
direct_link = VALUES(direct_link),
expiration = VALUES(expiration);
""";
} else { // 运行H2
sql = "MERGE INTO cache_link_info (share_key, direct_link, expiration) " +
"KEY (share_key) " +
"VALUES (#{shareKey}, #{directLink}, #{expiration})";
}
// 直接传递 CacheLinkInfo 实体类
return SqlTemplate.forUpdate(jdbcPool, sql)
@@ -55,11 +75,23 @@ public class CacheManager {
public Future<Integer> updateTotalByField(String shareKey, CacheTotalField field) {
Promise<Integer> promise = Promise.promise();
String fieldLower = field.name().toLowerCase();
String sql = """
String sql;
if (jdbcType == JDBCType.MySQL) { // 假设你有一个标识当前数据库类型的布尔变量
sql = """
INSERT INTO `api_statistics_info` (`pan_type`, `share_key`, `{field}`, `update_ts`)
VALUES (#{panType}, #{shareKey}, #{total}, #{ts})
ON DUPLICATE KEY UPDATE
`pan_type` = VALUES(`pan_type`),
`{field}` = VALUES(`{field}`),
`update_ts` = VALUES(`update_ts`);
""".replace("{field}", fieldLower);
} else { // 运行H2
sql = """
MERGE INTO `api_statistics_info` (`pan_type`, `share_key`, `{field}`, `update_ts`)
KEY (`share_key`)
VALUES (#{panType}, #{shareKey}, #{total}, #{ts})
""".replace("{field}", fieldLower);
}
getShareKeyTotal(shareKey, fieldLower).onSuccess(total -> {
Integer newTotal = (total == null ? 0 : total) + 1;
@@ -71,7 +103,10 @@ public class CacheManager {
put("ts", System.currentTimeMillis());
}})
.onSuccess(res -> promise.complete(res.rowCount()))
.onFailure(Throwable::printStackTrace);
.onFailure(e->{
promise.fail(e);
LOGGER.error("updateTotalByField: ", e);
});
});
return promise.future();
}
@@ -84,10 +119,12 @@ public class CacheManager {
public Future<Integer> getShareKeyTotal(String shareKey, String name) {
String sql = """
select `share_key`, sum({total_name}) sum_num
from `api_statistics_info`
group by `share_key` having `share_key` = #{shareKey};
SELECT `share_key`, SUM({total_name}) AS sum_num
FROM `api_statistics_info`
WHERE `share_key` = #{shareKey}
GROUP BY `share_key`;
""".replace("{total_name}", name);
Promise<Integer> promise = Promise.promise();
Map<String, Object> paramMap = new HashMap<>();
paramMap.put("shareKey", shareKey);
@@ -98,16 +135,21 @@ public class CacheManager {
Integer total = res.iterator().hasNext() ?
res.iterator().next().getInteger("sum_num") : null;
promise.complete(total);
}).onFailure(e->{
promise.fail(e);
LOGGER.error("getShareKeyTotal: ", e);
});
return promise.future();
}
public Future<Map<String, Integer>> getShareKeyTotal(String shareKey) {
String sql = """
select `share_key`, sum(cache_hit_total) hit_total, sum(api_parser_total) parser_total,
from `api_statistics_info`
group by `share_key` having `share_key` = #{shareKey}
SELECT `share_key`, SUM(cache_hit_total) AS hit_total, SUM(api_parser_total) AS parser_total
FROM `api_statistics_info`
WHERE `share_key` = #{shareKey}
GROUP BY `share_key`;
""";
Promise<Map<String, Integer>> promise = Promise.promise();
Map<String, Object> paramMap = new HashMap<>();
paramMap.put("shareKey", shareKey);
@@ -115,16 +157,19 @@ public class CacheManager {
.mapTo(Row::toJson)
.execute(paramMap)
.onSuccess(res -> {
if(res.iterator().hasNext()) {
JsonObject next = res.iterator().next();
Map<String, Integer> resp = new HashMap<>(){{
put("hit_total" ,next.getInteger("hit_total"));
put("parser_total" ,next.getInteger("parser_total"));
}};
promise.complete(resp);
if(res.iterator().hasNext()) {
JsonObject next = res.iterator().next();
Map<String, Integer> resp = new HashMap<>(){{
put("hit_total" ,next.getInteger("hit_total"));
put("parser_total" ,next.getInteger("parser_total"));
}};
promise.complete(resp);
} else {
promise.complete();
}
promise.complete();
}
}).onFailure(e->{
promise.fail(e);
LOGGER.error("getShareKeyTotal0: ", e);
});
return promise.future();
}

View File

@@ -21,13 +21,13 @@ public class ApiStatisticsInfo implements ToJson {
/**
* pan type 单独拿出来便于统计.
*/
@Length(varcharSize = 4)
@Length(varcharSize = 16)
private String panType;
/**
* 分享key type:key
*/
@Length(varcharSize = 4096)
@Length(varcharSize = 1024)
private String shareKey;
/**

View File

@@ -23,7 +23,7 @@ public class CacheLinkInfo implements ToJson {
/**
* 缓存key: type:ShareKey; e.g. lz:xxxx
*/
@Length(varcharSize = 4096)
@Length(varcharSize = 1024)
private String shareKey;
/**

View File

@@ -48,10 +48,11 @@ public class DbServiceImpl implements DbService {
JDBCPool client = JDBCPoolInit.instance().getPool();
Promise<StatisticsInfo> promise = Promise.promise();
String sql = """
select sum(api_parser_total) parserTotal,sum("cache_hit_total") cacheTotal,
sum(api_parser_total) + sum("cache_hit_total") total
from "api_statistics_info";
select sum(api_parser_total) as parserTotal, sum(cache_hit_total) as cacheTotal,
sum(api_parser_total) + sum(cache_hit_total) as total
from api_statistics_info;
""";
SqlTemplate.forQuery(client, sql).mapTo(StatisticsInfo.class).execute(new HashMap<>()).onSuccess(row -> {
StatisticsInfo info;
if ((info = row.iterator().next()) != null) {
@@ -59,7 +60,10 @@ public class DbServiceImpl implements DbService {
} else {
promise.fail("t_parser_log_info查询为空");
}
}).onFailure(promise::fail);
}).onFailure(e->{
log.error("getStatisticsInfo: ", e);
promise.fail(e);
});
return promise.future();
}
}