package org.apache.sling.distribution.journal.impl.discovery;

import java.io.Closeable;
import java.util.Collections;
import java.util.Hashtable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
import org.apache.sling.distribution.journal.messages.LogMessage;
import org.apache.sling.distribution.journal.messages.SubscriberConfig;
import org.apache.sling.distribution.journal.messages.SubscriberState;
import org.apache.sling.distribution.journal.shared.AgentId;
import org.apache.sling.distribution.journal.shared.PublisherConfigurationAvailable;
import org.apache.sling.distribution.journal.shared.Topics;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicyOption;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ParametersAreNonnullByDefault
@Component(service = {DiscoveryService.class})
/* loaded from: input_file:org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.class */
public class DiscoveryService implements Runnable {
    public static final String KEY_MESSAGE = "message";
    public static final String TOPIC_DISTRIBUTION_LOG = "org/apache/sling/distribution/journal/log";
    private static final Logger LOG = LoggerFactory.getLogger(DiscoveryService.class);
    private static final long REFRESH_TTL_MS = 30000;

    @Reference
    private PublisherConfigurationAvailable publisherConfigurationAvailable;

    @Reference
    private MessagingProvider messagingProvider;

    @Reference
    private Topics topics;

    @Reference(policyOption = ReferencePolicyOption.GREEDY, cardinality = ReferenceCardinality.OPTIONAL)
    private volatile TopologyChangeHandler topologyChangeHandler;

    @Reference
    private EventAdmin eventAdmin;
    private volatile ServiceRegistration<?> reg;
    private final TopologyViewManager viewManager = new TopologyViewManager(REFRESH_TTL_MS);
    private Closeable poller;

    public DiscoveryService() {
    }

    public DiscoveryService(MessagingProvider messagingProvider, TopologyChangeHandler topologyChangeHandler, Topics topics, EventAdmin eventAdmin) {
        this.messagingProvider = messagingProvider;
        this.topologyChangeHandler = topologyChangeHandler;
        this.topics = topics;
        this.eventAdmin = eventAdmin;
    }

    @Activate
    public void activate(BundleContext bundleContext) {
        this.poller = this.messagingProvider.createPoller(this.topics.getDiscoveryTopic(), Reset.latest, new HandlerAdapter[]{HandlerAdapter.create(DiscoveryMessage.class, this::handleDiscovery), HandlerAdapter.create(LogMessage.class, this::handleLog)});
        startTopologyViewUpdaterTask(bundleContext);
        LOG.info("Discovery service started");
    }

    @Deactivate
    public void deactivate() {
        if (this.reg != null) {
            this.reg.unregister();
        }
        IOUtils.closeQuietly(this.poller);
        LOG.info("Discovery service stopped");
    }

    public TopologyView getTopologyView() {
        return this.viewManager.getCurrentView();
    }

    public int getSubscriberCount(String str) {
        return getTopologyView().getSubscribedAgentIds(str).size();
    }

    @Override // java.lang.Runnable
    public void run() {
        handleChanges(this.viewManager.getCurrentView(), this.viewManager.updateView());
    }

    private void handleChanges(TopologyView topologyView, TopologyView topologyView2) {
        if (topologyView.equals(topologyView2)) {
            return;
        }
        String format = String.format("TopologyView changed from %s to %s", topologyView2, topologyView);
        TopologyViewDiff topologyViewDiff = new TopologyViewDiff(topologyView2, topologyView);
        if (topologyViewDiff.subscribedAgentsChanged()) {
            LOG.info(format);
        } else {
            LOG.debug(format);
        }
        TopologyChangeHandler topologyChangeHandler = this.topologyChangeHandler;
        if (topologyChangeHandler != null) {
            topologyChangeHandler.changed(topologyViewDiff);
        }
    }

    private void startTopologyViewUpdaterTask(BundleContext bundleContext) {
        Hashtable hashtable = new Hashtable();
        hashtable.put("scheduler.concurrent", false);
        hashtable.put("scheduler.period", 5L);
        this.reg = bundleContext.registerService(Runnable.class.getName(), this, hashtable);
    }

    public void handleDiscovery(MessageInfo messageInfo, DiscoveryMessage discoveryMessage) {
        long currentTimeMillis = System.currentTimeMillis();
        AgentId agentId = new AgentId(discoveryMessage.getSubSlingId(), discoveryMessage.getSubAgentName());
        for (SubscriberState subscriberState : discoveryMessage.getSubscriberStates()) {
            SubscriberConfig subscriberConfiguration = discoveryMessage.getSubscriberConfiguration();
            this.viewManager.refreshState(new State(subscriberState.getPubAgentName(), agentId.getAgentId(), currentTimeMillis, subscriberState.getOffset(), subscriberState.getRetries(), subscriberConfiguration.getMaxRetries(), subscriberConfiguration.isEditable()));
        }
    }

    public void handleLog(MessageInfo messageInfo, LogMessage logMessage) {
        this.eventAdmin.postEvent(new Event(TOPIC_DISTRIBUTION_LOG, Collections.singletonMap(KEY_MESSAGE, logMessage)));
    }
}
