package com.google.scp.operator.cpio.jobclient;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.scp.operator.cpio.jobclient.JobClient;
import com.google.scp.operator.cpio.jobclient.JobHandlerModule;
import com.google.scp.operator.cpio.jobclient.model.ErrorReason;
import com.google.scp.operator.cpio.jobclient.model.Job;
import com.google.scp.operator.cpio.jobclient.model.JobResult;
import com.google.scp.operator.cpio.jobclient.model.JobRetryRequest;
import com.google.scp.operator.cpio.lifecycleclient.LifecycleClient;
import com.google.scp.operator.cpio.metricclient.MetricClient;
import com.google.scp.operator.cpio.metricclient.model.CustomMetric;
import com.google.scp.operator.cpio.notificationclient.NotificationClient;
import com.google.scp.operator.cpio.notificationclient.model.PublishMessageRequest;
import com.google.scp.operator.protos.shared.backend.CreateJobRequestProto;
import com.google.scp.operator.protos.shared.backend.JobKeyProto;
import com.google.scp.operator.protos.shared.backend.JobStatusProto;
import com.google.scp.operator.protos.shared.backend.RequestInfoProto;
import com.google.scp.operator.protos.shared.backend.ReturnCodeProto;
import com.google.scp.operator.protos.shared.backend.jobqueue.JobQueueProto;
import com.google.scp.operator.protos.shared.backend.metadatadb.JobMetadataProto;
import com.google.scp.operator.shared.dao.jobqueue.common.JobQueue;
import com.google.scp.operator.shared.dao.metadatadb.common.JobMetadataDb;
import com.google.scp.operator.shared.model.BackendModelUtil;
import com.google.scp.shared.clients.configclient.ParameterClient;
import com.google.scp.shared.clients.configclient.model.WorkerParameter;
import com.google.scp.shared.proto.ProtoUtil;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.NotThreadSafe;
import javax.inject.Inject;

/**
 * {@link JobClient} implementation that pulls work items from a {@link JobQueue} and tracks their
 * state in a {@link JobMetadataDb}.
 *
 * <p>The queue item of every job handed out by {@link #getJob()} is kept in an in-memory cache
 * until the job is either completed or returned for retry. The same cache instance is passed to a
 * {@code JobProcessingExtenderService}, which is started from the constructor.
 *
 * <p>This class is not thread-safe: {@link #getJob()} keeps its polling state in a plain field.
 */
@NotThreadSafe
public final class JobClientImpl implements JobClient {
    private static final Logger logger = Logger.getLogger(JobClientImpl.class.getName());
    static final Duration REPORTING_WINDOW_FIXED_LENGTH = Duration.of(1, ChronoUnit.HOURS);
    static final String METRIC_NAMESPACE = "scp/jobclient";

    /** Pause before returning from {@link #getJob()} once a scale-in action has been handled. */
    private static final long SCALE_IN_WAIT_MILLIS = 5000L;

    /** Number of additional metadata lookups attempted after the first one came back empty. */
    private static final int METADATA_LOOKUP_MAX_RETRIES = 6;

    /** Base of the exponential backoff between metadata lookups, in milliseconds. */
    private static final double METADATA_LOOKUP_BACKOFF_BASE_MILLIS = 1000.0d;

    /** Largest whole-second delay accepted by {@link #returnJobForRetry(JobRetryRequest)}. */
    private static final long MAX_RETRY_DELAY_SECONDS = 600L;

    private final JobQueue jobQueue;
    private final JobMetadataDb metadataDb;
    private final JobPullBackoff pullBackoff;
    // Deliberately left non-final and package-private, exactly as before, so that existing
    // same-package code that reassigns the validators keeps compiling.
    ImmutableList<JobValidator> jobValidators;
    private final LifecycleClient lifecycleClient;
    private final MetricClient metricClient;
    private final Optional<NotificationClient> notificationClient;
    private final ParameterClient parameterClient;
    private final Clock clock;
    private boolean pollForJob = false;
    /** Queue items of the jobs currently handed out, keyed by job key string. */
    private final ConcurrentHashMap<String, JobQueueProto.JobQueueItem> cache = new ConcurrentHashMap<>();

