针对数据库的查询操作,使用批量方式自然要快不少,本文介绍批量查询API的实现。本文共实现两类API:一类是串行的分组查询,一类是并行的分组并发查询。
public List queryInCondition(String key, Object... value) {
long current = System.currentTimeMillis();
Objects.requireNonNull(key, "Filter key cant be null.");
if (value == null || value.length == 0) {
LOG.warn("Return empty data, cause query by key:{},but value is empty", key);
return Collections.emptyList();
}
List result = Lists.newArrayListWithCapacity(value.length);
if (value.length < MAX_PARTITION_DATA_COUNT) {
result.addAll(queryByKeyValues(currentSession(), key, Arrays.asList(value)));
} else {
//分组,防止SQL拼接过大
int SPLIT = value.length / MIN_PARTITION > MAX_PARTITION_DATA_COUNT ? MAX_PARTITION_DATA_COUNT : value.length / MIN_PARTITION;
List> valuePartition = Lists.partition(Lists.newArrayList(value), SPLIT);
for (List
/**
 * Runs an "IN"-style lookup for {@code key} against the given values, splitting large
 * value sets into groups that are queried concurrently.
 *
 * <p>Value sets smaller than {@code MAX_PARTITION_DATA_COUNT} are delegated to
 * {@link #queryInCondition(String, Object...)}. Larger sets are handed to
 * {@code groupInvoke} on the {@code XP_DAO_EXE} executor; every group is queried through
 * its own {@link Session}, opened from the current session's {@link SessionFactory} and
 * closed again once the group's query has finished.
 *
 * @param key   the filter key; must not be {@code null}
 * @param value the values to match; {@code null} or empty yields an empty list
 * @return the combined rows of all groups, or an empty list when no values were given
 * @throws NullPointerException if {@code key} is {@code null}
 */
public List queryInConditionHighSpeed(String key, Object... value) {
    final long start = System.currentTimeMillis();
    Objects.requireNonNull(key, "Filter key cant be null.");
    if (value == null || value.length == 0) {
        LOG.warn("Return empty data, cause query by key:{},but value is empty", key);
        return Collections.emptyList();
    }
    if (value.length < MAX_PARTITION_DATA_COUNT) {
        return queryInCondition(key, value);
    }
    final SessionFactory sessionFactory = currentSession().getSessionFactory();
    List result = groupInvoke(XP_DAO_EXE, Arrays.asList(value), (values) -> {
        Session session = null;
        try {
            // Each group gets a dedicated session instead of sharing the caller's one.
            session = sessionFactory.openSession();
            return queryByKeyValues(session, key, values);
        } finally {
            if (session != null && session.isOpen()) {
                session.close();
            }
        }
    });
    final long cost = System.currentTimeMillis() - start;
    // The log arguments are evaluated even when debug logging is off, so the divisor must
    // never be zero (a sub-millisecond run gives cost == 0). The long multiplication also
    // avoids int overflow for very large result sets.
    final long rowsPerSecond = result.size() * 1000L / Math.max(cost, 1L);
    LOG.debug("QICH with key:{},Data Count:{}, Speed:{}, Time Cost:{} ms", key, result.size(),
            rowsPerSecond, cost);
    return result;
}
private List queryByKeyValues(Session session, String key, List 自定义并发辅助类
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static java.util.stream.Collectors.toList;
public class ConcurrencyUtil {
private static final Logger LOG = LoggerFactory.getLogger(ConcurrencyUtil.class);
private static final int MAX_PARTITION_SIZE = 2000;
/**
* 提供大数据进行分组并发处理能力
*
* @param executorService 并发执行线程池
* @param data 待处理数据
* @param function 针对分组后的每组数据的处理逻辑
* @param 数据类型
* @param 返回数据类型
* @return
*/
public static List groupInvoke(ExecutorService executorService, List data, Function, List> function) {
int threadCount = getPoolSize(executorService);
int i = (data.size() + threadCount) / threadCount;
List> partition = Lists.partition(data, i > MAX_PARTITION_SIZE ? MAX_PARTITION_SIZE : i);
final List>> futures = partition.stream().map(p -> CompletableFuture.supplyAsync(() -> function.apply(p)
, executorService)).collect(toList());
List result = futures.stream().map(p -> {
try {
return p.get(5, TimeUnit.MINUTES);
} catch (Exception e) {
LOG.error("Concurrency groupInvoke error.", e);
}
return null;
}).filter(Objects::nonNull).flatMap(List::stream).filter(Objects::nonNull).collect(toList());
return result;
}
public static List invoke(ExecutorService executorService, List data, Function function) {
List> futures = Lists.newArrayListWithCapacity(data.size());
for (T datum : data) {
final Future future = executorService.submit(() -> function.apply(datum));
futures.add(future);
}
List result = Lists.newArrayListWithCapacity(data.size());
for (Future future : futures) {
try {
result.add(future.get(5, TimeUnit.MINUTES));
} catch (Exception e) {
LOG.error("Concurrency invoke error.", e);
}
}
return result.stream().filter(Objects::nonNull).collect(toList());
}
private static int getPoolSize(ExecutorService executorService) {
int threadCount = 10;
if (executorService instanceof ThreadPoolExecutor) {
threadCount = ((ThreadPoolExecutor) executorService).getCorePoolSize();
}
return threadCount <= 0 ? 10 : threadCount;
}
}
| 留言与评论(共有 0 条评论) “” |