/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.agents.runtime.actionstate;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.configuration.AgentConfigOptions;
import org.apache.flink.agents.plan.AgentConfiguration;
import org.apache.flink.agents.plan.actions.Action;
import org.apache.flink.agents.runtime.actionstate.ActionState;
import org.apache.flink.agents.runtime.actionstate.ActionStateKafkaSeder;
import org.apache.flink.agents.runtime.actionstate.ActionStateStore;
import org.apache.flink.agents.runtime.actionstate.ActionStateUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaActionStateStore
implements ActionStateStore {
    private static final Duration CONSUMER_POLL_TIMEOUT = Duration.ofMillis(1000L);
    private static final Logger LOG = LoggerFactory.getLogger(KafkaActionStateStore.class);
    private static final Long DEFAULT_FUTURE_GET_TIMEOUT_MS = 100L;
    private final AgentConfiguration agentConfiguration;
    private final Map<String, ActionState> actionStates;
    private final Map<String, Long> latestKeySeqNum;
    private final Producer<String, ActionState> producer;
    private final Consumer<String, ActionState> consumer;
    private final String topic;

    @VisibleForTesting
    KafkaActionStateStore(Map<String, ActionState> actionStates, AgentConfiguration agentConfiguration, Producer<String, ActionState> producer, Consumer<String, ActionState> consumer, String topic) {
        this.actionStates = actionStates;
        this.producer = producer;
        this.consumer = consumer;
        this.topic = topic;
        this.latestKeySeqNum = new HashMap<String, Long>();
        this.agentConfiguration = agentConfiguration;
    }

    public KafkaActionStateStore(AgentConfiguration agentConfiguration) {
        this.actionStates = new HashMap<String, ActionState>();
        this.latestKeySeqNum = new HashMap<String, Long>();
        this.agentConfiguration = agentConfiguration;
        this.topic = (String)Preconditions.checkArgumentNotNull((Object)agentConfiguration.get(AgentConfigOptions.KAFKA_ACTION_STATE_TOPIC), (Object)"Kafka action state topic must be configured");
        this.maybeCreateTopic();
        Properties producerProp = this.createProducerProp();
        this.producer = new KafkaProducer<String, ActionState>(producerProp);
        Properties consumerProp = this.createConsumerProp();
        this.consumer = new KafkaConsumer<String, ActionState>(consumerProp);
        this.consumer.subscribe(Collections.singletonList(this.topic));
        LOG.info("Initialized KafkaActionStateStore with topic: {}", (Object)this.topic);
    }

    @Override
    public void put(Object key, long seqNum, Action action, Event event, ActionState state) throws Exception {
        if (this.producer == null) {
            LOG.error("Producer is null, cannot put action state to Kafka");
            return;
        }
        String stateKey = ActionStateUtil.generateKey(key, seqNum, action, event);
        try {
            ProducerRecord<String, ActionState> kafkaRecord = new ProducerRecord<String, ActionState>(this.topic, stateKey, state);
            this.producer.send(kafkaRecord);
            LOG.debug("Sent action state to Kafka for key: {}", (Object)stateKey);
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to send action state to Kafka", e);
        }
    }

    @Override
    public ActionState get(Object key, long seqNum, Action action, Event event) throws Exception {
        String stateKey = ActionStateUtil.generateKey(key, seqNum, action, event);
        boolean hasDivergence = this.checkDivergence(key.toString(), seqNum);
        if (!this.actionStates.containsKey(stateKey) || hasDivergence) {
            this.actionStates.entrySet().removeIf(entry -> {
                try {
                    List<String> parts = ActionStateUtil.parseKey((String)entry.getKey());
                    if (parts.size() >= 2) {
                        long stateSeqNum = Long.parseLong(parts.get(1));
                        return stateSeqNum > seqNum;
                    }
                }
                catch (NumberFormatException e) {
                    LOG.warn("Failed to parse sequence number from state key: {}", (Object)stateKey);
                }
                return false;
            });
        }
        return this.actionStates.get(stateKey);
    }

    private boolean checkDivergence(String key, long seqNum) {
        return this.actionStates.keySet().stream().filter(k -> k.startsWith(key + "_" + seqNum)).count() > 1L;
    }

    @Override
    public void rebuildState(List<Object> recoveryMarkers) {
        LOG.info("Rebuilding state from {} recovery markers", (Object)recoveryMarkers.size());
        try {
            ConsumerRecords<String, ActionState> records;
            HashMap<Integer, Long> partitionMap = new HashMap<Integer, Long>();
            for (Object marker : recoveryMarkers) {
                if (!(marker instanceof Map)) continue;
                Map map = (Map)marker;
                for (Map.Entry entry : map.entrySet()) {
                    Long offset2 = partitionMap.computeIfPresent((Integer)entry.getKey(), (key, value) -> Math.min(value, (Long)entry.getValue()));
                    partitionMap.put((Integer)entry.getKey(), offset2 == null ? (Long)entry.getValue() : offset2);
                }
            }
            partitionMap.forEach((partition, offset) -> this.consumer.seek(new TopicPartition(this.topic, (int)partition), (long)offset));
            while (!(records = this.consumer.poll(CONSUMER_POLL_TIMEOUT)).isEmpty()) {
                for (ConsumerRecord<String, ActionState> consumerRecord : records) {
                    try {
                        this.actionStates.put(consumerRecord.key(), consumerRecord.value());
                    }
                    catch (Exception e) {
                        LOG.warn("Failed to deserialize action state record: {}", (Object)consumerRecord.value().toString(), (Object)e);
                    }
                }
                this.consumer.commitSync();
            }
            LOG.info("Completed rebuilding state, recovered {} states", (Object)this.actionStates.size());
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to rebuild state from Kafka", e);
        }
    }

    @Override
    public void pruneState(Object key, long seqNum) {
        LOG.info("Pruning state for key: {} up to sequence number: {}", key, (Object)seqNum);
        this.actionStates.entrySet().removeIf(entry -> {
            String stateKey = (String)entry.getKey();
            if (stateKey.startsWith(key.toString() + "_")) {
                try {
                    List<String> parts = ActionStateUtil.parseKey(stateKey);
                    if (parts.size() >= 2) {
                        long stateSeqNum = Long.parseLong(parts.get(1));
                        return stateSeqNum <= seqNum;
                    }
                }
                catch (NumberFormatException e) {
                    LOG.warn("Failed to parse sequence number from state key: {}", (Object)stateKey);
                }
            }
            return false;
        });
        LOG.debug("Pruned state for key: {} up to sequence number: {}", key, (Object)seqNum);
    }

    @Override
    public Object getRecoveryMarker() {
        HashMap<Integer, Long> recoveryMarker = new HashMap<Integer, Long>();
        try {
            ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
            for (PartitionInfo partitionInfo : this.consumer.partitionsFor(this.topic)) {
                partitions.add(new TopicPartition(this.topic, partitionInfo.partition()));
            }
            Map<TopicPartition, Long> endOffsets = this.consumer.endOffsets(partitions);
            for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
                recoveryMarker.put(entry.getKey().partition(), entry.getValue());
            }
        }
        catch (Exception e) {
            LOG.error("Failed to verify Kafka topic: {}", (Object)this.topic, (Object)e);
            throw new RuntimeException("Failed to verify Kafka topic", e);
        }
        return recoveryMarker;
    }

    @Override
    public void close() throws Exception {
        if (this.producer != null) {
            this.producer.close();
        }
        if (this.consumer != null) {
            this.consumer.close();
        }
    }

    private void maybeCreateTopic() {
        try (AdminClient adminClient = AdminClient.create(this.createCommonKafkaConfig());){
            ListTopicsResult topics = adminClient.listTopics();
            if (!topics.names().get(DEFAULT_FUTURE_GET_TIMEOUT_MS, TimeUnit.MILLISECONDS).contains(this.topic)) {
                NewTopic newTopic = new NewTopic(this.topic, this.agentConfiguration.get(AgentConfigOptions.KAFKA_ACTION_STATE_TOPIC_NUM_PARTITIONS), this.agentConfiguration.get(AgentConfigOptions.KAFKA_ACTION_STATE_TOPIC_REPLICATION_FACTOR).shortValue());
                newTopic.configs(Map.of("cleanup.policy", "compact"));
                adminClient.createTopics(List.of(newTopic)).all().get();
                LOG.info("Created Kafka topic: {}", (Object)this.topic);
            } else {
                LOG.info("Kafka topic {} already exists", (Object)this.topic);
            }
        }
        catch (Exception e) {
            LOG.error("Failed to create or verify Kafka topic: {}", (Object)this.topic, (Object)e);
            throw new RuntimeException("Failed to create or verify Kafka topic", e);
        }
    }

    private Properties createCommonKafkaConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.agentConfiguration.get(AgentConfigOptions.KAFKA_BOOTSTRAP_SERVERS));
        return props;
    }

    private Properties createProducerProp() {
        Properties producerProps = new Properties();
        producerProps.putAll((Map<?, ?>)this.createCommonKafkaConfig());
        producerProps.put("key.serializer", StringSerializer.class);
        producerProps.put("value.serializer", ActionStateKafkaSeder.class);
        producerProps.put("acks", "all");
        producerProps.put("partitioner.class", "org.apache.flink.agents.runtime.actionstate.ActionStateKeyPartitioner");
        producerProps.put("retries", (Object)3);
        return producerProps;
    }

    private Properties createConsumerProp() {
        Properties consumerProps = new Properties();
        consumerProps.putAll((Map<?, ?>)this.createCommonKafkaConfig());
        consumerProps.put("client.id", "action-state-rebuild-consumer");
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", ActionStateKafkaSeder.class.getName());
        consumerProps.put("group.id", "action-state-rebuild-" + String.valueOf(UUID.randomUUID()));
        consumerProps.put("auto.offset.reset", "earliest");
        return consumerProps;
    }
}