    /** Wires the collaborators and starts the job processing extender service. */
    @Inject
    JobClientImpl(JobQueue jobQueue, JobMetadataDb jobMetadataDb, JobPullBackoff jobPullBackoff, @JobHandlerModule.JobClientJobValidatorsBinding ImmutableList<JobValidator> immutableList, MetricClient metricClient, LifecycleClient lifecycleClient, Optional<NotificationClient> optional, ParameterClient parameterClient, Clock clock) {
        this.jobQueue = jobQueue;
        this.metadataDb = jobMetadataDb;
        this.pullBackoff = jobPullBackoff;
        this.jobValidators = immutableList;
        this.metricClient = metricClient;
        this.lifecycleClient = lifecycleClient;
        this.notificationClient = optional;
        this.parameterClient = parameterClient;
        this.clock = clock;
        startJobProcessingExtender();
    }

    /**
     * Polls the job queue until a processable job is found or the pull backoff asks to stop.
     *
     * <p>Queue items are dropped (acknowledged) when their server job id disagrees with the one in
     * the metadata db or when a validator rejects them. Items that look like a job still being
     * processed are skipped without being acknowledged. The first remaining job is marked
     * {@code IN_PROGRESS}, cached, and returned.
     *
     * @return the pulled job, or an empty {@link Optional} when a scale-in lifecycle action was
     *     handled or the backoff ended polling without a job
     * @throws JobClient.JobClientException if pulling or claiming the job fails, if the polling
     *     thread is interrupted, or if no metadata entry exists for a queue item that passed
     *     validation
     */
    @Override
    public Optional<Job> getJob() throws JobClient.JobClientException {
        this.pollForJob = true;
        while (this.pollForJob) {
            try {
                if (this.lifecycleClient.handleScaleInLifecycleAction()) {
                    Thread.sleep(SCALE_IN_WAIT_MILLIS);
                    return Optional.empty();
                }
                Optional<JobQueueProto.JobQueueItem> received = this.jobQueue.receiveJob();
                if (received.isEmpty()) {
                    // Nothing on the queue: the backoff decides whether polling continues.
                    this.pollForJob = this.pullBackoff.get().booleanValue();
                    continue;
                }
                JobQueueProto.JobQueueItem queueItem = received.get();
                Optional<JobMetadataProto.JobMetadata> metadata = getJobMetadata(queueItem);
                Optional<Job> job = metadata.isPresent() ? Optional.of(buildJob(queueItem, metadata.get())) : Optional.empty();

                if (hasServerJobIdMismatch(queueItem, metadata)) {
                    logger.info(String.format("Deleting job queue message because of server job id mismatch. Server job id from metadata db: %s. Server job id from job queue: %s.", metadata.get().getServerJobId(), queueItem.getServerJobId()));
                    this.jobQueue.acknowledgeJobCompletion(queueItem);
                    continue;
                }

                Optional<JobValidator> failedValidator = performInitialChecks(job, queueItem.getJobKeyString());
                if (failedValidator.isPresent()) {
                    handleValidationFailure(queueItem, job, failedValidator.get());
                    continue;
                }

                if (job.isEmpty()) {
                    // Every validator accepted the item although no metadata entry exists. There
                    // is nothing to process, so fail with a typed error instead of letting an
                    // unchecked Optional.get() blow up further down.
                    recordJobClientError(ErrorReason.JOB_METADATA_NOT_FOUND);
                    throw new JobClient.JobClientException(String.format("Metadata entry for job '%s' was not found, cannot start processing.", queueItem.getJobKeyString()), ErrorReason.JOB_METADATA_NOT_FOUND);
                }

                if (isDuplicateJob(job)) {
                    logger.info("Skip processing for duplicate job: " + job.get().jobKey());
                    continue;
                }

                // Claiming happens inside the try so that metadata db failures are logged,
                // counted and wrapped like every other pull failure.
                claimJob(queueItem, metadata.get());
                this.pollForJob = false;
                return job;
            } catch (LifecycleClient.LifecycleClientException | JobQueue.JobQueueException | JobMetadataDb.JobMetadataConflictException | JobMetadataDb.JobMetadataDbException | InterruptedException e) {
                if (e instanceof InterruptedException) {
                    // Restore the interrupt flag so code further up the stack can observe it.
                    Thread.currentThread().interrupt();
                }
                logger.log(Level.SEVERE, "Failed to pull new job from job queue.", e);
                recordJobClientError(ErrorReason.JOB_PULL_FAILED);
                throw new JobClient.JobClientException(e, ErrorReason.JOB_PULL_FAILED);
            }
        }
        return Optional.empty();
    }

