/*
 * Decompiled with CFR 0.152.
 */
package com.aizuda.snailjob.server.starter.dispatch;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.cache.CacheGroupScanActor;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.ScanTask;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.pekko.ActorGenerator;
import com.aizuda.snailjob.server.retry.task.support.dispatch.ScanRetryActor;
import com.aizuda.snailjob.server.retry.task.support.handler.RateLimiterHandler;
import com.aizuda.snailjob.server.starter.dispatch.ConsumerBucket;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import lombok.Generated;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Component(value="ScanBucketActor")
@Scope(value="prototype")
public class ConsumerBucketActor
extends AbstractActor {
    private static final String DEFAULT_JOB_KEY = "DEFAULT_JOB_KEY";
    private static final String DEFAULT_WORKFLOW_KEY = "DEFAULT_JOB_KEY";
    private final SystemProperties systemProperties;
    private final RateLimiterHandler rateLimiterHandler;

    public AbstractActor.Receive createReceive() {
        return this.receiveBuilder().match(ConsumerBucket.class, consumerBucket -> {
            try {
                this.doDispatch((ConsumerBucket)consumerBucket);
            }
            catch (Exception e) {
                SnailJobLog.LOCAL.error("Data dispatcher processing exception. [{}]", new Object[]{consumerBucket, e});
            }
        }).build();
    }

    private void doDispatch(ConsumerBucket consumerBucket) {
        if (CollUtil.isEmpty(consumerBucket.getBuckets())) {
            return;
        }
        this.doScanJobAndWorkflow(consumerBucket);
        this.doScanRetry(consumerBucket);
    }

    private void doScanRetry(ConsumerBucket consumerBucket) {
        this.rateLimiterHandler.refreshRate();
        Set<Integer> totalBuckets = consumerBucket.getBuckets();
        int retryMaxPullParallel = this.systemProperties.getRetryMaxPullParallel();
        List partitions = Lists.partition(new ArrayList<Integer>(totalBuckets), (int)((totalBuckets.size() + retryMaxPullParallel - 1) / retryMaxPullParallel));
        for (List buckets : partitions) {
            String key = StrUtil.join((CharSequence)",", new TreeSet(buckets));
            if (ScanRetryActor.REPEATED_PULL.contains(key)) {
                SnailJobLog.LOCAL.warn("Discard the current scanning task because there are ongoing tasks in the current batch.[{}]", new Object[]{key});
                continue;
            }
            ScanTask scanTask = new ScanTask();
            scanTask.setBucketStr(key);
            scanTask.setBuckets(new HashSet(buckets));
            ActorRef scanRetryActorRef = ActorGenerator.scanRetryActor();
            ScanRetryActor.REPEATED_PULL.add(key);
            scanRetryActorRef.tell((Object)scanTask, scanRetryActorRef);
        }
    }

    private void doScanJobAndWorkflow(ConsumerBucket consumerBucket) {
        ScanTask scanTask = new ScanTask();
        scanTask.setBuckets(consumerBucket.getBuckets());
        ActorRef scanJobActorRef = this.cacheActorRef("DEFAULT_JOB_KEY", SyetemTaskTypeEnum.JOB);
        scanJobActorRef.tell((Object)scanTask, scanJobActorRef);
        ActorRef scanWorkflowActorRef = this.cacheActorRef("DEFAULT_JOB_KEY", SyetemTaskTypeEnum.WORKFLOW);
        scanWorkflowActorRef.tell((Object)scanTask, scanWorkflowActorRef);
    }

    private ActorRef cacheActorRef(String groupName, SyetemTaskTypeEnum typeEnum) {
        ActorRef scanActorRef = CacheGroupScanActor.get((String)groupName, (SyetemTaskTypeEnum)typeEnum);
        if (Objects.isNull(scanActorRef)) {
            scanActorRef = (ActorRef)typeEnum.getActorRef().get();
            CacheGroupScanActor.put((String)groupName, (SyetemTaskTypeEnum)typeEnum, (ActorRef)scanActorRef);
        }
        return scanActorRef;
    }

    @Generated
    public ConsumerBucketActor(SystemProperties systemProperties, RateLimiterHandler rateLimiterHandler) {
        this.systemProperties = systemProperties;
        this.rateLimiterHandler = rateLimiterHandler;
    }
}

