package com.google.scp.operator.shared.dao.jobqueue.gcp;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.inject.BindingAnnotation;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.logging.type.LogSeverity;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.Durations;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.scp.operator.protos.shared.backend.JobKeyProto;
import com.google.scp.operator.protos.shared.backend.JobMessageProto;
import com.google.scp.operator.protos.shared.backend.jobqueue.JobQueueProto;
import com.google.scp.operator.shared.dao.jobqueue.common.Constants;
import com.google.scp.operator.shared.dao.jobqueue.common.JobQueue;
import com.google.scp.operator.shared.model.BackendModelUtil;
import com.google.scp.shared.proto.ProtoUtil;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/scp/operator/shared/dao/jobqueue/gcp/PubSubJobQueue.class */
public final class PubSubJobQueue implements JobQueue {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) PubSubJobQueue.class);
    private static final JsonFormat.Printer JSON_PRINTER = JsonFormat.printer();
    private static final JsonFormat.Parser JSON_PARSER = JsonFormat.parser().ignoringUnknownFields();
    private static final int MAX_NUMBER_OF_MESSAGES_RECEIVED = 1;
    private final SubscriberStub subscriber;
    private final Publisher publisher;
    private final Provider<String> subscriptionName;
    private final int messageLeaseSeconds;

    @Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
    @BindingAnnotation
    @Retention(RetentionPolicy.RUNTIME)
    /* loaded from: input_file:com/google/scp/operator/shared/dao/jobqueue/gcp/PubSubJobQueue$JobQueuePubSubMaxMessageSizeBytes.class */
    public @interface JobQueuePubSubMaxMessageSizeBytes {
    }

    @Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
    @BindingAnnotation
    @Retention(RetentionPolicy.RUNTIME)
    /* loaded from: input_file:com/google/scp/operator/shared/dao/jobqueue/gcp/PubSubJobQueue$JobQueuePubSubSubscriptionName.class */
    public @interface JobQueuePubSubSubscriptionName {
    }

    @Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
    @BindingAnnotation
    @Retention(RetentionPolicy.RUNTIME)
    /* loaded from: input_file:com/google/scp/operator/shared/dao/jobqueue/gcp/PubSubJobQueue$JobQueuePubSubTopicId.class */
    public @interface JobQueuePubSubTopicId {
    }

    @Inject
    PubSubJobQueue(Publisher publisher, SubscriberStub subscriberStub, @JobQueuePubSubSubscriptionName Provider<String> provider, @JobQueue.JobQueueMessageLeaseSeconds int i) {
        this.publisher = publisher;
        this.subscriber = subscriberStub;
        this.subscriptionName = provider;
        this.messageLeaseSeconds = i;
    }

    @Override // com.google.scp.operator.shared.dao.jobqueue.common.JobQueue
    public void sendJob(JobKeyProto.JobKey jobKey, String str) throws JobQueue.JobQueueException {
        try {
            logger.info(String.format("Job '%s' was successfully added to job queue with message ID '%s'.", BackendModelUtil.toJobKeyString(jobKey), Optional.of(this.publisher.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(JSON_PRINTER.print(JobMessageProto.JobMessage.newBuilder().setJobRequestId(jobKey.getJobRequestId()).setServerJobId(str).build()))).putAttributes(Constants.MESSAGE_BODY_TYPE, "JSON").build()).get())));
        } catch (ApiException | InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
            throw new JobQueue.JobQueueException(e);
        }
    }

    @Override // com.google.scp.operator.shared.dao.jobqueue.common.JobQueue
    public Optional<JobQueueProto.JobQueueItem> receiveJob() throws JobQueue.JobQueueException {
        try {
            PullResponse call = this.subscriber.pullCallable().call(PullRequest.newBuilder().setMaxMessages(1).setSubscription(this.subscriptionName.get()).build());
            call.getReceivedMessagesList().stream().forEach(receivedMessage -> {
                this.subscriber.modifyAckDeadlineCallable().call(ModifyAckDeadlineRequest.newBuilder().setSubscription(this.subscriptionName.get()).addAckIds(receivedMessage.getAckId()).setAckDeadlineSeconds(Math.min(LogSeverity.CRITICAL_VALUE, this.messageLeaseSeconds)).build());
            });
            Optional<ReceivedMessage> findFirst = call.getReceivedMessagesList().stream().findFirst();
            Optional<JobQueueProto.JobQueueItem> empty = Optional.empty();
            if (findFirst.isPresent()) {
                empty = buildJobQueueItem(findFirst.get());
            }
            if (empty.isPresent()) {
                logger.info("Received job from queue:" + empty.get().getJobKeyString());
            } else {
                logger.info("No job received from queue");
            }
            return empty;
        } catch (ApiException | InvalidProtocolBufferException e) {
            throw new JobQueue.JobQueueException(e);
        }
    }

    @Override // com.google.scp.operator.shared.dao.jobqueue.common.JobQueue
    public void acknowledgeJobCompletion(JobQueueProto.JobQueueItem jobQueueItem) throws JobQueue.JobQueueException {
        try {
            this.subscriber.acknowledgeCallable().call(AcknowledgeRequest.newBuilder().setSubscription(this.subscriptionName.get()).addAckIds(jobQueueItem.getReceiptInfo()).build());
        } catch (ApiException e) {
            throw new JobQueue.JobQueueException(e);
        }
    }

    @Override // com.google.scp.operator.shared.dao.jobqueue.common.JobQueue
    public void modifyJobProcessingTime(JobQueueProto.JobQueueItem jobQueueItem, Duration duration) throws JobQueue.JobQueueException {
        try {
            this.subscriber.modifyAckDeadlineCallable().call(ModifyAckDeadlineRequest.newBuilder().setSubscription(this.subscriptionName.get()).setAckDeadlineSeconds((int) duration.toSeconds()).addAckIds(jobQueueItem.getReceiptInfo()).build());
        } catch (ApiException e) {
            throw new JobQueue.JobQueueException(e);
        }
    }

    private Optional<JobQueueProto.JobQueueItem> buildJobQueueItem(ReceivedMessage receivedMessage) throws InvalidProtocolBufferException {
        logger.info("Received job message body: " + receivedMessage.getMessage().getData().toStringUtf8());
        JobMessageProto.JobMessage.Builder newBuilder = JobMessageProto.JobMessage.newBuilder();
        JSON_PARSER.merge(receivedMessage.getMessage().getData().toStringUtf8(), newBuilder);
        JobMessageProto.JobMessage build = newBuilder.build();
        return Optional.of(JobQueueProto.JobQueueItem.newBuilder().setJobKeyString(build.getJobRequestId()).setServerJobId(build.getServerJobId()).setJobProcessingTimeout(Durations.fromSeconds(this.messageLeaseSeconds)).setJobProcessingStartTime(ProtoUtil.toProtoTimestamp(Instant.now())).setReceiptInfo(receivedMessage.getAckId()).build());
    }
}