    /**
     * Tells whether the queue item and the metadata entry carry different, non-empty server job
     * ids. A missing metadata entry or an empty id on either side is not treated as a mismatch.
     */
    private static boolean hasServerJobIdMismatch(JobQueueProto.JobQueueItem queueItem, Optional<JobMetadataProto.JobMetadata> metadata) {
        return metadata.isPresent()
                && !metadata.get().getServerJobId().isEmpty()
                && !queueItem.getServerJobId().isEmpty()
                && !metadata.get().getServerJobId().equals(queueItem.getServerJobId());
    }

    /**
     * Handles a queue item rejected by {@code validator}: optionally reports the failure to the
     * metadata db, acknowledges the queue item, and records a {@code JobValidationFailure} metric.
     */
    private void handleValidationFailure(JobQueueProto.JobQueueItem queueItem, Optional<Job> job, JobValidator validator) throws JobClient.JobClientException, JobQueue.JobQueueException {
        if (validator.reportValidationError()) {
            if (job.isPresent()) {
                reportFailedCheck(job.get(), validator.getValidationErrorMessage(), validator.getValidationErrorReturnCode());
            } else {
                // Without a metadata entry there is no record to write the failure to.
                logger.warning(String.format("Cannot report validation failure for job '%s' because its metadata entry was not found.", queueItem.getJobKeyString()));
            }
        }
        this.jobQueue.acknowledgeJobCompletion(queueItem);
        try {
            this.metricClient.recordMetric(CustomMetric.builder().setNameSpace(METRIC_NAMESPACE).setName("JobValidationFailure").setValue(1.0d).setUnit("Count").addLabel("Validator", validator.getClass().getSimpleName()).build());
        } catch (MetricClient.MetricClientException e) {
            // Metrics are best effort; a failure here must not fail the pull.
            logger.warning(String.format("Could not record JobValidationFailure metric.\n%s", e));
        }
    }

    /**
     * Marks the job {@code IN_PROGRESS} in the metadata db (stamping the processing start time and
     * incrementing the attempt counter) and remembers its queue item in the cache.
     */
    private void claimJob(JobQueueProto.JobQueueItem queueItem, JobMetadataProto.JobMetadata metadata) throws JobMetadataDb.JobMetadataConflictException, JobMetadataDb.JobMetadataDbException {
        this.metadataDb.updateJobMetadata(metadata.toBuilder().setJobStatus(JobStatusProto.JobStatus.IN_PROGRESS).setRequestProcessingStartedAt(ProtoUtil.toProtoTimestamp(Instant.now(this.clock))).setNumAttempts(metadata.getNumAttempts() + 1).build());
        String jobKeyString = BackendModelUtil.toJobKeyString(metadata.getJobKey());
        this.cache.put(jobKeyString, queueItem);
        logger.info(String.format("Successfully pulled a job from sqs and ddb, job_id=%s.", jobKeyString));
    }

    /**
     * Looks up the metadata entry for a queue item, retrying while the entry is missing.
     *
     * <p>After an empty lookup the thread sleeps 2s, 4s, 8s, 16s, 32s and 64s respectively before
     * the next attempt; after the seventh empty lookup an empty {@link Optional} is returned.
     */
    private Optional<JobMetadataProto.JobMetadata> getJobMetadata(JobQueueProto.JobQueueItem jobQueueItem) throws JobMetadataDb.JobMetadataDbException, InterruptedException {
        for (int attempt = 0; attempt <= METADATA_LOOKUP_MAX_RETRIES; attempt++) {
            Optional<JobMetadataProto.JobMetadata> jobMetadata = this.metadataDb.getJobMetadata(jobQueueItem.getJobKeyString());
            if (jobMetadata.isPresent()) {
                return jobMetadata;
            }
            if (attempt < METADATA_LOOKUP_MAX_RETRIES) {
                Thread.sleep((long) (METADATA_LOOKUP_BACKOFF_BASE_MILLIS * Math.pow(2.0d, attempt + 1)));
            }
        }
        return Optional.empty();
    }

