package org.apache.sling.distribution.journal.impl.publisher;

import java.io.Closeable;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
import org.apache.sling.distribution.journal.impl.discovery.State;
import org.apache.sling.distribution.journal.messages.ClearCommand;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.queue.CacheCallback;
import org.apache.sling.distribution.journal.queue.QueueState;
import org.apache.sling.distribution.journal.shared.AgentId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.class */
/**
 * {@link CacheCallback} implementation that serves the package queue cache from the
 * messaging layer: it polls and range-fetches {@link PackageMessage}s on the configured
 * package topic via the {@link MessagingProvider}, reads queue state from the
 * {@link DiscoveryService} topology view, and hands {@link ClearCommand}s to the supplied
 * command sender.
 */
public class MessagingCacheCallback implements CacheCallback {
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final MessagingProvider messagingProvider;
    private final String packageTopic;
    private final PublishMetrics publishMetrics;
    private final DiscoveryService discoveryService;
    private final Consumer<ClearCommand> commandSender;

    /**
     * Creates the callback. The arguments are stored as given, without validation.
     *
     * @param messagingProvider used to create senders and pollers on the package topic
     * @param str               name of the package topic
     * @param publishMetrics    metrics holder; its queue cache fetch counter is incremented on each range fetch
     * @param discoveryService  source of the topology view used for queue state and subscriptions
     * @param consumer          receives every {@link ClearCommand} built by this callback
     */
    public MessagingCacheCallback(MessagingProvider messagingProvider, String str, PublishMetrics publishMetrics, DiscoveryService discoveryService, Consumer<ClearCommand> consumer) {
        this.messagingProvider = messagingProvider;
        this.packageTopic = str;
        this.publishMetrics = publishMetrics;
        this.discoveryService = discoveryService;
        this.commandSender = consumer;
    }

    /**
     * Starts a poller on the package topic (reset {@link Reset#latest}) together with a
     * {@link QueueCacheSeeder} that uses a sender on the same topic.
     *
     * <p>For every received package message the seeder is closed first and the message is
     * then forwarded to {@code messageHandler}.
     *
     * @param messageHandler handler that receives each polled package message
     * @return a {@link Closeable} that quietly closes both the seeder and the poller
     */
    @Override // org.apache.sling.distribution.journal.queue.CacheCallback
    public Closeable createConsumer(MessageHandler<PackageMessage> messageHandler) {
        log.info("Starting consumer");
        QueueCacheSeeder seeder = new QueueCacheSeeder(messagingProvider.createSender(packageTopic));
        Closeable poller = messagingProvider.createPoller(
                packageTopic,
                Reset.latest,
                HandlerAdapter.create(PackageMessage.class, (info, message) -> {
                    // Close the seeder on message arrival, before delegating to the handler.
                    seeder.close();
                    messageHandler.handle(info, message);
                }));
        // Seeding is started only after the poller has been created.
        seeder.startSeeding();
        return () -> IOUtils.closeQuietly(seeder, poller);
    }

    /**
     * Increments the queue cache fetch counter and fetches a range of package messages
     * from the package topic using a {@link RangePoller}.
     *
     * @param j  first range bound handed to the {@link RangePoller} (presumably the minimum offset; confirm against RangePoller)
     * @param j2 second range bound handed to the {@link RangePoller} (presumably the maximum offset; confirm against RangePoller)
     * @return the messages returned by {@link RangePoller#fetchRange()}
     * @throws InterruptedException propagated from the range poller
     */
    @Override // org.apache.sling.distribution.journal.queue.CacheCallback
    public List<FullMessage<PackageMessage>> fetchRange(long j, long j2) throws InterruptedException {
        publishMetrics.getQueueCacheFetchCount().increment();
        // NOTE(review): the meaning and unit of the literal 30 are defined by RangePoller's
        // constructor, which is not visible here; confirm before turning it into a named constant.
        RangePoller rangePoller = new RangePoller(messagingProvider, packageTopic, j, j2, 30);
        return rangePoller.fetchRange();
    }

    /**
     * Looks up the subscriber state in the current topology view and converts it to a
     * {@link QueueState}.
     *
     * @param str  publisher agent name
     * @param str2 subscriber agent id; parsed with {@link AgentId} when a clear command is sent
     * @return the queue state, or {@code null} when the topology view has no state for the
     *         given pair; the clear callback of the returned state is {@code null} unless
     *         the state is editable
     */
    @Override // org.apache.sling.distribution.journal.queue.CacheCallback
    public QueueState getQueueState(String str, String str2) {
        State state = discoveryService.getTopologyView().getState(str2, str);
        if (state == null) {
            return null;
        }
        return new QueueState(
                state.getOffset(),
                state.getRetries(),
                state.getMaxRetries(),
                // A clear callback is only offered for editable states.
                state.isEditable() ? offset -> sendClearCommand(str, new AgentId(str2), offset) : null);
    }

    /** Builds a {@link ClearCommand} for the given subscriber and offset, logs it and passes it to the command sender. */
    private void sendClearCommand(String pubAgentName, AgentId subAgentId, long offset) {
        ClearCommand command = ClearCommand.builder()
                .pubAgentName(pubAgentName)
                .subSlingId(subAgentId.getSlingId())
                .subAgentName(subAgentId.getAgentName())
                .offset(offset)
                .build();
        log.info("Sending clear command {}", command);
        commandSender.accept(command);
    }

    /**
     * Returns the subscribed agent ids reported by the current topology view.
     *
     * @param str name passed to the topology view lookup (presumably the publisher agent name; confirm against TopologyView)
     * @return the set returned by the topology view
     */
    @Override // org.apache.sling.distribution.journal.queue.CacheCallback
    public Set<String> getSubscribedAgentIds(String str) {
        return discoveryService.getTopologyView().getSubscribedAgentIds(str);
    }
}
