package dk.dma.ais.reader;

import dk.dma.ais.binary.SixbitException;
import dk.dma.ais.message.AisMessage;
import dk.dma.ais.message.AisMessageException;
import dk.dma.ais.packet.AisPacket;
import dk.dma.ais.packet.AisPacketReader;
import dk.dma.ais.packet.AisPacketStream;
import dk.dma.ais.queue.BlockingMessageQueue;
import dk.dma.ais.queue.IMessageQueue;
import dk.dma.ais.queue.IQueueEntryHandler;
import dk.dma.ais.queue.MessageQueueOverflowException;
import dk.dma.ais.queue.MessageQueueReader;
import dk.dma.ais.sentence.Abk;
import dk.dma.commons.management.ManagedAttribute;
import dk.dma.commons.management.ManagedResource;
import dk.dma.enav.util.function.Consumer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abstract base class for AIS readers.
 *
 * <p>A reader is a {@link Thread} that pulls {@link AisPacket}s from an input stream (see
 * {@link #readLoop(InputStream)}) and hands them to the registered packet handlers and, once
 * decoded, to the registered {@link AisMessage} handlers. It also offers a send path
 * ({@link #doSend(SendRequest, Consumer, OutputStream)}) whose acknowledgements ({@link Abk})
 * are routed through the internal {@link SendThreadPool}.
 *
 * <p>Handler lists are {@link CopyOnWriteArrayList}s and counters are {@link AtomicLong}s, so
 * handlers may be registered while the read loop is running.
 */
@ManagedResource
public abstract class AisReader extends Thread {
    static final Logger LOG = LoggerFactory.getLogger(AisReader.class);

    /** Counter exposed through {@link #getNumberOfBytesRead()}. */
    private final AtomicLong bytesRead = new AtomicLong();

    /** Number of bytes successfully written by {@link #doSend}. */
    private final AtomicLong bytesWritten = new AtomicLong();

    /** Handlers receiving decoded AIS messages. */
    protected final CopyOnWriteArrayList<Consumer<AisMessage>> handlers = new CopyOnWriteArrayList<>();

    /** Number of packets returned by the packet reader in {@link #readLoop}. */
    private final AtomicLong linesRead = new AtomicLong();

    /** Handlers receiving raw packets before any message decoding is attempted. */
    protected final CopyOnWriteArrayList<Consumer<? super AisPacket>> packetHandlers = new CopyOnWriteArrayList<>();

    /** Pool of pending send threads; receives every ABK seen by the read loop. */
    protected final SendThreadPool sendThreadPool = new SendThreadPool();

    /** Counted down exactly once by {@link #stopReader()}; a count of zero means "shut down". */
    final CountDownLatch shutdownLatch = new CountDownLatch(1);

    /** Source identifier handed to the packet reader for every packet read. */
    private String sourceId;

    /** Connection state reported by {@link #getStatus()}. */
    public enum Status {
        CONNECTED,
        DISCONNECTED
    }

    /**
     * Writes the sentences of a send request to the given stream and starts the send thread
     * created for it by the {@link SendThreadPool}.
     *
     * @param sendRequest  the request whose sentences are written
     * @param consumer     passed to the send thread pool together with the request
     *                     (presumably the receiver of the resulting ABK - confirm against
     *                     {@code SendThreadPool})
     * @param outputStream the stream to write to; {@code null} is treated as "not connected"
     * @throws SendException if {@code outputStream} is {@code null} or the write fails
     */
    public void doSend(SendRequest sendRequest, Consumer<Abk> consumer, OutputStream outputStream) throws SendException {
        if (outputStream == null) {
            throw new SendException("Not connected");
        }
        String[] sentences = sendRequest.createSentences();
        // The send thread is created before the write, exactly as before; it is only started
        // once the sentences have been written successfully.
        SendThread sendThread = this.sendThreadPool.createSendThread(sendRequest, consumer);
        String payload = StringUtils.join(sentences, "\r\n") + "\r\n";
        LOG.debug("Sending:\n{}", payload);
        try {
            // Encode once with an explicit charset so the byte count matches what is written
            // and does not depend on the platform default encoding.
            byte[] payloadBytes = payload.getBytes(StandardCharsets.US_ASCII);
            outputStream.write(payloadBytes);
            this.bytesWritten.addAndGet(payloadBytes.length);
        } catch (IOException e) {
            throw new SendException("Could not send AIS message: " + e.getMessage());
        }
        sendThread.start();
    }

    /**
     * Returns the value of the bytes-read counter.
     *
     * <p>NOTE(review): nothing in this class increments the counter; confirm whether
     * subclasses or the packet reader are expected to feed it.
     *
     * @return the current bytes-read count
     */
    @ManagedAttribute
    public long getNumberOfBytesRead() {
        return this.bytesRead.get();
    }

    /**
     * Returns the total number of bytes written by {@link #doSend}.
     *
     * @return the current bytes-written count
     */
    @ManagedAttribute
    public long getNumberOfBytesWritten() {
        return this.bytesWritten.get();
    }

    /**
     * Returns the number of packets read so far by {@link #readLoop}.
     *
     * @return the current packet count
     */
    @ManagedAttribute
    public long getNumberOfLinesRead() {
        return this.linesRead.get();
    }

    /**
     * Returns the source identifier set via {@link #setSourceId(String)}.
     *
     * @return the source id, or {@code null} if none has been set
     */
    @ManagedAttribute
    public String getSourceId() {
        return this.sourceId;
    }

    /**
     * Returns the current connection status of this reader.
     *
     * @return the status as reported by the concrete reader
     */
    @ManagedAttribute
    public abstract Status getStatus();

    /**
     * Tells whether {@link #stopReader()} has been called.
     *
     * @return {@code true} once the shutdown latch has reached zero
     */
    public boolean isShutdown() {
        return this.shutdownLatch.getCount() == 0;
    }

    /**
     * Reads packets from the stream until the packet reader returns {@code null}, dispatching
     * each packet to the registered handlers. ABK sentences encountered by the packet reader
     * are forwarded to the {@link SendThreadPool}.
     *
     * <p>The packet reader is always closed when the loop ends, whether normally or by an
     * exception.
     *
     * @param inputStream the stream to read packets from
     * @throws IOException if reading from or closing the packet reader fails
     */
    public void readLoop(InputStream inputStream) throws IOException {
        try (AisPacketReader packetReader = new AisPacketReader(inputStream) {
            @Override
            protected void handleAbk(Abk abk) {
                AisReader.this.sendThreadPool.handleAbk(abk);
            }
        }) {
            AisPacket packet;
            while ((packet = packetReader.readPacket(this.sourceId)) != null) {
                this.linesRead.incrementAndGet();
                dispatchPacket(packet);
            }
        }
    }

    /**
     * Hands a packet to every packet handler and, if any message handlers are registered and
     * the packet decodes successfully, hands the decoded message to every message handler.
     * Decoding failures are logged at info level and the packet is otherwise skipped.
     */
    private void dispatchPacket(AisPacket packet) {
        for (Consumer<? super AisPacket> packetHandler : this.packetHandlers) {
            packetHandler.accept(packet);
        }
        if (this.handlers.isEmpty()) {
            // Nobody wants decoded messages, so skip the decoding work entirely.
            return;
        }
        AisMessage message;
        try {
            message = packet.getAisMessage();
        } catch (SixbitException e) {
            LOG.info("Sixbit error: {} vdm: {}", e.getMessage(), packet.getVdm().getOrgLinesJoined());
            return;
        } catch (AisMessageException e) {
            LOG.info("AIS message exception: {} vdm: {}", e.getMessage(), packet.getVdm().getOrgLinesJoined());
            return;
        }
        if (message == null) {
            return;
        }
        for (Consumer<AisMessage> messageHandler : this.handlers) {
            messageHandler.accept(message);
        }
    }

    /**
     * Registers a handler that receives every successfully decoded AIS message.
     *
     * @param consumer the handler to add; must not be {@code null}
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    public void registerHandler(Consumer<AisMessage> consumer) {
        // Fail at registration time rather than with an NPE inside the read loop.
        this.handlers.add(Objects.requireNonNull(consumer, "consumer"));
    }

    /**
     * Registers a handler that receives every packet read, before message decoding.
     *
     * @param consumer the handler to add; must not be {@code null}
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    public void registerPacketHandler(Consumer<? super AisPacket> consumer) {
        // Fail at registration time rather than with an NPE inside the read loop.
        this.packetHandlers.add(Objects.requireNonNull(consumer, "consumer"));
    }

    /**
     * Registers a message handler that pushes every decoded message onto the given queue.
     * If the queue reports an overflow the message is dropped and an error is logged.
     *
     * @param iMessageQueue the queue to push messages to; must not be {@code null}
     * @throws NullPointerException if {@code iMessageQueue} is {@code null}
     */
    public void registerQueue(final IMessageQueue<AisMessage> iMessageQueue) {
        Objects.requireNonNull(iMessageQueue);
        registerHandler(new Consumer<AisMessage>() {
            @Override
            public void accept(AisMessage aisMessage) {
                try {
                    iMessageQueue.push(aisMessage);
                } catch (MessageQueueOverflowException e) {
                    AisReader.LOG.error("Message queue overflow, dropping message: {}", e.getMessage());
                }
            }
        });
    }

    /**
     * Creates a {@link MessageQueueReader} around a new {@link BlockingMessageQueue} for the
     * given entry handler, registers its queue with this reader and starts it.
     *
     * @param iQueueEntryHandler the handler given to the queue reader
     */
    public void registerQueueHandler(IQueueEntryHandler<AisMessage> iQueueEntryHandler) {
        MessageQueueReader messageQueueReader = new MessageQueueReader(iQueueEntryHandler, new BlockingMessageQueue());
        registerQueue(messageQueueReader.getQueue());
        messageQueueReader.start();
    }

    /**
     * Sends a message using {@code 60000} as the fourth argument of
     * {@link #send(AisMessage, int, int, int)}.
     *
     * @param aisMessage the message to send
     * @param i          second argument of the {@link SendRequest} constructor
     * @param i2         third argument of the {@link SendRequest} constructor
     * @return the ABK returned by the client send thread
     * @throws SendException        if sending fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Abk send(AisMessage aisMessage, int i, int i2) throws SendException, InterruptedException {
        return send(aisMessage, i, i2, 60000);
    }

    /**
     * Sends a message through a {@link ClientSendThread} and returns its result.
     *
     * @param aisMessage the message to send
     * @param i          second argument of the {@link SendRequest} constructor
     * @param i2         third argument of the {@link SendRequest} constructor
     * @param i3         currently unused by this method. NOTE(review): presumably a timeout in
     *                   milliseconds (the three-argument overload passes 60000) - confirm and
     *                   wire it through if {@code ClientSendThread} supports it
     * @return the ABK returned by the client send thread
     * @throws SendException        if sending fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Abk send(AisMessage aisMessage, int i, int i2, int i3) throws SendException, InterruptedException {
        SendRequest request = new SendRequest(aisMessage, i, i2);
        return new ClientSendThread(this, request).send();
    }

    /**
     * Sends the given request; implemented by the concrete reader.
     *
     * @param sendRequest the request to send
     * @param consumer    consumer associated with the request
     * @throws SendException if the request cannot be sent
     */
    public abstract void send(SendRequest sendRequest, Consumer<Abk> consumer) throws SendException;

    /**
     * Sets the source identifier passed to the packet reader for subsequently read packets.
     *
     * @param str the new source id
     */
    public void setSourceId(String str) {
        this.sourceId = str;
    }

    /**
     * Marks this reader as shut down and interrupts its thread.
     */
    public void stopReader() {
        this.shutdownLatch.countDown();
        interrupt();
    }

    /**
     * Creates a new packet stream that is fed with every packet read by this reader.
     *
     * @return the immutable view of the newly created stream
     */
    public AisPacketStream stream() {
        final AisPacketStream packetStream = AisPacketStream.newStream();
        registerPacketHandler(new Consumer<AisPacket>() {
            @Override
            public void accept(AisPacket aisPacket) {
                packetStream.add(aisPacket);
            }
        });
        return packetStream.immutableStream();
    }
}