    /**
     * Puts an in-progress job back on the queue so it can be retried.
     *
     * <p>The metadata entry is reset to {@code RECEIVED} (optionally with the supplied result
     * info), the queue item's processing time is set to the requested delay (zero when absent),
     * and the job is removed from the cache.
     *
     * @param jobRetryRequest identifies the job and carries the optional delay and result info
     * @throws JobClient.JobClientException if the job is not cached, has no metadata entry, is not
     *     {@code IN_PROGRESS}, the delay is negative or longer than 10 minutes, or the queue or
     *     metadata db call fails
     */
    @Override
    public void returnJobForRetry(JobRetryRequest jobRetryRequest) throws JobClient.JobClientException {
        String jobKeyString = BackendModelUtil.toJobKeyString(jobRetryRequest.getJobKey());
        try {
            if (!this.cache.containsKey(jobKeyString)) {
                recordJobClientError(ErrorReason.JOB_RECEIPT_HANDLE_NOT_FOUND);
                throw new JobClient.JobClientException(String.format("Job cannot be released. In-memory cache does not contain job key '%s'", jobKeyString), ErrorReason.JOB_RECEIPT_HANDLE_NOT_FOUND);
            }
            Optional<JobMetadataProto.JobMetadata> jobMetadata = this.metadataDb.getJobMetadata(jobRetryRequest.getJobKey().getJobRequestId());
            if (jobMetadata.isEmpty()) {
                recordJobClientError(ErrorReason.JOB_METADATA_NOT_FOUND);
                throw new JobClient.JobClientException(String.format("Job cannot be released. Metadata entry for job '%s' was not found.", jobKeyString), ErrorReason.JOB_METADATA_NOT_FOUND);
            }
            if (jobMetadata.get().getJobStatus() != JobStatusProto.JobStatus.IN_PROGRESS) {
                throw new JobClient.JobClientException(String.format("Job cannot be released. Metadata entry for job '%s' indicates job is in status %s, but expected to be IN_PROGRESS.", jobKeyString, jobMetadata.get().getJobStatus()), ErrorReason.WRONG_JOB_STATUS);
            }
            Optional<Duration> delay = jobRetryRequest.getDelay();
            if (delay.isPresent() && (delay.get().isNegative() || delay.get().getSeconds() > MAX_RETRY_DELAY_SECONDS)) {
                recordJobClientError(ErrorReason.JOB_DELAY_OUT_OF_RANGE);
                // The thrown reason must match the recorded one; it used to be reported to the
                // caller as JOB_RECEIPT_HANDLE_NOT_FOUND.
                throw new JobClient.JobClientException(String.format("Job cannot be released. Duration for job %s must be between zero and 10 minutes.", jobKeyString), ErrorReason.JOB_DELAY_OUT_OF_RANGE);
            }
            JobMetadataProto.JobMetadata.Builder builder = jobMetadata.get().toBuilder();
            builder.setJobStatus(JobStatusProto.JobStatus.RECEIVED);
            if (jobRetryRequest.getResultInfo().isPresent()) {
                builder.setResultInfo(jobRetryRequest.getResultInfo().get());
            }
            this.metadataDb.updateJobMetadata(builder.build());
            this.jobQueue.modifyJobProcessingTime(this.cache.get(jobKeyString), delay.orElse(Duration.ZERO));
            this.cache.remove(jobKeyString);
            logger.info(String.format("Successfully released job %s back to the queue for retry.", jobKeyString));
        } catch (JobQueue.JobQueueException | JobMetadataDb.JobMetadataConflictException | JobMetadataDb.JobMetadataDbException e) {
            logger.log(Level.SEVERE, String.format("Failed to release job '%s'.", jobKeyString), e);
            recordJobClientError(ErrorReason.RETURN_JOB_FOR_RETRY_FAILED);
            throw new JobClient.JobClientException(e, ErrorReason.RETURN_JOB_FOR_RETRY_FAILED);
        }
    }

