/*
 * Decompiled with CFR 0.152.
 */
package com.easy.query.core.basic.jdbc.executor.internal;

import com.easy.query.core.basic.jdbc.conn.EasyConnection;
import com.easy.query.core.basic.jdbc.executor.internal.common.CommandExecuteUnit;
import com.easy.query.core.basic.jdbc.executor.internal.common.DataSourceSQLExecutorUnit;
import com.easy.query.core.basic.jdbc.executor.internal.common.ExecutionUnit;
import com.easy.query.core.basic.jdbc.executor.internal.common.GroupByValue;
import com.easy.query.core.basic.jdbc.executor.internal.common.SQLExecutorGroup;
import com.easy.query.core.basic.jdbc.executor.internal.unit.Executor;
import com.easy.query.core.basic.thread.FuturesInvoker;
import com.easy.query.core.configuration.EasyQueryOption;
import com.easy.query.core.enums.sharding.ConnectionModeEnum;
import com.easy.query.core.exception.EasyQueryException;
import com.easy.query.core.sharding.context.StreamMergeContext;
import com.easy.query.core.util.EasyCollectionUtil;
import com.easy.query.core.util.EasyShardingUtil;
import com.easy.query.core.util.EasyUtil;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class ShardingExecutor {
    private ShardingExecutor() {
    }

    public static <TResult> TResult execute(StreamMergeContext streamMergeContext, Executor<TResult> executor, List<ExecutionUnit> sqlRouteUnits) throws SQLException {
        List<TResult> results = ShardingExecutor.execute0(streamMergeContext, executor, sqlRouteUnits);
        if (EasyCollectionUtil.isEmpty(results)) {
            throw new EasyQueryException("execute result empty");
        }
        return executor.getShardingMerger().streamMerge(streamMergeContext, results);
    }

    private static <TResult> List<TResult> execute0(StreamMergeContext streamMergeContext, Executor<TResult> executor, List<ExecutionUnit> executionUnits) throws SQLException {
        if (EasyCollectionUtil.isSingle(executionUnits)) {
            DataSourceSQLExecutorUnit dataSourceSQLExecutorUnit = ShardingExecutor.getSingleSQLExecutorGroups(streamMergeContext, EasyCollectionUtil.first(executionUnits));
            return executor.execute(dataSourceSQLExecutorUnit);
        }
        List<DataSourceSQLExecutorUnit> dataSourceSQLExecutorUnits = EasyUtil.groupBy(executionUnits.stream(), ExecutionUnit::getDataSourceName).map(o -> ShardingExecutor.getSQLExecutorGroups(streamMergeContext, o)).collect(Collectors.toList());
        if (dataSourceSQLExecutorUnits.size() == 1) {
            DataSourceSQLExecutorUnit dataSourceSQLExecutorUnit = (DataSourceSQLExecutorUnit)dataSourceSQLExecutorUnits.get(0);
            return executor.execute(dataSourceSQLExecutorUnit);
        }
        List futures = ShardingExecutor.executeFuture0(streamMergeContext, executor, dataSourceSQLExecutorUnits);
        EasyQueryOption easyQueryOption = streamMergeContext.getEasyQueryOption();
        long timeoutMillis = easyQueryOption.getShardingExecuteTimeoutMillis();
        try (FuturesInvoker invoker = new FuturesInvoker(futures);){
            List lists = invoker.get(timeoutMillis);
            List list = lists.stream().flatMap(Collection::stream).collect(Collectors.toList());
            return list;
        }
    }

    private static <TResult> List<Future<List<TResult>>> executeFuture0(StreamMergeContext streamMergeContext, Executor<TResult> executor, List<DataSourceSQLExecutorUnit> dataSourceSQLExecutorUnits) {
        ExecutorService executorService = streamMergeContext.getRuntimeContext().getShardingExecutorService().getExecutorService();
        ArrayList<Future<List<TResult>>> futures = new ArrayList<Future<List<TResult>>>(dataSourceSQLExecutorUnits.size());
        for (DataSourceSQLExecutorUnit dataSourceSQLExecutorUnit : dataSourceSQLExecutorUnits) {
            Future<List> future = executorService.submit(() -> executor.execute(dataSourceSQLExecutorUnit));
            futures.add(future);
        }
        return futures;
    }

    private static DataSourceSQLExecutorUnit getSQLExecutorGroups(StreamMergeContext streamMergeContext, GroupByValue<String, ExecutionUnit> sqlGroups) {
        boolean isSerialExecute = !streamMergeContext.isQuery();
        int maxShardingQueryLimit = streamMergeContext.getMaxShardingQueryLimit();
        String dataSourceName = sqlGroups.key();
        List<ExecutionUnit> sqlGroupExecutionUnits = sqlGroups.values();
        int groupUnitSize = sqlGroupExecutionUnits.size();
        ConnectionModeEnum useConnectionMode = streamMergeContext.getConnectionMode();
        ConnectionModeEnum connectionMode = EasyShardingUtil.getActualConnectionMode(isSerialExecute, maxShardingQueryLimit, groupUnitSize, useConnectionMode);
        int parallelCount = isSerialExecute ? 1 : Math.min(groupUnitSize, maxShardingQueryLimit);
        List<List<ExecutionUnit>> sqlUnitPartitions = EasyCollectionUtil.partition(sqlGroupExecutionUnits, parallelCount);
        int createDbConnectionCount = EasyCollectionUtil.first(sqlUnitPartitions).size();
        List<EasyConnection> easyConnections = streamMergeContext.getEasyConnections(dataSourceName, createDbConnectionCount);
        List<List> sqlExecutorUnitPartitions = EasyCollectionUtil.select(sqlUnitPartitions, (executionUnits, index0) -> EasyCollectionUtil.select(executionUnits, (executionUnit, index1) -> {
            EasyConnection easyConnection = (EasyConnection)easyConnections.get(index1);
            return new CommandExecuteUnit((ExecutionUnit)executionUnit, easyConnection);
        }));
        List<SQLExecutorGroup<CommandExecuteUnit>> sqlExecutorGroups = EasyCollectionUtil.select(sqlExecutorUnitPartitions, (o, i) -> new SQLExecutorGroup(connectionMode, o));
        return new DataSourceSQLExecutorUnit(dataSourceName, connectionMode, sqlExecutorGroups);
    }

    private static DataSourceSQLExecutorUnit getSingleSQLExecutorGroups(StreamMergeContext streamMergeContext, ExecutionUnit executionUnit) {
        ConnectionModeEnum connectionMode = ConnectionModeEnum.MEMORY_STRICTLY;
        String dataSourceName = executionUnit.getDataSourceName();
        List<EasyConnection> easyConnections = streamMergeContext.getEasyConnections(dataSourceName, 1);
        EasyConnection easyConnection = EasyCollectionUtil.first(easyConnections);
        CommandExecuteUnit commandExecuteUnit = new CommandExecuteUnit(executionUnit, easyConnection);
        SQLExecutorGroup<CommandExecuteUnit> sqlExecutorGroup = new SQLExecutorGroup<CommandExecuteUnit>(connectionMode, Collections.singletonList(commandExecuteUnit));
        return new DataSourceSQLExecutorUnit(dataSourceName, connectionMode, Collections.singletonList(sqlExecutorGroup));
    }
}

