package org.bft.bftresend.business.zags.processors;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.bft.bftresend.business.zags.model.ZagsProxy;
import org.bft.bftresend.business.zags.model.dto.MessageLogInfo;
import org.bft.bftresend.utils.MessageProcessor;
import org.springframework.stereotype.Component;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
 * Camel processor that replaces a file path in the message body with the list of
 * {@link MessageLogInfo} entries built from the matching lines of that file.
 *
 * <p>Inputs read from the exchange:
 * <ul>
 *   <li>message body - its {@code toString()} value is used as the path of the file to read;</li>
 *   <li>property {@code jobId} - passed through to {@code getMessageLogInfo};</li>
 *   <li>property {@code dates} - the dates of interest;</li>
 *   <li>property {@code proxies} - the proxies of interest.</li>
 * </ul>
 *
 * <p>The helpers {@code getLocalDate}, {@code getProxy} and {@code getMessageLogInfo} are not
 * defined in this class; they are presumably inherited from {@link MessageProcessor}
 * (NOTE(review): confirm their parameter and return types match the generics used here).
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class ZagsReadFileForAnalyzeProcessor implements Processor, MessageProcessor {

    /**
     * Reads the file named by the message body and sets the body to the list of log entries
     * created from the lines that match both the requested proxies and the requested dates.
     *
     * <p>The body is set to an empty list when the date derived from the file path is not among
     * the requested dates, when the {@code dates} or {@code proxies} property is missing, or when
     * reading/parsing the file fails (the failure is logged). This guarantees that downstream
     * steps always receive a {@code List} rather than the original path string.
     *
     * @param exchange the current Camel exchange; its body must be non-null
     * @throws Exception if the date cannot be derived from the file path
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        var filePath = exchange.getIn().getBody().toString();
        LocalDate currentDate = getLocalDate(filePath);
        Long jobId = exchange.getProperty("jobId", Long.class);

        // Exchange properties are untyped; the element types are a contract with the route
        // that populates them, so the unchecked assignments are suppressed locally only.
        @SuppressWarnings("unchecked")
        List<LocalDate> dates = exchange.getProperty("dates", List.class);
        @SuppressWarnings("unchecked")
        List<ZagsProxy> proxyList = exchange.getProperty("proxies", List.class);

        if (dates == null || proxyList == null) {
            log.warn("$ process missing exchange property: dates={}, proxies={}", dates, proxyList);
            exchange.getIn().setBody(Collections.emptyList());
            return;
        }

        // Skip the file entirely when the date derived from its path was not requested.
        if (!dates.contains(currentDate)) {
            exchange.getIn().setBody(Collections.emptyList());
            return;
        }

        Predicate<String> isRequestedLine =
                line -> proxyList.contains(getProxy(line)) && dates.contains(getLocalDate(line));

        List<MessageLogInfo> results;
        // Files.lines keeps the file open until the stream is closed, hence try-with-resources.
        try (var lines = Files.lines(Paths.get(filePath), UTF_8)) {
            results = lines
                    .filter(isRequestedLine)
                    .map(line -> getMessageLogInfo(jobId, line))
                    .collect(Collectors.toList());
        } catch (Exception e) {
            // Best-effort: a broken file must not fail the route, but the body must still be a list.
            log.error("$ process {}", filePath, e);
            results = Collections.emptyList();
        }
        exchange.getIn().setBody(results);
    }
}