package org.apache.sling.distribution.journal.impl.publisher;

import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.commons.metrics.MetricsService;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestState;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
import org.apache.sling.distribution.journal.shared.DistributionLogEventListener;
import org.apache.sling.distribution.journal.shared.Strings;
import org.apache.sling.distribution.journal.shared.Timed;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicyOption;
import org.osgi.service.condition.Condition;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;

@ParametersAreNonnullByDefault
@Designate(ocd = PublisherConfiguration.class, factory = true)
@Component(immediate = true, configurationPid = {DistributionPublisher.FACTORY_PID})
/* loaded from: input_file:org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.class */
public class DistributionPublisher implements DistributionAgent {
    public static final String FACTORY_PID = "org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory";

    @Nonnull
    private final DefaultDistributionLog distLog;
    private final DistributionPackageBuilder packageBuilder;
    private final PackageMessageFactory factory;
    private final EventAdmin eventAdmin;
    private final PublishMetrics publishMetrics;
    private final PubQueueProvider pubQueueProvider;
    private final String pubAgentName;
    private final String pkgType;
    private final boolean limitEnabled;
    private final long queuedTimeout;
    private final int queueSizeLimit;
    private final int maxQueueSizeDelay;
    private final Consumer<PackageMessage> sender;
    private final DistributionLogEventListener distributionLogEventListener;

    @Activate
    public DistributionPublisher(@Reference MessagingProvider messagingProvider, @Reference(name = "packageBuilder") DistributionPackageBuilder distributionPackageBuilder, @Reference DiscoveryService discoveryService, @Reference PackageMessageFactory packageMessageFactory, @Reference EventAdmin eventAdmin, @Reference Topics topics, @Reference MetricsService metricsService, @Reference PubQueueProvider pubQueueProvider, @Reference(target = "(osgi.condition.id=toggle.FT_SLING-12218)", cardinality = ReferenceCardinality.OPTIONAL, policyOption = ReferencePolicyOption.GREEDY) Condition condition, PublisherConfiguration publisherConfiguration, BundleContext bundleContext) {
        this.pubAgentName = Strings.requireNotBlank(publisherConfiguration.name());
        this.packageBuilder = distributionPackageBuilder;
        this.factory = (PackageMessageFactory) Objects.requireNonNull(packageMessageFactory);
        this.eventAdmin = eventAdmin;
        Objects.requireNonNull(metricsService);
        this.publishMetrics = new PublishMetrics(metricsService, this.pubAgentName);
        this.pubQueueProvider = pubQueueProvider;
        this.publishMetrics.queueSize(() -> {
            return Integer.valueOf(pubQueueProvider.getMaxQueueSize(this.pubAgentName));
        });
        this.distLog = new DefaultDistributionLog(this.pubAgentName, getClass(), DefaultDistributionLog.LogLevel.INFO);
        this.distributionLogEventListener = new DistributionLogEventListener(bundleContext, this.distLog, this.pubAgentName);
        this.limitEnabled = condition != null;
        this.queuedTimeout = publisherConfiguration.queuedTimeout();
        this.queueSizeLimit = publisherConfiguration.queueSizeLimit();
        this.maxQueueSizeDelay = publisherConfiguration.maxQueueSizeDelay();
        this.pkgType = distributionPackageBuilder.getType();
        this.sender = messagingProvider.createSender(topics.getPackageTopic());
        this.publishMetrics.subscriberCount(() -> {
            return Integer.valueOf(discoveryService.getSubscriberCount(this.pubAgentName));
        });
        this.distLog.info("Started Publisher agent={} with packageBuilder={}, limitEnabled={}, queuedTimeout={}, queueSizeLimit={}, maxQueueSizeDelay={}", this.pubAgentName, this.pkgType, Boolean.valueOf(this.limitEnabled), Long.valueOf(this.queuedTimeout), Integer.valueOf(this.queueSizeLimit), Integer.valueOf(this.maxQueueSizeDelay));
    }

    @Deactivate
    public void deactivate() {
        IOUtils.closeQuietly(this.distributionLogEventListener);
        this.distLog.info(String.format("Stopped Publisher agent %s with packageBuilder %s, queuedTimeout %s", this.pubAgentName, this.pkgType, Long.valueOf(this.queuedTimeout)), new Object[0]);
    }

    @Nonnull
    public Iterable<String> getQueueNames() {
        return Collections.unmodifiableCollection(this.pubQueueProvider.getQueueNames(this.pubAgentName));
    }

