package dk.dma.commons.service.io;

import com.google.common.base.Objects;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.Service;
import dk.dma.commons.service.AbstractBatchedStage;
import dk.dma.commons.util.io.OutputStreamSink;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.AsyncTaskExecutor;

/* loaded from: input_file:dk/dma/commons/service/io/MessageToFileService.class */
/**
 * Batched stage that writes every received message to a file below a root directory.
 *
 * <p>The file name is produced by formatting the current time with a
 * {@link SimpleDateFormat} pattern and resolving the result against {@link #root}. Whenever
 * the formatted path differs from the file currently in use, output is rolled over to the
 * new path and the per-file message counter is reset.
 *
 * <p>All mutable state ({@link #currentPath}, {@link #lastTime}, {@link #count}, the rolling
 * stream and the non-thread-safe {@link #sdf}) is only touched while holding {@link #lock},
 * both by {@link #handleMessages(List)} and by the periodic {@link FlushThread}.
 *
 * @param <T> the type of message written by the configured sink
 */
public class MessageToFileService<T> extends AbstractBatchedStage<T> {
    static final Logger LOG = LoggerFactory.getLogger(MessageToFileService.class);

    /** Path of the file currently being written, or {@code null} when none is open. */
    Path currentPath;

    /** The {@link SimpleDateFormat} pattern the file names are generated from. */
    final String filename;

    /** Timestamp (epoch millis) used for the most recently written message; -1 before the first. */
    long lastTime = -1L;

    /** Guards all mutable state of this service. */
    final ReentrantLock lock = new ReentrantLock();

    /** Maximum size handed to the constructor; stored but not read anywhere in this class. */
    final long maxSize;

    /** Directory that generated file names are resolved against. */
    final Path root;

    /** Stream that is redirected to a new file on every roll. */
    final RollingOutputStream ros = new RollingOutputStream();

    /** Formatter built from {@link #filename}; only used while {@link #lock} is held. */
    final SimpleDateFormat sdf;

    /** Writes a single message to an output stream. */
    final OutputStreamSink<T> sink;

    /** Index of the next message within the current file; reset to 0 on every roll. */
    long count;

    /**
     * Periodic task that flushes the rolling stream and closes the current file as soon as
     * the current time maps to a different file name, even if no new messages arrive.
     */
    class FlushThread extends AbstractScheduledService {
        FlushThread() {
        }

        /**
         * Flushes pending output and, if a file is open whose path no longer matches the
         * path derived from the current time, closes it so the next message opens a new one.
         * I/O failures are logged and do not propagate.
         */
        @Override
        protected void runOneIteration() throws Exception {
            final MessageToFileService<T> service = MessageToFileService.this;
            service.lock.lock();
            try {
                service.ros.flush();
                // time() is only consulted when a file is actually open.
                if (service.currentPath != null
                        && !Objects.equal(service.pathFor(service.time()), service.currentPath)) {
                    // Clearing currentPath first forces handleMessages to roll on its next message.
                    service.currentPath = null;
                    service.ros.close();
                    service.count = 0L;
                }
            } catch (IOException e) {
                LOG.error("FlushThread failed", e);
            } finally {
                service.lock.unlock();
            }
        }

        /** Schedules an iteration one second after start and every second thereafter. */
        @Override
        protected AbstractScheduledService.Scheduler scheduler() {
            return AbstractScheduledService.Scheduler.newFixedRateSchedule(1L, 1L, TimeUnit.SECONDS);
        }
    }

    /**
     * Creates a new service.
     *
     * @param path the root directory that file names are resolved against
     * @param str the {@link SimpleDateFormat} pattern used to generate file names
     * @param outputStreamSink the sink that serializes each message
     * @param j the maximum size; stored in {@link #maxSize}
     * @throws NullPointerException if {@code path}, {@code str} or {@code outputStreamSink} is null
     * @throws IllegalArgumentException if {@code str} is not a valid date format pattern
     */
    MessageToFileService(Path path, String str, OutputStreamSink<T> outputStreamSink, long j) {
        // NOTE(review): presumably queue size and batch size - confirm against AbstractBatchedStage.
        super(10000, 100);
        this.root = java.util.Objects.requireNonNull(path, "root is null");
        this.filename = java.util.Objects.requireNonNull(str, "filename is null");
        this.sink = java.util.Objects.requireNonNull(outputStreamSink, "sink is null");
        this.maxSize = j;
        this.sdf = new SimpleDateFormat(str);
    }

