Skip to content

Instantly share code, notes, and snippets.

@shvyrev
Created September 27, 2024 14:17
Show Gist options
  • Save shvyrev/4a9a29e0c611b44742f05529a9ccc663 to your computer and use it in GitHub Desktop.
Save shvyrev/4a9a29e0c611b44742f05529a9ccc663 to your computer and use it in GitHub Desktop.

Revisions

  1. shvyrev created this gist Sep 27, 2024.
    53 changes: 53 additions & 0 deletions ZagsGetAllMessagesProcessor
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,53 @@
    package org.bft.bftresend.business.zags.processors;

    import lombok.extern.slf4j.Slf4j;
    import org.apache.camel.Exchange;
    import org.apache.camel.Processor;
    import org.bft.bftresend.business.zags.model.ZagsProxy;
    import org.bft.bftresend.utils.FileUtils;
    import org.bft.bftresend.utils.MessageProcessor;
    import org.springframework.stereotype.Component;

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.List;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;

    import static java.nio.charset.StandardCharsets.UTF_8;
    import static java.util.stream.Stream.of;
    import static org.bft.bftresend.utils.Const.MESSAGE_ID_PREFIX;

    @SuppressWarnings("unchecked")
    @Slf4j
    @Component
    public class ZagsGetAllMessagesProcessor implements Processor, MessageProcessor {

    @Override
    public void process(Exchange exchange) throws Exception {

    try {
    var proxies = (List<ZagsProxy>) exchange.getProperty("proxies", List.class);
    var filePath = exchange.getIn().getBody().toString();

    Long jobId = exchange.getProperty("jobId", Long.class);

    Predicate<String> predicate = line -> proxies.contains(getProxy(line))
    && line.contains(MESSAGE_ID_PREFIX);

    try (var lines = Files.lines(Paths.get(filePath), UTF_8).filter(predicate)) {
    lines
    .map(line -> getMessageLogInfo(jobId, line))
    .forEach(v -> {
    String path = "tmp/" + v.proxy().name() + ".csv";
    String value = of(v.date(), v.messageId(), v.fileId(), v.clientId(), v.proxy())
    .map(String::valueOf)
    .collect(Collectors.joining(","));
    FileUtils.writeAppend(path, value);
    });
    }
    } catch (Exception e) {
    log.error("$ process ", e);
    }
    }
    }