    public DistributionQueue getQueue(String str) {
        try {
            DistributionQueue queue = this.pubQueueProvider.getQueue(this.pubAgentName, str);
            if (queue == null) {
                this.publishMetrics.getQueueAccessErrorCount().increment();
            }
            return queue;
        } catch (Exception e) {
            this.publishMetrics.getQueueAccessErrorCount().increment();
            throw e;
        }
    }

    @Nonnull
    public DistributionLog getLog() {
        return this.distLog;
    }

    @Nonnull
    public DistributionAgentState getState() {
        return AgentState.getState(this);
    }

    @Nonnull
    public DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest distributionRequest) throws DistributionException {
        if (distributionRequest.getRequestType() == DistributionRequestType.PULL) {
            this.distLog.info("Request requestType=PULL not supported by this agent", new Object[0]);
            return new SimpleDistributionResponse(DistributionRequestState.DROPPED, "Request requestType=PULL not supported by this agent");
        }
        int maxQueueSize = this.pubQueueProvider.getMaxQueueSize(this.pubAgentName);
        int sleepTime = getSleepTime(maxQueueSize);
        sleep(sleepTime);
        return send(buildPackage(resourceResolver, distributionRequest), maxQueueSize, sleepTime);
    }

    int getSleepTime(int i) {
        if (!this.limitEnabled || i <= this.queueSizeLimit) {
            return 0;
        }
        return i >= this.queueSizeLimit * 2 ? this.maxQueueSizeDelay : ((i - this.queueSizeLimit) * this.maxQueueSizeDelay) / this.queueSizeLimit;
    }

    private void sleep(long j) throws DistributionException {
        if (j <= 0) {
            return;
        }
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DistributionException("Interrupted");
        }
    }

    private PackageMessage buildPackage(ResourceResolver resourceResolver, DistributionRequest distributionRequest) throws DistributionException {
        try {
            if (distributionRequest.getRequestType() == DistributionRequestType.TEST || distributionRequest.getPaths().length != 0) {
                return (PackageMessage) Timed.timed(this.publishMetrics.getBuildPackageDuration(), () -> {
                    return this.factory.create(this.packageBuilder, resourceResolver, this.pubAgentName, distributionRequest);
                });
            }
            throw new DistributionException("Empty paths are not allowed");
        } catch (Exception e) {
            this.publishMetrics.getDroppedRequests().mark();
            String format = String.format("Failed to create content package for requestType=%s, paths=%s. Error=%s", distributionRequest.getRequestType(), Arrays.toString(distributionRequest.getPaths()), e.getMessage());
            this.distLog.error(format, e);
            throw new DistributionException(format, e);
        }
    }

    @Nonnull
    private DistributionResponse send(PackageMessage packageMessage, int i, int i2) throws DistributionException {
        try {
            long longValue = ((Long) Timed.timed(this.publishMetrics.getEnqueuePackageDuration(), () -> {
                return Long.valueOf(sendAndWait(packageMessage));
            })).longValue();
            this.publishMetrics.getExportedPackageSize().update(packageMessage.getPkgLength());
            this.publishMetrics.getAcceptedRequests().mark();
            String format = String.format("Request accepted with distribution package %s at offset=%d, queueSize=%d, queueSizeDelay=%d", packageMessage, Long.valueOf(longValue), Integer.valueOf(i), Integer.valueOf(i2));
            this.distLog.info(format, new Object[0]);
            DistributionRequestState distributionRequestState = DistributionRequestState.ACCEPTED;
            Objects.requireNonNull(packageMessage);
            return new SimpleDistributionResponse(distributionRequestState, format, packageMessage::getPkgId);
        } catch (Throwable th) {
            this.publishMetrics.getDroppedRequests().mark();
            String format2 = String.format("Failed to append distribution package %s to the journal", packageMessage);
            this.distLog.error(format2, th);
            if (th instanceof Error) {
                throw ((Error) th);
            }
            throw new DistributionException(format2, th);
        }
    }

    private long sendAndWait(PackageMessage packageMessage) {
        if (packageMessage.getReqType() == PackageMessage.ReqType.TEST) {
            this.sender.accept(packageMessage);
            return -1L;
        }
        PackageQueuedNotifier queuedNotifier = this.pubQueueProvider.getQueuedNotifier();
        try {
            CompletableFuture<Long> registerWait = queuedNotifier.registerWait(packageMessage.getPkgId());
            this.eventAdmin.postEvent(DistributionEvent.eventPackageCreated(packageMessage, this.pubAgentName));
            this.sender.accept(packageMessage);
            return registerWait.get(this.queuedTimeout, TimeUnit.MILLISECONDS).longValue();
        } catch (Exception e) {
            queuedNotifier.unRegisterWait(packageMessage.getPkgId());
            throw new RuntimeException(e);
        }
    }
}