    /**
     * Returns the current wall-clock time in milliseconds, clamped so that it never lies
     * before {@link #lastTime}. A backwards jump of the system clock is logged as a warning.
     *
     * @return the current time, or {@link #lastTime} if the clock went backwards
     */
    long time() {
        long now = System.currentTimeMillis();
        if (now < this.lastTime) {
            LOG.warn("Cannot go backwards, last time ={}, currenttime={} writing anyways", this.lastTime, now);
            return this.lastTime;
        }
        return now;
    }

    /** Resolves the file that messages stamped with the given time belong to. */
    private Path pathFor(long timeMillis) {
        return this.root.resolve(this.sdf.format(new Date(timeMillis)));
    }

    /**
     * Writes the given batch of messages to the current file, rolling over to a new file
     * whenever the path derived from the current time changes.
     *
     * @param list the messages to write, in order
     * @throws IOException if rolling the stream or writing a message fails
     */
    @Override // dk.dma.commons.service.AbstractMessageProcessorService
    public void handleMessages(List<T> list) throws IOException {
        this.lock.lock();
        try {
            for (T message : list) {
                long now = time();
                // The target path is only recomputed when no file is open or the wall-clock
                // second differs from that of the previous message.
                if (this.currentPath == null || now / 1000 != this.lastTime / 1000) {
                    Path target = pathFor(now);
                    if (!Objects.equal(target, this.currentPath)) {
                        LOG.info("Opening file {} for backup", target.toAbsolutePath());
                        this.ros.roll(target);
                        this.count = 0L;
                        this.currentPath = target;
                    }
                }
                // The sink receives the message's index within the current file.
                this.sink.process(this.ros.getPublicStream(), message, this.count++);
                this.lastTime = now;
            }
        } finally {
            this.lock.unlock();
        }
    }

    /** Closes the rolling stream; a failure to close is logged and not propagated. */
    @Override // dk.dma.commons.service.AbstractBatchedStage
    protected void onShutdown() {
        this.lock.lock();
        try {
            this.ros.close();
        } catch (IOException e) {
            LOG.error("Could not close stream " + this.currentPath + " for backup", e);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Creates a service whose file names are generated from the given date format pattern
     * and whose maximum size is {@link Long#MAX_VALUE}.
     *
     * @param path the root directory that file names are resolved against
     * @param str the {@link SimpleDateFormat} pattern used to generate file names
     * @param outputStreamSink the sink that serializes each message
     * @param <T> the type of message written
     * @return a new, not yet started service
     */
    public static <T> MessageToFileService<T> dateTimeService(Path path, String str, OutputStreamSink<T> outputStreamSink) {
        return new MessageToFileService<>(path, validateFilename(path, str), outputStreamSink, Long.MAX_VALUE);
    }

    /**
     * Fails fast if {@code str} cannot be used as a file name pattern: the pattern is used
     * to format the current date and the result is resolved against {@code path}. The
     * resolved path itself is discarded.
     *
     * @param path the root directory
     * @param str the candidate {@link SimpleDateFormat} pattern
     * @return {@code str}, unchanged
     * @throws NullPointerException if {@code path} or {@code str} is null
     * @throws IllegalArgumentException if {@code str} is not a valid pattern
     * @throws java.nio.file.InvalidPathException if the formatted name cannot be resolved as a path
     */
    static String validateFilename(Path path, String str) {
        java.util.Objects.requireNonNull(path, "root is null");
        String formatted = new SimpleDateFormat(str).format(new Date());
        path.resolve(formatted);
        return str;
    }

    /**
     * Creates the periodic flush service for this instance. Despite the name, the returned
     * service is only constructed here, not started.
     *
     * @return a new {@link FlushThread} bound to this service
     */
    public Service startFlushThread() {
        return new FlushThread();
    }
}