    /**
     * Marks an in-progress job as {@code FINISHED}, acknowledges its queue item, drops it from the
     * cache and, when both a notification topic parameter and a notification client are available,
     * publishes a completion notification.
     *
     * @param jobResult the key and result info of the finished job
     * @throws JobClient.JobClientException if the job is not cached, has no metadata entry, is not
     *     {@code IN_PROGRESS}, or the queue or metadata db call fails
     */
    @Override
    public void markJobCompleted(JobResult jobResult) throws JobClient.JobClientException {
        String jobKeyString = BackendModelUtil.toJobKeyString(jobResult.jobKey());
        try {
            if (!this.cache.containsKey(jobKeyString)) {
                recordJobClientError(ErrorReason.JOB_RECEIPT_HANDLE_NOT_FOUND);
                throw new JobClient.JobClientException(String.format("In-memory cache does not contain job key '%s'", jobKeyString), ErrorReason.JOB_RECEIPT_HANDLE_NOT_FOUND);
            }
            Optional<JobMetadataProto.JobMetadata> jobMetadata = this.metadataDb.getJobMetadata(jobResult.jobKey().getJobRequestId());
            if (jobMetadata.isEmpty()) {
                recordJobClientError(ErrorReason.JOB_METADATA_NOT_FOUND);
                throw new JobClient.JobClientException(String.format("Metadata entry for job '%s' was not found, cannot mark job as completed.", jobKeyString), ErrorReason.JOB_METADATA_NOT_FOUND);
            }
            if (jobMetadata.get().getJobStatus() != JobStatusProto.JobStatus.IN_PROGRESS) {
                throw new JobClient.JobClientException(String.format("Metadata entry for job '%s' indicates job is in status %s, but expected to be IN_PROGRESS.", jobKeyString, jobMetadata.get().getJobStatus()), ErrorReason.WRONG_JOB_STATUS);
            }
            this.metadataDb.updateJobMetadata(jobMetadata.get().toBuilder().setJobStatus(JobStatusProto.JobStatus.FINISHED).setResultInfo(jobResult.resultInfo()).build());
            this.jobQueue.acknowledgeJobCompletion(this.cache.get(jobKeyString));
            this.cache.remove(jobKeyString);
            publishCompletionNotification(jobKeyString);
            logger.info(String.format("Successfully marked job %s as completed.", jobKeyString));
        } catch (JobQueue.JobQueueException | JobMetadataDb.JobMetadataConflictException | JobMetadataDb.JobMetadataDbException e) {
            logger.log(Level.SEVERE, String.format("Failed to mark job '%s' as completed.", jobKeyString), e);
            recordJobClientError(ErrorReason.JOB_MARK_COMPLETION_FAILED);
            throw new JobClient.JobClientException(e, ErrorReason.JOB_MARK_COMPLETION_FAILED);
        }
    }

    /**
     * Publishes a {@code {"jobId": "..."}} message to the configured notification topic. This is
     * best effort: failures are logged and never propagated.
     */
    private void publishCompletionNotification(String jobKeyString) {
        try {
            Optional<String> topicId = this.parameterClient.getParameter(WorkerParameter.NOTIFICATIONS_TOPIC_ID.name());
            if (topicId.isPresent() && this.notificationClient.isPresent()) {
                this.notificationClient.get().publishMessage(PublishMessageRequest.builder().setNotificationTopic(topicId.get()).setMessageBody(String.format("{\"jobId\": \"%s\"}", jobKeyString)).build());
            }
        } catch (NotificationClient.NotificationClientException | ParameterClient.ParameterClientException e) {
            logger.log(Level.INFO, String.format("Failed to publish notification for the completion of jobId '%s'.", jobKeyString), e);
        }
    }

