package com.google.aggregate.adtech.worker.aggregation.domain;

import com.google.aggregate.adtech.worker.aggregation.engine.AggregationEngine;
import com.google.aggregate.adtech.worker.exceptions.DomainReadException;
import com.google.aggregate.adtech.worker.model.AggregatedFact;
import com.google.aggregate.adtech.worker.model.DebugBucketAnnotation;
import com.google.aggregate.adtech.worker.model.serdes.AvroDebugResultsSerdes;
import com.google.aggregate.adtech.worker.model.serdes.AvroResultsSerdes;
import com.google.aggregate.adtech.worker.util.OutputShardFileHelper;
import com.google.aggregate.privacy.noise.NoiseApplier;
import com.google.aggregate.privacy.noise.NoisedAggregationRunner;
import com.google.aggregate.privacy.noise.model.AggregatedResults;
import com.google.aggregate.privacy.noise.model.NoisedAggregatedResultSet;
import com.google.aggregate.privacy.noise.model.NoisedAggregationResult;
import com.google.aggregate.privacy.noise.model.SummaryReportAvro;
import com.google.aggregate.privacy.noise.model.SummaryReportAvroSet;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.scp.operator.cpio.blobstorageclient.BlobStorageClient;
import com.google.scp.operator.cpio.blobstorageclient.model.DataLocation;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.io.InputStream;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/aggregate/adtech/worker/aggregation/domain/OutputDomainProcessor.class */
public abstract class OutputDomainProcessor {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) OutputDomainProcessor.class);
    private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors();
    private static final int NUM_READ_THREADS = NUM_CPUS;
    private static final int NUM_PROCESS_THREADS = NUM_CPUS;
    private final int MAX_DOMAIN_READ_BUFFER_SIZE = 10000;
    private final int MAX_DOMAIN_PROCESS_BUFFER_SIZE = (10000 * NUM_READ_THREADS) / NUM_PROCESS_THREADS;
    private final ListeningExecutorService blockingThreadPool;
    private final ListeningExecutorService nonBlockingThreadPool;
    private final BlobStorageClient blobStorageClient;
    private final Boolean domainOptional;
    private final Boolean enableThresholding;
    private final AvroResultsSerdes resultsSerdes;
    private final AvroDebugResultsSerdes debugResultsSerdes;

    /* JADX INFO: Access modifiers changed from: package-private */
    public OutputDomainProcessor(ListeningExecutorService listeningExecutorService, ListeningExecutorService listeningExecutorService2, BlobStorageClient blobStorageClient, AvroResultsSerdes avroResultsSerdes, AvroDebugResultsSerdes avroDebugResultsSerdes, Boolean bool, Boolean bool2) {
        this.blockingThreadPool = listeningExecutorService;
        this.nonBlockingThreadPool = listeningExecutorService2;
        this.blobStorageClient = blobStorageClient;
        this.domainOptional = bool;
        this.enableThresholding = bool2;
        this.resultsSerdes = avroResultsSerdes;
        this.debugResultsSerdes = avroDebugResultsSerdes;
    }

    public ImmutableList<DataLocation> listShards(DataLocation dataLocation) {
        try {
            ImmutableList<String> listBlobs = this.blobStorageClient.listBlobs(dataLocation);
            logger.info("Output domain shards detected by blob storage client: " + String.valueOf(listBlobs));
            DataLocation.BlobStoreDataLocation blobStoreDataLocation = dataLocation.blobStoreDataLocation();
            ImmutableList<DataLocation> immutableList = (ImmutableList) listBlobs.stream().map(str -> {
                return DataLocation.BlobStoreDataLocation.create(blobStoreDataLocation.bucket(), str);
            }).map(DataLocation::ofBlobStoreDataLocation).collect(ImmutableList.toImmutableList());
            logger.info("Output domain shards to be used: " + String.valueOf(immutableList));
            return immutableList;
        } catch (BlobStorageClient.BlobStorageClientException e) {
            throw new DomainReadException(e);
        }
    }

    public AggregatedResults adjustAggregationWithDomainAndNoiseStreaming(AggregationEngine aggregationEngine, Optional<DataLocation> optional, ImmutableList<DataLocation> immutableList, NoisedAggregationRunner noisedAggregationRunner, Optional<Double> optional2, Boolean bool) throws DomainReadException {
        Set newConcurrentHashSet = Sets.newConcurrentHashSet();
        AtomicLong atomicLong = new AtomicLong(0L);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        Supplier<NoiseApplier> requestScopedNoiseApplier = noisedAggregationRunner.getRequestScopedNoiseApplier(optional2);
        List synchronizedList = Collections.synchronizedList(new ArrayList());
        List synchronizedList2 = Collections.synchronizedList(new ArrayList());
        Flowable.fromStream(immutableList.stream()).flatMap(dataLocation -> {
            return processOutputDomainShard(dataLocation, aggregationEngine, noisedAggregationRunner, requestScopedNoiseApplier, newConcurrentHashSet, bool);
        }, false, NUM_READ_THREADS, 10000).buffer(OutputShardFileHelper.getMaxRecordsPerShard()).doOnNext(list -> {
            atomicLong.addAndGet(list.size());
        }).flatMap(list2 -> {
            return processDomainSummaryFacts(ImmutableList.copyOf((Collection) list2), atomicInteger.addAndGet(1), bool, synchronizedList, synchronizedList2);
        }, NUM_PROCESS_THREADS).blockingSubscribe();
        if (optional.isPresent() && atomicLong.get() < 1) {
            throw new DomainReadException(new IllegalArgumentException(String.format("No output domain provided in the location: %s. Please refer to the API documentation for output domain parameters at https://github.com/privacysandbox/aggregation-service/blob/main/docs/api.md", optional)));
        }
        if (bool.booleanValue() || this.domainOptional.booleanValue()) {
            Flowable.fromIterable(aggregationEngine.getEntries()).map(entry -> {
                AggregatedFact create = AggregatedFact.create((BigInteger) entry.getKey(), ((LongAdder) entry.getValue()).longValue());
                noisedAggregationRunner.noiseSingleFact(create, requestScopedNoiseApplier);
                if (bool.booleanValue()) {
                    create.setDebugAnnotations(List.of(DebugBucketAnnotation.IN_REPORTS));
                }
                return create;
            }).buffer(OutputShardFileHelper.getMaxRecordsPerShard()).flatMap(list3 -> {
                return processReportOnlyFacts(ImmutableList.copyOf((Collection) list3), Integer.valueOf(atomicInteger.addAndGet(1)), bool.booleanValue(), optional2, noisedAggregationRunner, synchronizedList, synchronizedList2);
            }).blockingSubscribe();
        }
        return AggregatedResults.create(SummaryReportAvroSet.create(ImmutableList.copyOf((Collection) synchronizedList), bool.booleanValue() ? Optional.of(ImmutableList.copyOf((Collection) synchronizedList2)) : Optional.empty()));
    }

    private Flowable<Object> processReportOnlyFacts(ImmutableList<AggregatedFact> immutableList, Integer num, boolean z, Optional<Double> optional, NoisedAggregationRunner noisedAggregationRunner, List<SummaryReportAvro> list, List<SummaryReportAvro> list2) {
        if (this.domainOptional.booleanValue()) {
            list.add(SummaryReportAvro.create(num, this.resultsSerdes.convert(this.enableThresholding.booleanValue() ? noisedAggregationRunner.thresholdAggregatedFacts(immutableList, optional) : immutableList)));
        }
        if (z) {
            list2.add(SummaryReportAvro.create(num, this.debugResultsSerdes.convert(immutableList)));
        }
        return Flowable.empty();
    }

    private Flowable<Object> processDomainSummaryFacts(ImmutableList<AggregatedFact> immutableList, int i, Boolean bool, List<SummaryReportAvro> list, List<SummaryReportAvro> list2) {
        list.add(SummaryReportAvro.create(Integer.valueOf(i), this.resultsSerdes.convert(immutableList)));
        if (bool.booleanValue()) {
            list2.add(SummaryReportAvro.create(Integer.valueOf(i), this.debugResultsSerdes.convert(immutableList)));
        }
        return Flowable.empty();
    }

    private Flowable<AggregatedFact> processOutputDomainShard(DataLocation dataLocation, AggregationEngine aggregationEngine, NoisedAggregationRunner noisedAggregationRunner, Supplier<NoiseApplier> supplier, Set<BigInteger> set, Boolean bool) {
        Flowable<BigInteger> readShardData = readShardData(dataLocation);
        Objects.requireNonNull(set);
        return readShardData.filter((v1) -> {
            return r1.add(v1);
        }).map(bigInteger -> {
            AggregatedFact create = AggregatedFact.create(bigInteger, aggregationEngine.getAggregatedValueOrDefault(bigInteger, 0L));
            noisedAggregationRunner.noiseSingleFact(create, supplier);
            if (bool.booleanValue()) {
                ArrayList arrayList = new ArrayList();
                if (aggregationEngine.containsKey(bigInteger)) {
                    arrayList.add(DebugBucketAnnotation.IN_REPORTS);
                }
                arrayList.add(DebugBucketAnnotation.IN_DOMAIN);
                create.setDebugAnnotations(arrayList);
            }
            aggregationEngine.remove(bigInteger);
            return create;
        }).subscribeOn(Schedulers.from(this.blockingThreadPool));
    }

    public AggregatedResults adjustAggregationWithDomainAndNoise(AggregationEngine aggregationEngine, Optional<DataLocation> optional, ImmutableList<DataLocation> immutableList, NoisedAggregationRunner noisedAggregationRunner, Optional<Double> optional2, Boolean bool) throws DomainReadException {
        Set newConcurrentHashSet = Sets.newConcurrentHashSet(aggregationEngine.getKeySet());
        Set newConcurrentHashSet2 = Sets.newConcurrentHashSet();
        AtomicLong atomicLong = new AtomicLong(0L);
        Flowable.fromStream(immutableList.stream()).flatMap(dataLocation -> {
            return readShardData(dataLocation).subscribeOn(Schedulers.from(this.blockingThreadPool));
        }, false, NUM_READ_THREADS, 10000).buffer(this.MAX_DOMAIN_PROCESS_BUFFER_SIZE).doOnNext(list -> {
            atomicLong.addAndGet(list.size());
        }).flatMap(list2 -> {
            return Flowable.just(list2).subscribeOn(Schedulers.from(this.nonBlockingThreadPool)).map(list2 -> {
                list2.forEach(bigInteger -> {
                    if (bool.booleanValue() && newConcurrentHashSet.contains(bigInteger)) {
                        newConcurrentHashSet2.add(bigInteger);
                    }
                    newConcurrentHashSet.remove(bigInteger);
                    aggregationEngine.accept(bigInteger);
                });
                return Observable.empty();
            });
        }, NUM_PROCESS_THREADS).blockingSubscribe();
        if (optional.isPresent() && atomicLong.get() < 1) {
            throw new DomainReadException(new IllegalArgumentException(String.format("No output domain provided in the location: %s. Please refer to the API documentation for output domain parameters at https://github.com/privacysandbox/aggregation-service/blob/main/docs/api.md", optional)));
        }
        HashMap hashMap = new HashMap(aggregationEngine.makeAggregation());
        Stream stream = newConcurrentHashSet.stream();
        Objects.requireNonNull(hashMap);
        List list3 = (List) stream.map((v1) -> {
            return r1.remove(v1);
        }).collect(Collectors.toList());
        NoisedAggregationResult noise = noisedAggregationRunner.noise(hashMap.values(), optional2);
        NoisedAggregatedResultSet.Builder noisedResult = NoisedAggregatedResultSet.builder().setNoisedResult(noise);
        if (!bool.booleanValue() && !this.domainOptional.booleanValue()) {
            return AggregatedResults.create(noisedResult.build());
        }
        NoisedAggregationResult noise2 = noisedAggregationRunner.noise(list3, optional2);
        if (this.domainOptional.booleanValue()) {
            noisedResult.setNoisedResult(NoisedAggregationResult.merge(noise, this.enableThresholding.booleanValue() ? noisedAggregationRunner.threshold(noise2.noisedAggregatedFacts(), optional2) : noise2));
        }
        if (bool.booleanValue()) {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            noise.noisedAggregatedFacts().forEach(aggregatedFact -> {
                if (newConcurrentHashSet2.contains(aggregatedFact.getBucket())) {
                    arrayList2.add(aggregatedFact);
                } else {
                    arrayList.add(aggregatedFact);
                }
            });
            noisedResult.setNoisedDebugResult(getAnnotatedDebugResults(noise2, NoisedAggregationResult.create(noise.privacyParameters(), ImmutableList.copyOf((Collection) arrayList)), NoisedAggregationResult.create(noise.privacyParameters(), ImmutableList.copyOf((Collection) arrayList2))));
        }
        return AggregatedResults.create(noisedResult.build());
    }

    private NoisedAggregationResult getAnnotatedDebugResults(NoisedAggregationResult noisedAggregationResult, NoisedAggregationResult noisedAggregationResult2, NoisedAggregationResult noisedAggregationResult3) {
        return NoisedAggregationResult.merge(NoisedAggregationResult.addDebugAnnotations(noisedAggregationResult3, List.of(DebugBucketAnnotation.IN_REPORTS, DebugBucketAnnotation.IN_DOMAIN)), NoisedAggregationResult.merge(NoisedAggregationResult.addDebugAnnotations(noisedAggregationResult, List.of(DebugBucketAnnotation.IN_REPORTS)), NoisedAggregationResult.addDebugAnnotations(noisedAggregationResult2, List.of(DebugBucketAnnotation.IN_DOMAIN))));
    }

    private Flowable<BigInteger> readShardData(DataLocation dataLocation) {
        return Flowable.using(() -> {
            try {
                return this.blobStorageClient.getBlobSize(dataLocation).longValue() <= 0 ? InputStream.nullInputStream() : this.blobStorageClient.getBlob(dataLocation);
            } catch (BlobStorageClient.BlobStorageClientException e) {
                throw new DomainReadException(e);
            }
        }, inputStream -> {
            return Flowable.fromStream(readInputStream(inputStream));
        }, (v0) -> {
            v0.close();
        });
    }

    public abstract Stream<BigInteger> readInputStream(InputStream inputStream);
}
