用分组并发批量查询实现高性能

前言

针对数据库的查询操作,使用批量方式自然要快不少,本文介绍批量查询的API实现。本文共实现两类API:一类是串行的分组查询(各组依次执行),一类是并行的分组并发查询(各组在线程池中同时执行)。

  • 利用Lists.partition分组
  • CompletableFuture.supplyAsync多线程并发

串行的分组查询

    /**
     * Queries the entities whose {@code key} attribute matches any of {@code value}, running the
     * partitions one after another on the current session.
     *
     * <p>Inputs with fewer than {@code MAX_PARTITION_DATA_COUNT} values are fetched with a single
     * IN query. Larger inputs are split into partitions so that one generated SQL statement does
     * not grow too large.
     *
     * @param key   entity attribute to filter on; must not be {@code null}
     * @param value values to match; {@code null} or empty yields an empty list
     * @return the matching entities, concatenated in partition order
     * @throws NullPointerException if {@code key} is {@code null}
     */
    public List queryInCondition(String key, Object... value) {
        // nanoTime is monotonic, so the elapsed time can never be negative.
        long startNanos = System.nanoTime();
        Objects.requireNonNull(key, "Filter key cant be null.");
        if (value == null || value.length == 0) {
            LOG.warn("Return empty data, cause query by key:{},but value is empty", key);
            return Collections.emptyList();
        }
        List result = Lists.newArrayListWithCapacity(value.length);
        if (value.length < MAX_PARTITION_DATA_COUNT) {
            result.addAll(queryByKeyValues(currentSession(), key, Arrays.asList(value)));
        } else {
            // Partition so no single SQL IN-list becomes too large. The size is clamped to >= 1
            // because Lists.partition rejects a size of 0.
            int partitionSize = Math.max(1,
                    Math.min(value.length / MIN_PARTITION, MAX_PARTITION_DATA_COUNT));
            List<List<Object>> valuePartition = Lists.partition(Lists.newArrayList(value), partitionSize);
            for (List<Object> values : valuePartition) {
                result.addAll(queryByKeyValues(currentSession(), key, values));
            }
        }
        final long cost = (System.nanoTime() - startNanos) / 1_000_000L;
        // A fast query can finish in 0 ms; divide by at least 1 to avoid ArithmeticException.
        LOG.debug("QIC with key:{},Data Count:{}, Speed:{}/s, Time Cost:{} ms", key, result.size(),
                result.size() * 1000L / Math.max(cost, 1L), cost);
        return result;
    }

并行的分组并发

public List queryInConditionHighSpeed(String key, Object... value) {
        // Javadoc-style summary: queries the entities whose {@code key} attribute matches any of
        // {@code value}, fetching the partitions in parallel on XP_DAO_EXE via groupInvoke.
        // Inputs with fewer than MAX_PARTITION_DATA_COUNT values fall back to queryInCondition.
        // Throws NullPointerException if key is null; null or empty value yields an empty list.
        long startNanos = System.nanoTime();
        Objects.requireNonNull(key, "Filter key cant be null.");
        if (value == null || value.length == 0) {
            LOG.warn("Return empty data, cause query by key:{},but value is empty", key);
            return Collections.emptyList();
        }
        if (value.length < MAX_PARTITION_DATA_COUNT) {
            return queryInCondition(key, value);
        }
        // Each worker opens its own session from the shared factory and always closes it,
        // rather than sharing currentSession() across threads (Hibernate Sessions are not
        // documented as thread-safe).
        final SessionFactory sessionFactory = currentSession().getSessionFactory();
        List result = groupInvoke(XP_DAO_EXE, Arrays.asList(value), (values) -> {
            Session session = null;
            try {
                session = sessionFactory.openSession();
                return queryByKeyValues(session, key, values);
            } finally {
                if (session != null && session.isOpen()) {
                    session.close();
                }
            }
        });
        final long cost = (System.nanoTime() - startNanos) / 1_000_000L;
        // A fast query can finish in 0 ms; divide by at least 1 to avoid ArithmeticException.
        LOG.debug("QICH with key:{},Data Count:{}, Speed:{}/s, Time Cost:{} ms", key, result.size(),
                result.size() * 1000L / Math.max(cost, 1L), cost);
        return result;
    }

辅助的Hibernate过滤查询方法

    /**
     * Runs a JPA Criteria query on {@code session} that selects every instance of
     * {@code getEntityClass()} whose {@code key} attribute is contained in {@code values}.
     *
     * <p>NOTE(review): an empty {@code values} would produce an empty IN-list, which many
     * databases reject. The visible callers never pass an empty list.
     *
     * @param session session to create and execute the query on
     * @param key     name of the entity attribute used in the IN predicate
     * @param values  candidate attribute values
     * @return the query's result list
     */
    private List queryByKeyValues(Session session, String key, List values) {
        CriteriaBuilder cb = session.getCriteriaBuilder();
        CriteriaQuery criteria = cb.createQuery(getEntityClass());
        Root entity = criteria.from(getEntityClass());
        CriteriaBuilder.In inClause = cb.in(entity.get(key));
        for (Object candidate : values) {
            inClause.value(candidate);
        }
        criteria.select(entity).where(inClause);
        Query executable = session.createQuery(criteria);
        LOG.trace("Hibernate execute SQL:{}", executable.getQueryString());
        return executable.getResultList();
    }

ConcurrencyUtil辅助类

自定义并发辅助类

import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

import static java.util.stream.Collectors.toList;

public class ConcurrencyUtil {
    private static final Logger LOG = LoggerFactory.getLogger(ConcurrencyUtil.class);
    private static final int MAX_PARTITION_SIZE = 2000;
    /**
     * 提供大数据进行分组并发处理能力
     *
     * @param executorService 并发执行线程池
     * @param data            待处理数据
     * @param function        针对分组后的每组数据的处理逻辑
     * @param              数据类型
     * @param              返回数据类型
     * @return
     */
    public static  List groupInvoke(ExecutorService executorService, List data, Function, List> function) {
        int threadCount = getPoolSize(executorService);
        int i = (data.size() + threadCount) / threadCount;
        List> partition = Lists.partition(data, i > MAX_PARTITION_SIZE ? MAX_PARTITION_SIZE : i);

        final List>> futures = partition.stream().map(p -> CompletableFuture.supplyAsync(() -> function.apply(p)
                , executorService)).collect(toList());

        List result = futures.stream().map(p -> {
            try {
                return p.get(5, TimeUnit.MINUTES);
            } catch (Exception e) {
                LOG.error("Concurrency groupInvoke error.", e);
            }
            return null;
        }).filter(Objects::nonNull).flatMap(List::stream).filter(Objects::nonNull).collect(toList());
        return result;
    }

    public static  List invoke(ExecutorService executorService, List data, Function function) {
        List> futures = Lists.newArrayListWithCapacity(data.size());
        for (T datum : data) {
            final Future future = executorService.submit(() -> function.apply(datum));
            futures.add(future);
        }
        List result = Lists.newArrayListWithCapacity(data.size());
        for (Future future : futures) {
            try {
                result.add(future.get(5, TimeUnit.MINUTES));
            } catch (Exception e) {
                LOG.error("Concurrency invoke error.", e);
            }
        }
        return result.stream().filter(Objects::nonNull).collect(toList());
    }


    private static int getPoolSize(ExecutorService executorService) {
        int threadCount = 10;
        if (executorService instanceof ThreadPoolExecutor) {
            threadCount = ((ThreadPoolExecutor) executorService).getCorePoolSize();
        }
        return threadCount <= 0 ? 10 : threadCount;
    }
}

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章