    /**
     * Appends an error message to the error summary of an in-progress job and stamps the result
     * info's finished-at time with the current clock time.
     *
     * @param jobKey key of the job to update
     * @param str error message to append
     * @throws JobClient.JobClientException if the job has no metadata entry, is not
     *     {@code IN_PROGRESS}, or the metadata db call fails
     */
    @Override
    public void appendJobErrorMessage(JobKeyProto.JobKey jobKey, String str) throws JobClient.JobClientException {
        String jobKeyString = BackendModelUtil.toJobKeyString(jobKey);
        try {
            Optional<JobMetadataProto.JobMetadata> jobMetadata = this.metadataDb.getJobMetadata(jobKey.getJobRequestId());
            if (jobMetadata.isEmpty()) {
                recordJobClientError(ErrorReason.JOB_METADATA_NOT_FOUND);
                throw new JobClient.JobClientException(String.format("Metadata entry for job '%s' was not found, cannot update error summary.", jobKeyString), ErrorReason.JOB_METADATA_NOT_FOUND);
            }
            JobMetadataProto.JobMetadata current = jobMetadata.get();
            if (current.getJobStatus() != JobStatusProto.JobStatus.IN_PROGRESS) {
                throw new JobClient.JobClientException(String.format("Metadata entry for job '%s' indicates job is in status %s, but expected to be IN_PROGRESS.", jobKeyString, current.getJobStatus()), ErrorReason.WRONG_JOB_STATUS);
            }
            this.metadataDb.updateJobMetadata(current.toBuilder().setResultInfo(current.getResultInfo().toBuilder().setErrorSummary(current.getResultInfo().getErrorSummary().toBuilder().addErrorMessages(str).build()).setFinishedAt(ProtoUtil.toProtoTimestamp(Instant.now(this.clock))).build()).build());
            logger.info(String.format("Successfully updated error summary for job '%s'", jobKeyString));
        } catch (JobMetadataDb.JobMetadataConflictException | JobMetadataDb.JobMetadataDbException e) {
            logger.log(Level.SEVERE, String.format("Failed to update error summary for job '%s'", jobKeyString), e);
            recordJobClientError(ErrorReason.JOB_ERROR_SUMMARY_UPDATE_FAILED);
            throw new JobClient.JobClientException(e, ErrorReason.JOB_ERROR_SUMMARY_UPDATE_FAILED);
        }
    }

    /**
     * Runs the configured validators in order against the job.
     *
     * @param optional the job built from metadata, empty when no metadata entry was found
     * @param str the job key string taken from the queue item
     * @return the first validator that rejected the job, or empty when all of them accepted it
     */
    Optional<JobValidator> performInitialChecks(Optional<Job> optional, String str) {
        Optional<JobValidator> firstFailure = this.jobValidators.stream().filter(jobValidator -> !jobValidator.validate(optional, str)).findFirst();
        firstFailure.ifPresent(validator -> logger.warning(String.format("Job '%s' failed validation step '%s'.", str, validator.getDescription())));
        return firstFailure;
    }

    /**
     * Marks a job that failed validation as {@code FINISHED} in the metadata db, storing the given
     * return code and message and the current clock time as its finished-at time.
     *
     * @param job the job that failed validation
     * @param str the validation error message to store as return message
     * @param returnCode the return code whose name is stored
     * @throws JobClient.JobClientException if the metadata entry does not exist or the metadata db
     *     call fails
     */
    void reportFailedCheck(Job job, String str, ReturnCodeProto.ReturnCode returnCode) throws JobClient.JobClientException {
        String jobKeyString = BackendModelUtil.toJobKeyString(job.jobKey());
        try {
            Optional<JobMetadataProto.JobMetadata> jobMetadata = this.metadataDb.getJobMetadata(jobKeyString);
            if (jobMetadata.isEmpty()) {
                logger.log(Level.SEVERE, String.format("Failed to report failed checks on job '%s' and mark it completed.", jobKeyString));
                recordJobClientError(ErrorReason.JOB_MARK_COMPLETION_FAILED);
                throw new JobClient.JobClientException(String.format("Job %s does not exist in the JobMetadata table", jobKeyString), ErrorReason.JOB_MARK_COMPLETION_FAILED);
            }
            this.metadataDb.updateJobMetadata(jobMetadata.get().toBuilder().setJobStatus(JobStatusProto.JobStatus.FINISHED).setResultInfo(jobMetadata.get().getResultInfo().toBuilder().setReturnCode(returnCode.name()).setReturnMessage(str).setFinishedAt(ProtoUtil.toProtoTimestamp(Instant.now(this.clock)))).build());
        } catch (JobMetadataDb.JobMetadataConflictException | JobMetadataDb.JobMetadataDbException e) {
            logger.log(Level.SEVERE, String.format("Failed to report failed checks on job '%s' and mark it completed.", jobKeyString), e);
            recordJobClientError(ErrorReason.JOB_MARK_COMPLETION_FAILED);
            throw new JobClient.JobClientException(e, ErrorReason.JOB_MARK_COMPLETION_FAILED);
        }
    }

