前言
针对数据库的查询操作,使用批量方式自然要快不少,本文介绍批量查询API的实现。本文共实现两类API:一类是串行的分组并发,一类是并行的分组并发。
- 利用Lists.partition分组
- CompletableFuture.supplyAsync多线程并发
串行的分组并发
public List queryInCondition(String key, Object… value) { long current = System.currentTimeMillis(); Objects.requireNonNull(key, “Filter key cant be null.”); if (value == null || value.length == 0) { LOG.warn(“Return empty data, cause query by key:{},but value is empty”, key); return Collections.emptyList(); } List result = Lists.newArrayListWithCapacity(value.length); if (value.length MAX_PARTITION_DATA_COUNT ? MAX_PARTITION_DATA_COUNT : value.length / MIN_PARTITION; List valuePartition = Lists.partition(Lists.newArrayList(value), SPLIT); for (List values : valuePartition) { result.addAll(queryByKeyValues(currentSession(), key, values)); } } final long cost = System.currentTimeMillis() – current; LOG.debug(“QIC with key:{},Data Count:{}, Speed:{}/s, Time Cost:{} ms”, key, result.size(), result.size() * 1000 / cost, cost); return result; }
并行的分组并发
/**
 * Parallel counterpart of queryInCondition. It rejects a null key, returns an empty
 * list when no values are supplied, and otherwise evaluates queryByKeyValues inside a
 * lambda that opens its own Session from sessionFactory and closes it in a finally
 * block, so each invocation of the lambda works on a separate session.
 *
 * NOTE(review): this listing is truncated between "value.length" and the lambda body.
 * The threshold check, the declaration of "result" and the call that receives the lambda
 * are missing (presumably ConcurrencyUtil.groupInvoke with an executor; confirm against
 * the original source). The typographic quotes, dash and ellipsis must also be replaced
 * by their ASCII forms before this compiles.
 *
 * NOTE(review): "result.size() * 1000 / cost" is evaluated before LOG.debug is called
 * and divides by zero when the call finishes within the same millisecond (cost == 0).
 * Guard the divisor when restoring this method.
 */
public List queryInConditionHighSpeed(String key, Object… value) { long current = System.currentTimeMillis(); Objects.requireNonNull(key, “Filter key cant be null.”); if (value == null || value.length == 0) { LOG.warn(“Return empty data, cause query by key:{},but value is empty”, key); return Collections.emptyList(); } if (value.length { List resultList = Collections.emptyList(); Session session = null; try { session = sessionFactory.openSession(); resultList = queryByKeyValues(session, key, values); } finally { if (session != null && session.isOpen()) session.close(); } return resultList; }); final long cost = System.currentTimeMillis() – current; LOG.debug(“QICH with key:{},Data Count:{}, Speed:{}, Time Cost:{} ms”, key, result.size(), result.size() * 1000 / cost, cost); return result; }
辅助hibernate过滤查询方法
private List queryByKeyValues(Session session, String key, List values) { CriteriaBuilder criteriaBuilder = session.getCriteriaBuilder(); CriteriaQuery query = criteriaBuilder.createQuery(getEntityClass()); Root root = query.from(getEntityClass()); query.select(root); CriteriaBuilder.In in = criteriaBuilder.in(root.get(key)); values.forEach(in::value); query.where(in); final Query queryExe = session.createQuery(query); LOG.trace(“Hibernate execute SQL:{}”, queryExe.getQueryString()); List resultList = queryExe.getResultList(); return resultList; }
ConcurrencyUtil辅助类
自定义并发辅助类
import com.google.common.collect.Lists;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.List;import java.util.Objects;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Future;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.function.Function;import static java.util.stream.Collectors.toList;public class ConcurrencyUtil { private static final Logger LOG = LoggerFactory.getLogger(ConcurrencyUtil.class); private static final int MAX_PARTITION_SIZE = 2000; /** * 提供大数据进行分组并发处理能力 * * @param executorService 并发执行线程池 * @param data 待处理数据 * @param function 针对分组后的每组数据的处理逻辑 * @param 数据类型 * @param 返回数据类型 * @return */ public static List groupInvoke(ExecutorService executorService, List data, Function function) { int threadCount = getPoolSize(executorService); int i = (data.size() + threadCount) / threadCount; List partition = Lists.partition(data, i > MAX_PARTITION_SIZE ? 
MAX_PARTITION_SIZE : i); final List<CompletableFuture> futures = partition.stream().map(p -> CompletableFuture.supplyAsync(() -> function.apply(p) , executorService)).collect(toList()); List result = futures.stream().map(p -> { try { return p.get(5, TimeUnit.MINUTES); } catch (Exception e) { LOG.error(“Concurrency groupInvoke error.”, e); } return null; }).filter(Objects::nonNull).flatMap(List::stream).filter(Objects::nonNull).collect(toList()); return result; } public static List invoke(ExecutorService executorService, List data, Function function) { List futures = Lists.newArrayListWithCapacity(data.size()); for (T datum : data) { final Future future = executorService.submit(() -> function.apply(datum)); futures.add(future); } List result = Lists.newArrayListWithCapacity(data.size()); for (Future future : futures) { try { result.add(future.get(5, TimeUnit.MINUTES)); } catch (Exception e) { LOG.error(“Concurrency invoke error.”, e); } } return result.stream().filter(Objects::nonNull).collect(toList()); } private static int getPoolSize(ExecutorService executorService) { int threadCount = 10; if (executorService instanceof ThreadPoolExecutor) { threadCount = ((ThreadPoolExecutor) executorService).getCorePoolSize(); } return threadCount <= 0 ? 10 : threadCount; }}