package org.apache.sling.distribution.journal.queue.impl;

import java.util.HashSet;
import java.util.Hashtable;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.impl.publisher.PackageQueuedNotifier;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.queue.CacheCallback;
import org.apache.sling.distribution.journal.queue.OffsetQueue;
import org.apache.sling.distribution.journal.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.queue.QueueState;
import org.apache.sling.distribution.journal.shared.AgentId;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ParametersAreNonnullByDefault
/* loaded from: input_file:org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.class */
public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
    private static final int CLEANUP_THRESHOLD = 10000;
    private static final Logger LOG = LoggerFactory.getLogger(PubQueueProviderImpl.class);
    private final PackageQueuedNotifier queuedNotifier;
    private final CacheCallback callback;
    private final QueueErrors queueErrors;
    private ServiceRegistration<?> reg;
    private final Map<String, OffsetQueue<Long>> errorQueues = new ConcurrentHashMap();
    private volatile PubQueueCache cache = newCache();

    public PubQueueProviderImpl(EventAdmin eventAdmin, QueueErrors queueErrors, CacheCallback cacheCallback, BundleContext bundleContext) {
        this.queuedNotifier = new PackageQueuedNotifier(eventAdmin);
        this.queueErrors = queueErrors;
        this.callback = cacheCallback;
        startCleanupTask(bundleContext);
        LOG.info("Started Publisher queue provider service");
    }

    private void startCleanupTask(BundleContext bundleContext) {
        Hashtable hashtable = new Hashtable();
        hashtable.put("scheduler.concurrent", false);
        hashtable.put("scheduler.period", 43200L);
        this.reg = bundleContext.registerService(Runnable.class.getName(), this, hashtable);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        PubQueueCache pubQueueCache = this.cache;
        if (pubQueueCache != null) {
            pubQueueCache.close();
        }
        if (this.reg != null) {
            try {
                this.reg.unregister();
                this.reg = null;
            } catch (Exception e) {
                LOG.info(e.getMessage(), e);
            }
        }
        IOUtils.closeQuietly(this.queuedNotifier);
        LOG.info("Stopped Publisher queue provider service");
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.info("Starting package cache cleanup task");
        PubQueueCache pubQueueCache = this.cache;
        if (pubQueueCache != null) {
            int size = pubQueueCache.size();
            if (size > CLEANUP_THRESHOLD) {
                LOG.info("Cleanup package cache (size={}/{})", Integer.valueOf(size), Integer.valueOf(CLEANUP_THRESHOLD));
                pubQueueCache.close();
                this.cache = newCache();
            } else {
                LOG.info("No cleanup required for package cache (size={}/{})", Integer.valueOf(size), Integer.valueOf(CLEANUP_THRESHOLD));
            }
        }
        LOG.info("Stopping package cache cleanup task");
    }

    @Override // org.apache.sling.distribution.journal.queue.PubQueueProvider
    @Nonnull
    public Set<String> getQueueNames(String str) {
        HashSet hashSet = new HashSet();
        for (String str2 : this.callback.getSubscribedAgentIds(str)) {
            hashSet.add(str2);
            QueueState queueState = this.callback.getQueueState(str, str2);
            if (queueState != null) {
                if (queueState.getMaxRetries() >= 0) {
                    hashSet.add(String.format("%s-error", str2));
                }
            }
        }
        return hashSet;
    }

    @Override // org.apache.sling.distribution.journal.queue.PubQueueProvider
    @Nonnull
    public PackageQueuedNotifier getQueuedNotifier() {
        return this.queuedNotifier;
    }

    @Override // org.apache.sling.distribution.journal.queue.PubQueueProvider
    @Nullable
    public DistributionQueue getQueue(String str, String str2) {
        if (str2.endsWith("-error")) {
            return getErrorQueue(str, str2);
        }
        QueueState queueState = this.callback.getQueueState(str, str2);
        if (queueState == null) {
            return null;
        }
        long lastProcessedOffset = queueState.getLastProcessedOffset() + 1;
        OffsetQueue<DistributionQueueItem> offsetQueue = getOffsetQueue(str, lastProcessedOffset);
        return new PubQueue(str2, offsetQueue.getMinOffsetQueue(lastProcessedOffset), queueState.getHeadRetries(), this.queueErrors.getError(str, str2), queueState.getClearCallback());
    }

    @Nonnull
    private DistributionQueue getErrorQueue(String str, String str2) {
        AgentId agentId = new AgentId(StringUtils.substringBeforeLast(str2, "-error"));
        OffsetQueue<Long> orDefault = this.errorQueues.getOrDefault(getErrorQueueKey(str, agentId.getSlingId(), agentId.getAgentName()), new OffsetQueueImpl());
        Long headItem = orDefault.getHeadItem();
        return new PubErrQueue(str2, headItem == null ? new OffsetQueueImpl() : getOffsetQueue(str, headItem.longValue()), orDefault);
    }

    @Override // org.apache.sling.distribution.journal.queue.PubQueueProvider
    @Nonnull
    public OffsetQueue<DistributionQueueItem> getOffsetQueue(String str, long j) {
        try {
            return this.cache.getOffsetQueue(str, j);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.sling.distribution.journal.queue.PubQueueProvider
    public int getMaxQueueSize(String str) {
        Optional<Long> minOffset = getMinOffset(str);
        if (minOffset.isPresent()) {
            return getOffsetQueue(str, minOffset.get().longValue()).getMinOffsetQueue(minOffset.get().longValue()).getSize();
        }
        return 0;
    }

    private Optional<Long> getMinOffset(String str) {
        return this.callback.getSubscribedAgentIds(str).stream().map(str2 -> {
            return Long.valueOf(lastProcessedOffset(str, str2));
        }).min((v0, v1) -> {
            return Long.compare(v0, v1);
        });
    }

    private long lastProcessedOffset(String str, String str2) {
        return this.callback.getQueueState(str, str2).getLastProcessedOffset();
    }

    @Override // org.apache.sling.distribution.journal.queue.PubQueueProvider
    public void handleStatus(MessageInfo messageInfo, PackageStatusMessage packageStatusMessage) {
        if (packageStatusMessage.getStatus() == PackageStatusMessage.Status.REMOVED_FAILED) {
            this.errorQueues.computeIfAbsent(getErrorQueueKey(packageStatusMessage.getPubAgentName(), packageStatusMessage.getSubSlingId(), packageStatusMessage.getSubAgentName()), str -> {
                return new OffsetQueueImpl();
            }).putItem(messageInfo.getOffset(), Long.valueOf(packageStatusMessage.getOffset()));
        }
    }

    private String getErrorQueueKey(String str, String str2, String str3) {
        return String.format("%s#%s#%s", str, str2, str3);
    }

    private PubQueueCache newCache() {
        return new PubQueueCache(this.queuedNotifier, this.callback);
    }
}