    /**
     * Builds the {@link Job} model from a queue item and its metadata entry.
     *
     * <p>The request info is taken from the metadata when set; otherwise it is derived from the
     * metadata's create-job request.
     *
     * @throws JobClient.JobClientException if the metadata has neither a request info nor a
     *     create-job request
     */
    Job buildJob(JobQueueProto.JobQueueItem jobQueueItem, JobMetadataProto.JobMetadata jobMetadata) throws JobClient.JobClientException {
        Job.Builder jobBuilder = Job.builder().setJobKey(jobMetadata.getJobKey()).setJobStatus(jobMetadata.getJobStatus()).setCreateTime(ProtoUtil.toJavaInstant(jobMetadata.getRequestReceivedAt())).setUpdateTime(ProtoUtil.toJavaInstant(jobMetadata.getRequestUpdatedAt())).setNumAttempts(Integer.valueOf(jobMetadata.getNumAttempts())).setJobProcessingTimeout(ProtoUtil.toJavaDuration(jobQueueItem.getJobProcessingTimeout()));
        if (jobMetadata.hasRequestProcessingStartedAt()) {
            jobBuilder.setProcessingStartTime(Optional.of(ProtoUtil.toJavaInstant(jobMetadata.getRequestProcessingStartedAt())));
        }
        if (jobMetadata.hasRequestInfo()) {
            jobBuilder.setRequestInfo(jobMetadata.getRequestInfo());
        } else if (jobMetadata.hasCreateJobRequest()) {
            jobBuilder.setRequestInfo(convertToRequestInfo(jobMetadata.getCreateJobRequest()));
        } else {
            throw new JobClient.JobClientException(String.format("Missing requestInfo and createJobRequest from JobMetadata for job %s.", BackendModelUtil.toJobKeyString(jobMetadata.getJobKey())), ErrorReason.JOB_PULL_FAILED);
        }
        return jobBuilder.build();
    }

    /**
     * Records a {@code JobClientError} count metric labelled with the error reason. Failures to
     * record are logged and swallowed so that metrics never mask the original error.
     */
    private void recordJobClientError(ErrorReason errorReason) {
        try {
            this.metricClient.recordMetric(CustomMetric.builder().setNameSpace(METRIC_NAMESPACE).setName("JobClientError").setValue(1.0d).setUnit("Count").addLabel("ErrorReason", errorReason.toString()).build());
        } catch (Exception e) {
            logger.warning(String.format("Could not record job client metric.\n%s", e));
        }
    }

    /** Copies the request fields of a create-job request into a {@code RequestInfo} message. */
    private RequestInfoProto.RequestInfo convertToRequestInfo(CreateJobRequestProto.CreateJobRequest createJobRequest) {
        return RequestInfoProto.RequestInfo.newBuilder().setJobRequestId(createJobRequest.getJobRequestId()).putAllJobParameters(createJobRequest.getJobParameters()).setInputDataBucketName(createJobRequest.getInputDataBucketName()).setInputDataBlobPrefix(createJobRequest.getInputDataBlobPrefix()).setOutputDataBucketName(createJobRequest.getOutputDataBucketName()).setOutputDataBlobPrefix(createJobRequest.getOutputDataBlobPrefix()).build();
    }

    /** Starts the extender service, handing it the job queue and the cache of handed-out jobs. */
    private void startJobProcessingExtender() {
        new JobProcessingExtenderService(this.jobQueue, this.cache).startAsync();
    }

    /**
     * Tells whether the job looks like it is still being processed: its status is
     * {@code IN_PROGRESS} and its processing start time plus its processing timeout lies after the
     * current clock time.
     *
     * @param optional the job to inspect
     * @return {@code true} for such a job; {@code false} otherwise, including when the job is
     *     absent or has no processing start time
     */
    @VisibleForTesting
    boolean isDuplicateJob(Optional<Job> optional) {
        if (optional.isEmpty()) {
            // An absent job cannot be a duplicate; avoids an unchecked Optional.get().
            return false;
        }
        Job job = optional.get();
        if (job.processingStartTime().isEmpty()) {
            return false;
        }
        Instant startedAt = job.processingStartTime().get();
        Duration processingTimeout = job.jobProcessingTimeout();
        Instant now = Instant.now(this.clock);
        logger.info(String.format("Received job %s with status %s. The job started at %s, current time is %s, and job processing timeout is %d seconds.", job.jobKey(), job.jobStatus(), startedAt, now, processingTimeout.toSeconds()));
        return job.jobStatus() == JobStatusProto.JobStatus.IN_PROGRESS && startedAt.plus(processingTimeout).isAfter(now);
    }
}
