Skip to content

Instantly share code, notes, and snippets.

@sachinshelke
Forked from maxant/Buyer.java
Created December 28, 2017 09:08
Show Gist options
  • Save sachinshelke/71e90ff8a94bca3f81e0f724d0dfcd24 to your computer and use it in GitHub Desktop.
Save sachinshelke/71e90ff8a94bca3f81e0f724d0dfcd24 to your computer and use it in GitHub Desktop.

Revisions

  1. Ant Kutschera created this gist Dec 24, 2014.
    97 changes: 97 additions & 0 deletions Buyer.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,97 @@
    package ch.maxant.tradingengine.model;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;

    import ch.maxant.tradingengine.model.TradingEngine.Listener;

    public class Buyer {

    private static final Logger LOGGER = LogManager.getLogger("buyer");

    private String name;
    private List<PurchaseOrder> purchaseOrders = new ArrayList<>();

    public Listener listener;

    public Buyer(String name) {
    this.name = name;
    }

    public String getName() {
    return name;
    }

    public void addPurchaseOrder(PurchaseOrder purchaseOrder) {
    LOGGER.debug(name + " adding " + purchaseOrder);
    purchaseOrder.setBuyer(this);
    this.purchaseOrders.add(purchaseOrder);
    }

    /**
    * @return {Array} all the {@link PurchaseOrder}s for the given product,
    * where the maximum acceptable price is more than the given price
    */
    public List<PurchaseOrder> getRelevantPurchaseOrders(String productId,
    double price) {
    return this.purchaseOrders
    .stream()
    .filter(po -> {
    return po.getProductId().equals(productId)
    && po.getMaximumAcceptedPrice() >= price;
    }).collect(Collectors.toList());
    }

    public void removePurchaseOrder(PurchaseOrder purchaseOrder) {
    this.purchaseOrders.remove(purchaseOrder);
    }

    public List<PurchaseOrder> removeOutdatedPurchaseOrders(long ageInMs) {
    long now = System.currentTimeMillis();
    List<PurchaseOrder> filter = this.purchaseOrders.stream()
    .filter(po -> {
    return now - po.getCreated().getTime() > ageInMs;
    }).collect(Collectors.toList());
    this.purchaseOrders.removeAll(filter);
    return filter;
    }

    public List<PurchaseOrder> getPurchaseOrders() {
    return purchaseOrders;
    }

    @Override
    public int hashCode() {
    final int prime = 31;
    int result = 1;
    result = prime * result + ((name == null) ? 0 : name.hashCode());
    return result;
    }

    @Override
    public boolean equals(Object obj) {
    if (this == obj)
    return true;
    if (obj == null)
    return false;
    if (getClass() != obj.getClass())
    return false;
    Buyer other = (Buyer) obj;
    if (name == null) {
    if (other.name != null)
    return false;
    } else if (!name.equals(other.name))
    return false;
    return true;
    }

    @Override
    public String toString() {
    return "Buyer [name=" + name + "]";
    }

    }
    30 changes: 30 additions & 0 deletions Constants.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,30 @@
    package ch.maxant.tradingengine.web;

    import java.util.concurrent.atomic.AtomicInteger;

    public final class Constants {

    // TODO use config to decide how engines to start
    public static final int NUM_KIDS = 4;

    public static final AtomicInteger ID = new AtomicInteger();

    public static final String[] PRODUCT_IDS = { "0", "1", "2", "3", "4", "5",
    "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17",
    "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28",
    "29", "30", "31", "32", "33", "34", "35", "36", "37", "38", "39",
    "40", "41", "42", "43", "44", "45", "46", "47", "48", "49", "50",
    "51", "52", "53", "54", "55", "56", "57", "58", "59", "60", "61",
    "62", "63", "64", "65", "66", "67", "68", "69", "70", "71", "72",
    "73", "74", "75", "76", "77", "78", "79", "80", "81", "82", "83",
    "84", "85", "86", "87", "88", "89", "90", "91", "92", "93", "94",
    "95", "96", "97", "98", "99" };

    public static final long DELAY = 3; // how many milliseconds between
    // trading
    // sessions

    public static final long TIMEOUT = 60000; // num ms after which incomplete
    // SOs and POs should be removed

    }
    246 changes: 246 additions & 0 deletions Market.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,246 @@
    package ch.maxant.tradingengine.model;

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Optional;
    import java.util.Set;
    import java.util.stream.Collectors;

    import org.apache.commons.lang3.mutable.MutableBoolean;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;

    /**
    * A market contains 0..n sellers. A seller has 0..i sales orders each of which
    * contains a quantity of a product at a certain price. The seller is prepared
    * to sell their product for that price. The market also contains 0..m buyers.
    * Each buyer has 0..j purchase orders. A purchase order is for a given product
    * and quantity. The purchase price may not exceed the given price. The market
    * works by continuously looping through trade sittings. See the {@link #trade}
    * method.
    */
    public class Market {

    private static final Logger LOGGER = LogManager.getLogger("market");

    private List<Seller> sellers = new ArrayList<>();
    private List<Buyer> buyers = new ArrayList<>();

    private MarketInfo marketInfo;

    public void addSeller(Seller seller) {
    this.sellers.add(seller);
    }

    public void addBuyer(Buyer buyer) {
    this.buyers.add(buyer);
    }

    /**
    * At a single trade sitting, the following happens: 1) find all products
    * available (on offer by sellers) 2) for each product: 2a) for each buyer
    * interested in that product: 2ai) find the seller with the cheapest price
    * for the current product 2aii) if such a seller exists, create a sale,
    * otherwise nobody is selling the product anymore, so skip to next product.
    *
    * The point is that a buyer always goes to the cheapest seller, even if
    * that seller doesnt have enough quantity. A buyer who wants more has to
    * wait until the next trading session to find the next most suitable
    * seller.
    *
    * @return {Array} array of {@link Sale}s in this trade
    */
    public List<Sale> trade() {
    List<Sale> sales = new ArrayList<>();

    Set<String> productsInMarket = getProductsInMarket();

    this.collectMarketInfo();

    // trade each product in succession
    productsInMarket
    .stream()
    .forEach(
    productId -> {

    MutableBoolean soldOutOfProduct = new MutableBoolean(
    false);
    LOGGER.debug("trading product " + productId);
    List<Buyer> buyersInterestedInProduct = getBuyersInterestedInProduct(productId);
    if (buyersInterestedInProduct.size() == 0) {
    LOGGER.info("no buyers interested in product "
    + productId);
    } else {
    buyersInterestedInProduct.forEach(buyer -> {
    if (soldOutOfProduct.isFalse()) {
    LOGGER.debug(" buyer "
    + buyer.getName()
    + " is searching for product "
    + productId);
    // select the cheapest seller
    Optional<Seller> cheapestSeller = sellers
    .stream()
    .filter(seller -> {
    return seller
    .hasProduct(productId);
    })
    .sorted((s1, s2) -> Double
    .compare(
    s1.getCheapestSalesOrder(
    productId)
    .getPrice(),
    s2.getCheapestSalesOrder(
    productId)
    .getPrice()))
    .findFirst();
    if (cheapestSeller.isPresent()) {
    LOGGER.debug(" cheapest seller is "
    + cheapestSeller.get()
    .getName());
    List<Sale> newSales = createSale(
    buyer,
    cheapestSeller.get(),
    productId);
    sales.addAll(newSales);
    LOGGER.debug(" sales completed");
    } else {
    LOGGER.warn(" market sold out of product "
    + productId);
    soldOutOfProduct.setTrue();
    }
    }
    });
    }
    });

    return sales;
    };

    public void collectMarketInfo() {
    this.marketInfo = new MarketInfo();

    this.marketInfo.pos = buyers.stream().map(buyer -> {
    return buyer.getPurchaseOrders();
    }).flatMap(l -> l.stream()).collect(Collectors.groupingBy(po -> {
    return po.getProductId();
    }));

    this.marketInfo.sos = sellers.stream().map(seller -> {
    return seller.getSalesOrders();
    }).flatMap(l -> l.stream()).collect(Collectors.groupingBy(so -> {
    return so.getProductId();
    }));

    /*
    * for(String productId : this.marketInfo.pos.keySet()){
    * this.marketInfo.pos.put(productId) =
    * this.marketInfo.pos[productId].length; } for(var productId :
    * this.marketInfo.sos){ this.marketInfo.sos[productId] =
    * this.marketInfo.sos[productId].length; }
    */
    };

    /**
    * creates a sale if the prices is within the buyers budget. iterates all of
    * the buyers purchase wishes for the given product so long as the seller
    * still has the product.
    *
    * @return {Array} array of new {@link Sale}s, after having removed a
    * quantity of the product from the seller/buyer.
    */
    public List<Sale> createSale(Buyer buyer, Seller seller, String productId) {
    SalesOrder cheapestSalesOrder = seller.getCheapestSalesOrder(productId);
    LOGGER.debug("cheapest sales order " + cheapestSalesOrder);

    // find the buyers purchase orders, where the po.price =>
    // cheapestSalesOrder.price
    // create a sale for each buyer's purchase order
    // until either the seller has no more stock at this price
    // or the buyer has bought all they want

    List<PurchaseOrder> purchaseOrders = buyer.getRelevantPurchaseOrders(
    productId, cheapestSalesOrder.getPrice());
    LOGGER.debug("relevant purchase orders: " + purchaseOrders);

    List<Sale> sales = new ArrayList<>();
    purchaseOrders.stream().forEach(
    purchaseOrder -> {
    int quantity = Math.min(
    cheapestSalesOrder.getRemainingQuantity(),
    purchaseOrder.getRemainingQuantity());
    LOGGER.debug("quantity " + quantity + " for PO: "
    + purchaseOrder);
    if (quantity > 0) {
    Sale sale = new Sale(buyer, seller, productId,
    cheapestSalesOrder.getPrice(), quantity);

    // add PO and SO for events
    sale.setPurchaseOrder(purchaseOrder);
    sale.setSalesOrder(cheapestSalesOrder);
    sales.add(sale);
    LOGGER.debug("created sale: " + sale);

    // adjust quantities purchaseOrder.remainingQuantity -=
    // quantity;
    cheapestSalesOrder.reduceRemainingQuantity(quantity);

    // remove completed purchase wishes
    if (purchaseOrder.getRemainingQuantity() == 0) {
    LOGGER.debug("PO complete: " + sale);
    buyer.removePurchaseOrder(purchaseOrder);
    }
    }
    });

    // remove completed sales orders
    if (cheapestSalesOrder.getRemainingQuantity() == 0) {
    LOGGER.debug("SO complete: " + cheapestSalesOrder);
    seller.removeSalesOrder(cheapestSalesOrder);
    }

    return sales;
    }

    /**
    * @return all buyers in the market who have a purchase order for the given
    * product
    */
    public List<Buyer> getBuyersInterestedInProduct(final String productId) {
    return this.buyers.stream().filter(buyer -> {
    return buyer.getPurchaseOrders().stream().anyMatch(po -> {
    return po.getProductId().equals(productId);
    });
    }).collect(Collectors.toList());
    }

    /** @return all product IDs that are for sale in the market */
    public Set<String> getProductsInMarket() {
    // TODO use flatmap, but also in js too!
    Set<String> productsInMarket = new HashSet<>();
    sellers.forEach(seller -> {
    seller.getSalesOrders().stream().forEach(salesOrder -> {
    productsInMarket.add(salesOrder.getProductId());
    });
    });
    return productsInMarket;
    }

    public static class MarketInfo {
    public Map<String, List<PurchaseOrder>> pos;
    public Map<String, List<SalesOrder>> sos;
    }

    public MarketInfo getMarketInfo() {
    return marketInfo;
    }

    public List<Seller> getSellers() {
    return sellers;
    }

    public List<Buyer> getBuyers() {
    return buyers;
    }
    }
    33 changes: 33 additions & 0 deletions Model.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,33 @@
    package ch.maxant.tradingengine.model;

    import java.util.Collection;
    import java.util.Collections;

    import org.apache.commons.lang3.builder.EqualsBuilder;
    import org.apache.commons.lang3.builder.HashCodeBuilder;
    import org.apache.commons.lang3.builder.ToStringBuilder;
    import org.apache.commons.lang3.builder.ToStringStyle;

    public abstract class Model {

    @Override
    public boolean equals(Object o) {
    return EqualsBuilder.reflectionEquals(this, o, getIgnoredFields());
    }

    protected Collection<String> getIgnoredFields() {
    return Collections.emptySet();
    }

    @Override
    public int hashCode() {
    return HashCodeBuilder.reflectionHashCode(this, getIgnoredFields());
    }

    @Override
    public String toString() {
    return ToStringBuilder.reflectionToString(this,
    ToStringStyle.SHORT_PREFIX_STYLE);
    }

    }
    14 changes: 14 additions & 0 deletions ModelId.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,14 @@
    package ch.maxant.tradingengine.model;

    public class IdModel extends Model {

    private int id;

    public int getId() {
    return id;
    }

    public void setId(int id) {
    this.id = id;
    }
    }
    52 changes: 52 additions & 0 deletions PurchaseOrder.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,52 @@
    package ch.maxant.tradingengine.model;

    import java.util.Date;

    public class PurchaseOrder extends IdModel {

    private String productId;
    private int remainingQuantity;
    private int originalQuantity;
    private double maximumAcceptedPrice;
    private Date created;
    private Buyer buyer;

    public PurchaseOrder(String productId, int quantity,
    double maximumAcceptedPrice, int id) {
    this.productId = productId;
    this.remainingQuantity = quantity;
    this.originalQuantity = quantity;
    this.maximumAcceptedPrice = maximumAcceptedPrice;
    this.created = new Date();
    setId(id);
    }

    public void setBuyer(Buyer buyer) {
    this.buyer = buyer;
    }

    public String getProductId() {
    return productId;
    }

    public Buyer getBuyer() {
    return buyer;
    }

    public Date getCreated() {
    return created;
    }

    public double getMaximumAcceptedPrice() {
    return maximumAcceptedPrice;
    }

    public int getOriginalQuantity() {
    return originalQuantity;
    }

    public int getRemainingQuantity() {
    return remainingQuantity;
    }

    }
    70 changes: 70 additions & 0 deletions Sale.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,70 @@
    package ch.maxant.tradingengine.model;

    import java.util.Date;

    public class Sale extends IdModel {

    private Date timestamp;
    private Buyer buyer;
    private Seller seller;
    private String productId;
    private double price;
    private int quantity;
    private SalesOrder salesOrder;
    private PurchaseOrder purchaseOrder;

    /**
    * a sale from a seller to a buyer for the given product and price and
    * quantity.
    */
    public Sale(Buyer buyer, Seller seller, String productId, double price,
    int quantity) {
    this.buyer = buyer;
    this.seller = seller;
    this.productId = productId;
    this.price = price;
    this.quantity = quantity;
    this.timestamp = new Date();
    }

    public Buyer getBuyer() {
    return buyer;
    }

    public double getPrice() {
    return price;
    }

    public String getProductId() {
    return productId;
    }

    public int getQuantity() {
    return quantity;
    }

    public Seller getSeller() {
    return seller;
    }

    public Date getTimestamp() {
    return timestamp;
    }

    public void setSalesOrder(SalesOrder salesOrder) {
    this.salesOrder = salesOrder;
    }

    public SalesOrder getSalesOrder() {
    return salesOrder;
    }

    public void setPurchaseOrder(PurchaseOrder purchaseOrder) {
    this.purchaseOrder = purchaseOrder;
    }

    public PurchaseOrder getPurchaseOrder() {
    return purchaseOrder;
    }

    }
    58 changes: 58 additions & 0 deletions SalesOrder.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,58 @@
    package ch.maxant.tradingengine.model;

    import java.util.Date;

    public class SalesOrder extends IdModel {

    private double price;
    private String productId;
    private int remainingQuantity;
    private int originalQuantity;
    private Date created;
    private Seller seller;

    /**
    * an order to sell a given quantity of a product at a given price
    */
    public SalesOrder(double price, String productId, int quantity, int id) {
    this.price = price;
    this.productId = productId;
    this.remainingQuantity = quantity;
    this.originalQuantity = quantity;
    this.created = new Date();
    setId(id);
    }

    public Date getCreated() {
    return created;
    }

    public int getRemainingQuantity() {
    return remainingQuantity;
    }

    public String getProductId() {
    return productId;
    }

    public double getPrice() {
    return price;
    }

    public int getOriginalQuantity() {
    return originalQuantity;
    }

    public Seller getSeller() {
    return seller;
    }

    public void setSeller(Seller seller) {
    this.seller = seller;
    }

    public void reduceRemainingQuantity(double quantity) {
    this.remainingQuantity -= quantity;
    }

    }
    111 changes: 111 additions & 0 deletions Seller.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,111 @@
    package ch.maxant.tradingengine.model;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    import org.apache.commons.lang3.ObjectUtils;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;

    import ch.maxant.tradingengine.model.TradingEngine.Listener;

    public class Seller {

    private static final Logger LOGGER = LogManager.getLogger("seller");

    private List<SalesOrder> salesOrders = new ArrayList<>();
    private String name;

    public Listener listener;

    public Seller(String name) {
    this.name = name;
    }

    public void addSalesOrder(SalesOrder salesOrder) {
    LOGGER.debug(name + " adding " + salesOrder);
    salesOrder.setSeller(this);
    this.salesOrders.add(salesOrder);
    }

    public boolean hasProduct(String productId) {
    return this.salesOrders.stream().anyMatch(so -> {
    return so.getProductId().equals(productId);
    });
    }

    /**
    * @return {SalesOrder} the sales order for the given product that has the
    * lowest price
    */
    public SalesOrder getCheapestSalesOrder(String productId) {
    return this.salesOrders.stream().filter(so -> {
    return so.getProductId().equals(productId);
    }).sorted((o1, o2) -> Double.compare(o1.getPrice(), o2.getPrice()))
    .findFirst().get();
    }

    public void removeSalesOrder(SalesOrder salesOrder) {
    this.salesOrders = this.salesOrders.stream().filter(so -> {
    return !salesOrder.equals(so);
    }).collect(Collectors.toList());
    }

    /** @return the out of date ones */
    public List<SalesOrder> removeOutdatedSalesOrders(long ageInMs) {
    long now = System.currentTimeMillis();

    Map<Boolean, List<SalesOrder>> partitioned = salesOrders.stream()
    .collect(Collectors.groupingBy(so -> {
    return now - so.getCreated().getTime() > ageInMs;
    }));

    this.salesOrders = ObjectUtils.defaultIfNull(
    partitioned.get(Boolean.FALSE),
    Collections.synchronizedList(new ArrayList<SalesOrder>()));
    return ObjectUtils.defaultIfNull(partitioned.get(Boolean.TRUE),
    Collections.synchronizedList(new ArrayList<SalesOrder>()));
    }

    public List<SalesOrder> getSalesOrders() {
    return salesOrders;
    }

    public String getName() {
    return name;
    }

    @Override
    public int hashCode() {
    final int prime = 31;
    int result = 1;
    result = prime * result + ((name == null) ? 0 : name.hashCode());
    return result;
    }

    @Override
    public boolean equals(Object obj) {
    if (this == obj)
    return true;
    if (obj == null)
    return false;
    if (getClass() != obj.getClass())
    return false;
    Seller other = (Seller) obj;
    if (name == null) {
    if (other.name != null)
    return false;
    } else if (!name.equals(other.name))
    return false;
    return true;
    }

    @Override
    public String toString() {
    return "Seller [name=" + name + "]";
    }

    }
    459 changes: 459 additions & 0 deletions TradingEngine.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,459 @@
    package ch.maxant.tradingengine.model;

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import javax.sql.DataSource;

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;

    // /////////////////////////////////////////////////
    // this file contains all classes related to a trading
    // engine which uses a market to simulate a trading platform.
    // /////////////////////////////////////////////////
    public class TradingEngine {

    private static final Logger LOGGER = LogManager.getLogger("tradingEngine");

    public static interface Listener {
    public void onEvent(EventType type, Object data);
    }

    public static enum EventType {
    SALE, PURCHASE, TIMEOUT_SALESORDER, TIMEOUT_PURCHASEORDER, STATS, STOPPED
    }

    private Market market = new Market();
    private Map<String, MarketPrice> marketPrices = new HashMap<>();
    private Map<String, List<VolumeRecord>> volumeRecords = new HashMap<>();
    private InitialContext ctx = new InitialContext();
    private Map<Seller, List<SalesOrder>> newSalesOrders = new HashMap<>();
    private Map<Buyer, List<PurchaseOrder>> newPurchaseOrders = new HashMap<>();

    private long delay;

    private long timeout;

    private Listener listener;
    private boolean running = true;

    /**
    * if false, then runs in an infinite loop until {@link #stop()} is called.
    * if true, then just runs once, and notifies listener that its stopped when
    * that one run is done, so that the listener can decide when to restart the
    * engine for another trading session.
    */
    private boolean runInActorMode;

    /**
    * basically a buyer goes into the market at a time where they are happy to
    * pay the market price. they take it from the cheapest seller (ie the
    * market price). depending on who is left, the market price goes up or down
    *
    * a trading engine has one market place and it controls the frequency of
    * trades. between trades: - sellers and buyers may enter and exit - all
    * sales are persisted
    *
    * @param delay
    * number of milliseconds between trades
    * @param timeout
    * the number of milliseconds after which incomplete sales or
    * purchase orders should be removed and their buyer/seller
    * informed of the (partial) failure.
    * @throws NamingException
    */
    public TradingEngine(long delay, long timeout, Listener listener)
    throws NamingException {
    this(delay, timeout, listener, false);
    }

    public TradingEngine(long delay, long timeout, Listener listener,
    boolean runInActorMode) throws NamingException {

    this.delay = delay;
    this.timeout = timeout;
    this.listener = listener;
    this.runInActorMode = runInActorMode;
    LOGGER.debug("market is opening for trading!");
    }

    public void run() {

    while (running) {
    LOGGER.debug("\n\n------------------------------- trading...-------------------------");
    long start = System.currentTimeMillis();

    prepareMarket();

    List<Sale> sales = market.trade();
    LOGGER.info("trading completed");

    noteMarketPricesAndVolumes(sales);

    try {
    persistSale(sales);
    } catch (Exception e) {
    LOGGER.error("failed to persist sales: " + sales, e);
    }
    LOGGER.info("persisting completed, notifying involved parties...");
    sales.stream().forEach(sale -> {
    if (sale.getBuyer().listener != null)
    sale.getBuyer().listener.onEvent(EventType.PURCHASE, sale);
    if (sale.getSeller().listener != null)
    sale.getSeller().listener.onEvent(EventType.SALE, sale);
    });
    if (!sales.isEmpty()) {
    LOGGER.warn("trading of " + sales.size()
    + " sales completed and persisted in "
    + (System.currentTimeMillis() - start) + "ms");
    } else {
    LOGGER.info("no trades...");
    }

    // debug(self.market, 10, false);
    if (listener != null)
    this.updateMarketVolume(null); // removes outdated data
    listener.onEvent(EventType.STATS,
    new Object[] { market.getMarketInfo(), this.marketPrices,
    this.volumeRecords });
    try {
    Thread.sleep(delay);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    if (runInActorMode) {
    break;
    }
    }

    listener.onEvent(EventType.STOPPED, null);
    }

    public void stop() {
    this.running = false;
    }

    /**
    * @method @return a VolumeRecord, just with no timestamp. properties are
    * total in last minute.
    */
    public VolumeRecord getCurrentVolume(String productId) {
    List<VolumeRecord> vrs = this.volumeRecords.get(productId);
    if (vrs != null) {
    long now = System.currentTimeMillis();
    vrs = vrs.stream().filter(vr -> {
    return now - vr.timestamp.getTime() < 1000 * 10;
    }).collect(Collectors.toList()); // remove old
    this.volumeRecords.put(productId, vrs); // ensure records contains
    // most up to date

    // aggregate
    VolumeRecord vr = new VolumeRecord(productId, 0, 0, null, 0);
    vr = vrs.stream().reduce(vr, VolumeRecord::add);
    return vr;
    } else {
    return new VolumeRecord(productId, 0, 0, null, 0);
    }
    }

    /** @method @return the last known price */
    public MarketPrice getCurrentMarketPrice(String productId) {
    return this.marketPrices.get(productId);
    }

    // handles timed out orders
    private void prepareMarket() {

    // handle timeouted sales orders
    market.getSellers().forEach(
    seller -> {
    List<SalesOrder> incompleteSOs = seller
    .removeOutdatedSalesOrders(timeout);
    incompleteSOs.forEach(so -> {
    if (so.getSeller().listener != null)
    so.getSeller().listener.onEvent(
    EventType.TIMEOUT_SALESORDER, so);
    else
    LOGGER.debug("incomplete SO: " + so);
    });
    });

    // handle timeouted purchase orders
    market.getBuyers().forEach(
    buyer -> {
    List<PurchaseOrder> incompletePOs = buyer
    .removeOutdatedPurchaseOrders(timeout);
    incompletePOs.forEach(po -> {
    if (po.getBuyer().listener != null)
    po.getBuyer().listener.onEvent(
    EventType.TIMEOUT_PURCHASEORDER, po);
    else
    LOGGER.debug("incomplete PO: " + po);
    });
    });

    if (!runInActorMode) {
    // add new SOs and POs
    synchronized (newSalesOrders) {
    newSalesOrders.forEach((seller, sos) -> {
    if (!this.market.getSellers().contains(seller)) {
    LOGGER.debug("seller named " + seller.getName()
    + " doesnt exist -> adding a new one");
    this.market.addSeller(seller);
    seller.listener = listener;
    } else {
    // swap temp seller with the actual one in the market
    seller = this.market.getSellers().get(
    this.market.getSellers().indexOf(seller));
    }
    final Seller fSeller = seller;
    sos.forEach(so -> {
    fSeller.addSalesOrder(so);
    });
    });
    newSalesOrders.clear();
    }

    synchronized (newPurchaseOrders) {
    newPurchaseOrders.forEach((buyer, pos) -> {
    if (!this.market.getBuyers().contains(buyer)) {
    LOGGER.debug("buyer named " + buyer.getName()
    + " doesnt exist -> adding a new one");
    this.market.addBuyer(buyer);
    buyer.listener = listener;
    } else {
    // swap temp buyer with the actual one in the market
    buyer = this.market.getBuyers().get(
    this.market.getBuyers().indexOf(buyer));
    }
    final Buyer fBuyer = buyer;
    pos.forEach(po -> {
    fBuyer.addPurchaseOrder(po);
    });
    });
    newPurchaseOrders.clear();
    }
    }
    }

    private void persistSale(List<Sale> sales) throws Exception {
    if (!sales.isEmpty()) {
    LOGGER.info("preparing to persist sales");

    DataSource ds = (DataSource) ctx.lookup("java:comp/env/jdbc/mysql");
    try (Connection c = ds.getConnection()) {
    PreparedStatement stmt = c
    .prepareStatement("INSERT INTO SALES (BUYER_NAME, SELLER_NAME, PRODUCT_ID, PRICE, QUANTITY, PO_ID, SO_ID) "
    + "values (?, ?, ?, ?, ?, ?, ?)");
    sales.forEach(sale -> {
    try {
    int i = 1;
    stmt.setString(i++, sale.getBuyer().getName());
    stmt.setString(i++, sale.getSeller().getName());
    stmt.setInt(i++, Integer.parseInt(sale.getProductId()));
    stmt.setDouble(i++, sale.getPrice());
    stmt.setInt(i++, sale.getQuantity());
    stmt.setInt(i++, sale.getPurchaseOrder().getId());
    stmt.setInt(i++, sale.getSalesOrder().getId());
    if (stmt.execute()) {
    ResultSet rs = stmt.getGeneratedKeys();
    rs.next();
    sale.setId(rs.getInt(1));
    rs.close();
    }
    } catch (SQLException e) {
    throw new RuntimeException(e);
    }
    });
    }
    }
    }

    private void noteMarketPricesAndVolumes(List<Sale> sales) {
    sales.forEach(sale -> {
    updateMarketPrice(sale);
    updateMarketVolume(sale);
    });
    }

    public static class MarketPrice {
    private String productId;
    private double price;
    private Date timestamp;

    public MarketPrice(String productId, double price, Date timestamp) {
    this.productId = productId;
    this.price = price;
    this.timestamp = timestamp;
    }

    public double getPrice() {
    return price;
    }

    public String getProductId() {
    return productId;
    }

    public Date getTimestamp() {
    return timestamp;
    }
    }

    private void updateMarketPrice(Sale sale) {
    MarketPrice mp = marketPrices.get(sale.getProductId());
    if (mp == null
    || (mp != null && mp.getTimestamp().getTime() < sale
    .getTimestamp().getTime())) {
    // set price if none is known, or replace price if its older than
    // current price
    marketPrices.put(
    sale.getProductId(),
    new MarketPrice(sale.getProductId(), sale.getPrice(), sale
    .getTimestamp()));
    }
    }

    public static class VolumeRecord {
    public static final VolumeRecord EMPTY = new VolumeRecord(null, 0, 0,
    null, 0);
    public String productId;
    public int numberOfSales;
    public double turnover;
    public Date timestamp;
    public int count;

    public VolumeRecord(String productId, int numberOfSales,
    double turnover, Date timestamp, int count) {
    this.productId = productId;
    this.numberOfSales = numberOfSales;
    this.turnover = turnover;
    this.timestamp = timestamp;
    this.count = count;
    }

    public static VolumeRecord add(VolumeRecord a, VolumeRecord b) {
    return new VolumeRecord(b.productId, a.numberOfSales
    + b.numberOfSales, a.turnover + b.turnover, null, a.count
    + b.count);
    }

    public static VolumeRecord aggregate(List<VolumeRecord> vrs) {
    VolumeRecord vr = EMPTY;
    vr = vrs.stream().reduce(vr, VolumeRecord::add);
    return vr;

    }
    }

    private void updateMarketVolume(Sale sale) {
    // //////////////
    // remove old ones
    // //////////////
    Map<String, List<VolumeRecord>> newVolumeRecords = new HashMap<>();
    long now = System.currentTimeMillis();
    volumeRecords.forEach((k, v) -> {
    List<VolumeRecord> vrs = v.stream().filter(vr -> {
    return now - vr.timestamp.getTime() < 1000 * 10;
    }).collect(Collectors.toList()); // remove older than 10 secs
    newVolumeRecords.put(k, vrs);
    });
    volumeRecords = newVolumeRecords; // replace the old ones

    // //////////////
    // add new data
    // //////////////
    if (sale != null) {
    List<VolumeRecord> vrs = volumeRecords.get(sale.getProductId());
    if (vrs == null) {
    vrs = new ArrayList<>();
    }
    vrs.add(new VolumeRecord(sale.getProductId(), sale.getQuantity(),
    sale.getQuantity() * sale.getPrice(), sale.getTimestamp(),
    1)); // scale up to "per minute"
    volumeRecords.put(sale.getProductId(), vrs); // replace with old one
    }
    }

    public PurchaseOrder addPurchaseOrder(String who, String productId,
    int quantity, int id) {

    if (runInActorMode) {
    Buyer buyer = new Buyer(who);
    if (!this.market.getBuyers().contains(buyer)) {
    LOGGER.debug("buyer named " + who
    + " doesnt exist -> adding a new one");
    this.market.addBuyer(buyer);
    buyer.listener = listener;
    } else {
    // swap temp buyer with the actual one in the market
    buyer = this.market.getBuyers().get(
    this.market.getBuyers().indexOf(buyer));
    }
    PurchaseOrder po = new PurchaseOrder(productId, quantity, 9999.9,
    id);
    buyer.addPurchaseOrder(po);
    return po;
    } else {
    synchronized (newPurchaseOrders) {
    Buyer b = new Buyer(who);
    List<PurchaseOrder> pos = newPurchaseOrders.get(b);
    if (pos == null) {
    pos = new ArrayList<>();
    newPurchaseOrders.put(b, pos);
    }
    PurchaseOrder po = new PurchaseOrder(productId, quantity,
    9999.9, id);
    pos.add(po);
    return po;
    }

    }
    }

    public SalesOrder addSalesOrder(String who, String productId, int quantity,
    double price, int id) {

    if (runInActorMode) {
    Seller seller = new Seller(who);
    if (!this.market.getSellers().contains(seller)) {
    LOGGER.debug("seller named " + who
    + " doesnt exist -> adding a new one");
    this.market.addSeller(seller);
    seller.listener = listener;
    } else {
    // swap temp seller with the actual one in the market
    seller = this.market.getSellers().get(
    this.market.getSellers().indexOf(seller));
    }
    SalesOrder so = new SalesOrder(price, productId, quantity, id);
    seller.addSalesOrder(so);
    return so;
    } else {
    synchronized (newSalesOrders) {
    Seller s = new Seller(who);
    List<SalesOrder> sos = newSalesOrders.get(s);
    if (sos == null) {
    sos = new ArrayList<>();
    newSalesOrders.put(s, sos);
    }
    SalesOrder so = new SalesOrder(price, productId, quantity, id);
    sos.add(so);
    return so;
    }
    }
    }

    }
    218 changes: 218 additions & 0 deletions TradingEngineServlet.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,218 @@
    package ch.maxant.tradingengine.web;

    import static ch.maxant.tradingengine.web.Constants.DELAY;
    import static ch.maxant.tradingengine.web.Constants.ID;
    import static ch.maxant.tradingengine.web.Constants.NUM_KIDS;
    import static ch.maxant.tradingengine.web.Constants.PRODUCT_IDS;
    import static ch.maxant.tradingengine.web.Constants.TIMEOUT;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    import javax.naming.NamingException;
    import javax.servlet.ServletException;
    import javax.servlet.annotation.WebServlet;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;

    import ch.maxant.tradingengine.model.PurchaseOrder;
    import ch.maxant.tradingengine.model.Sale;
    import ch.maxant.tradingengine.model.SalesOrder;
    import ch.maxant.tradingengine.model.TradingEngine.EventType;
    import ch.maxant.tradingengine.model.TradingEngine.VolumeRecord;
    import ch.maxant.tradingengine.model.TradingEngineThread;

    @WebServlet(urlPatterns = { "/sell", "/buy", "/result" })
    public class TradingEngineServlet extends HttpServlet {

    private static final long serialVersionUID = 1L;

    private static final Logger LOGGER = LogManager
    .getLogger("tradingEngineServlet");

    private static Stats stats = new Stats();
    private static final Map<String, TradingEngineThread> kids = new HashMap<>();
    private static final Map<String, Result> results = new ConcurrentHashMap<>();
    private static final Set<String> knownProducts = Collections
    .synchronizedSet(new HashSet<>());
    private static final AtomicInteger timedoutSales = new AtomicInteger(0);

    static {
    try {
    int chunk = PRODUCT_IDS.length / NUM_KIDS;
    for (int i = 0, j = PRODUCT_IDS.length; i < j; i += chunk) {
    String[] temparray = Arrays.copyOfRange(PRODUCT_IDS, i, i
    + chunk);
    LOGGER.info("created engine for products " + temparray);
    TradingEngineThread engineThread = new TradingEngineThread(
    DELAY, TIMEOUT, (type, data) -> event(type, data));
    for (int k = 0; k < temparray.length; k++) {
    LOGGER.debug("mapping productId '" + temparray[k]
    + "' to engine " + i);
    kids.put(temparray[k], engineThread);
    }
    LOGGER.info("---started trading");
    engineThread.start();
    }
    } catch (NamingException e) {
    LOGGER.error("failed to start engine", e);
    }

    // remove results older than a minute, every 5 seconds.
    // in a real system you wouldnt necessarily cache results like
    // we are doing - the sales are actually persisted by the
    // trading engine - so clients could go look there!
    new Timer("cleaner", true).scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
    LOGGER.error("cleaning results... sales per minute: "
    + stats.totalSalesPerMinute + ", "
    + timedoutSales.get() + " timedout orders");
    long now = System.currentTimeMillis();
    List<String> toRemove = new ArrayList<>();
    results.forEach((k, v) -> {
    if (now - v.created > 60000) {
    toRemove.add(k);
    }
    });
    toRemove.forEach(k -> results.remove(k));
    LOGGER.info("completed cleaning results in "
    + (System.currentTimeMillis() - now) + "ms");
    }
    }, 5000L, 5000L);
    }

    public static synchronized void event(final EventType type,
    final Object data) {
    switch (type) {
    case SALE: {
    Sale sale = (Sale) data;
    int id = sale.getSalesOrder().getId();
    results.put(String.valueOf(id), new Result(String.valueOf(data)));
    if (sale.getSalesOrder().getRemainingQuantity() == 0) {
    String msg = "COMPLETED sales order";
    LOGGER.info("\n" + id + ") " + msg + " " + data);
    } else {
    LOGGER.info("\n" + id + ") PARTIAL sales order " + data);
    }
    break;
    }
    case PURCHASE: {
    Sale sale = (Sale) data;
    int id = sale.getPurchaseOrder().getId();
    results.put(String.valueOf(id), new Result(String.valueOf(data)));
    if (sale.getPurchaseOrder().getRemainingQuantity() == 0) {
    String msg = "COMPLETED purchase order";
    LOGGER.info("\n" + id + ") " + msg + " " + data);
    } else {
    LOGGER.info("\n" + id + ") PARTIAL purchase order " + data);
    }
    break;
    }
    case TIMEOUT_SALESORDER: {
    timedoutSales.incrementAndGet();
    SalesOrder so = (SalesOrder) data;
    String msg = "TIMEOUT sales order";
    LOGGER.info("\n" + so.getId() + ") " + msg + " " + data);
    break;
    }
    case TIMEOUT_PURCHASEORDER: {
    timedoutSales.incrementAndGet();
    PurchaseOrder po = (PurchaseOrder) data;
    String msg = "TIMEOUT purchase order";
    LOGGER.info("\n" + po.getId() + ") " + msg + " " + data);
    break;
    }
    case STATS: {
    synchronized (knownProducts) {
    Map<String, List<VolumeRecord>> mapOfVolumeRecords = (Map<String, List<VolumeRecord>>) ((Object[]) data)[2];
    stats.totalSalesPerMinute = knownProducts
    .stream()
    .map(productId -> {
    return VolumeRecord.aggregate(mapOfVolumeRecords
    .getOrDefault(productId,
    Collections.emptyList())).count;
    }).reduce(Integer::sum).orElse(0) * 6; // since stats
    // are
    // recorded
    // for the last
    // 10 secs
    }
    break;
    }
    default:
    break;
    }
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
    throws ServletException, IOException {

    String path = req.getServletPath();
    LOGGER.debug("received command: '" + path + "'");

    String who = req.getParameter("userId");
    String productId = req.getParameter("productId");
    int quantity = Integer.parseInt(req.getParameter("quantity"));
    TradingEngineThread engine = kids.get(productId);
    knownProducts.add(productId);
    int id = ID.getAndIncrement();

    // /buy?productId=1&quantity=10&userId=ant
    if (path.equals("/buy")) {
    PurchaseOrder po = engine.addPurchaseOrder(who, productId,
    quantity, id);

    resp.getWriter().write("\"id\":" + id + ", " + String.valueOf(po));
    } else if (path.equals("/sell")) {
    double price = Double.parseDouble(req.getParameter("price"));
    SalesOrder so = engine.addSalesOrder(who, productId, quantity,
    price, id);

    resp.getWriter().write("\"id\":" + id + ", " + String.valueOf(so));
    } else if (path.equals("/result")) {
    String key = req.getParameter("id");
    Result r = results.get(key);
    if (r != null) {
    results.remove(key);
    resp.getWriter().write(r.data);
    } else {
    resp.getWriter().write("UNKNOWN OR PENDING");
    }
    } else {
    String msg = "Unknown command " + path;
    LOGGER.warn(msg);
    }

    }

    private static class Stats {
    int totalSalesPerMinute;
    }

    private static class Result {
    String data;
    long created;

    Result(String data) {
    this.data = data;
    this.created = System.currentTimeMillis();
    }
    }
    }
    280 changes: 280 additions & 0 deletions TradingEngineServletWithActors.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,280 @@
    package ch.maxant.tradingengine.web;

    import static ch.maxant.tradingengine.web.Constants.DELAY;
    import static ch.maxant.tradingengine.web.Constants.ID;
    import static ch.maxant.tradingengine.web.Constants.NUM_KIDS;
    import static ch.maxant.tradingengine.web.Constants.PRODUCT_IDS;
    import static ch.maxant.tradingengine.web.Constants.TIMEOUT;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    import javax.naming.NamingException;
    import javax.servlet.ServletException;
    import javax.servlet.annotation.WebServlet;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;

    import akka.actor.AbstractActor;
    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.Props;
    import akka.japi.pf.ReceiveBuilder;
    import ch.maxant.tradingengine.model.Buyer;
    import ch.maxant.tradingengine.model.PurchaseOrder;
    import ch.maxant.tradingengine.model.Sale;
    import ch.maxant.tradingengine.model.SalesOrder;
    import ch.maxant.tradingengine.model.Seller;
    import ch.maxant.tradingengine.model.TradingEngine;
    import ch.maxant.tradingengine.model.TradingEngine.EventType;
    import ch.maxant.tradingengine.model.TradingEngine.VolumeRecord;

    @WebServlet(urlPatterns = { "/sell2", "/buy2", "/result2" })
    public class TradingEngineServletWithActors extends HttpServlet {

    private static final long serialVersionUID = 1L;

    private static final Logger LOGGER = LogManager
    .getLogger("tradingEngineServletWithActors");

    private static final ActorSystem teSystem = ActorSystem
    .create("TradingEngines");

    private static final Stats stats = new Stats();
    private static final Map<String, ActorRef> kids = new HashMap<>();
    private static final Map<String, Result> results = new ConcurrentHashMap<>();
    private static final Set<String> knownProducts = Collections
    .synchronizedSet(new HashSet<>());
    private static final AtomicInteger timedoutSales = new AtomicInteger(0);

    static {
    int chunk = PRODUCT_IDS.length / NUM_KIDS;
    for (int i = 0, j = PRODUCT_IDS.length; i < j; i += chunk) {
    String[] temparray = Arrays.copyOfRange(PRODUCT_IDS, i, i + chunk);
    LOGGER.info("created engine for products " + temparray);

    ActorRef actor = teSystem.actorOf(
    Props.create(TradingEngineActor.class), "engine-" + i);
    for (int k = 0; k < temparray.length; k++) {
    LOGGER.debug("mapping productId '" + temparray[k]
    + "' to engine " + i);
    kids.put(temparray[k], actor);
    }
    LOGGER.info("---started trading");
    actor.tell(TradingEngineActor.RUN, ActorRef.noSender());
    }

    // remove results older than a minute, every 5 seconds.
    // in a real system you wouldnt necessarily cache results like
    // we are doing - the sales are actually persisted by the
    // trading engine - so clients could go look there!
    new Timer("cleaner", true).scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
    LOGGER.error("cleaning results... sales per minute: "
    + stats.totalSalesPerMinute + ", "
    + timedoutSales.get() + " timedout orders");
    long now = System.currentTimeMillis();
    List<String> toRemove = new ArrayList<>();
    results.forEach((k, v) -> {
    if (now - v.created > 60000) {
    toRemove.add(k);
    }
    });
    toRemove.forEach(k -> results.remove(k));
    LOGGER.info("completed cleaning results in "
    + (System.currentTimeMillis() - now) + "ms");
    }
    }, 5000L, 5000L);
    }

    public static synchronized void event(final EventType type,
    final Object data) {
    switch (type) {
    case SALE: {
    Sale sale = (Sale) data;
    int id = sale.getSalesOrder().getId();
    results.put(String.valueOf(id), new Result(String.valueOf(data)));
    if (sale.getSalesOrder().getRemainingQuantity() == 0) {
    String msg = "COMPLETED sales order";
    LOGGER.info("\n" + id + ") " + msg + " " + data);
    } else {
    LOGGER.info("\n" + id + ") PARTIAL sales order " + data);
    }
    break;
    }
    case PURCHASE: {
    Sale sale = (Sale) data;
    int id = sale.getPurchaseOrder().getId();
    results.put(String.valueOf(id), new Result(String.valueOf(data)));
    if (sale.getPurchaseOrder().getRemainingQuantity() == 0) {
    String msg = "COMPLETED purchase order";
    LOGGER.info("\n" + id + ") " + msg + " " + data);
    } else {
    LOGGER.info("\n" + id + ") PARTIAL purchase order " + data);
    }
    break;
    }
    case TIMEOUT_SALESORDER: {
    timedoutSales.incrementAndGet();
    SalesOrder so = (SalesOrder) data;
    String msg = "TIMEOUT sales order";
    LOGGER.info("\n" + so.getId() + ") " + msg + " " + data);
    break;
    }
    case TIMEOUT_PURCHASEORDER: {
    timedoutSales.incrementAndGet();
    PurchaseOrder po = (PurchaseOrder) data;
    String msg = "TIMEOUT purchase order";
    LOGGER.info("\n" + po.getId() + ") " + msg + " " + data);
    break;
    }
    case STATS: {
    // an alternative approach for dealing with concurrency problems
    // like ConcurrentModificationException is to copy the data, but it
    // only works when handling read only data
    Set<String> knownProductsCopy = new HashSet<>(knownProducts);
    Map<String, List<VolumeRecord>> mapOfVolumeRecords = new HashMap(
    (Map<String, List<VolumeRecord>>) ((Object[]) data)[2]);

    stats.totalSalesPerMinute = knownProductsCopy
    .stream()
    .map(productId -> {
    return VolumeRecord.aggregate(new ArrayList(
    mapOfVolumeRecords.getOrDefault(productId,
    Collections.emptyList()))).count;
    }).reduce(Integer::sum).orElse(0) * 6;
    // times 6 since stats are recorded for only the last ten secs,
    // and we want them per minute
    break;
    }
    default:
    break;
    }
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
    throws ServletException, IOException {

    String path = req.getServletPath();
    LOGGER.debug("received command: '" + path + "'");

    String who = req.getParameter("userId");
    String productId = req.getParameter("productId");
    int quantity = Integer.parseInt(req.getParameter("quantity"));
    ActorRef engine = kids.get(productId);
    knownProducts.add(productId);
    int id = ID.getAndIncrement();

    // /buy?productId=1&quantity=10&userId=ant
    if (path.equals("/buy2")) {
    PurchaseOrder po = new PurchaseOrder(productId, quantity, 9999.9,
    id);
    po.setBuyer(new Buyer(who));
    engine.tell(po, ActorRef.noSender());

    resp.getWriter().write("\"id\":" + id + ", " + String.valueOf(po));
    } else if (path.equals("/sell2")) {
    double price = Double.parseDouble(req.getParameter("price"));
    SalesOrder so = new SalesOrder(price, productId, quantity, id);
    so.setSeller(new Seller(who));
    engine.tell(so, ActorRef.noSender());

    resp.getWriter().write("\"id\":" + id + ", " + String.valueOf(so));
    } else if (path.equals("/result2")) {
    String key = req.getParameter("id");
    Result r = results.get(key);
    if (r != null) {
    results.remove(key);
    resp.getWriter().write(r.data);
    } else {
    resp.getWriter().write("UNKNOWN OR PENDING");
    }
    } else {
    String msg = "Unknown command " + path;
    LOGGER.warn(msg);
    }

    }

    private static class Stats {
    int totalSalesPerMinute;
    }

    private static class Result {
    String data;
    long created;

    Result(String data) {
    this.data = data;
    this.created = System.currentTimeMillis();
    }
    }

    /**
    * using actors, we guarantee that only ever one thread accesses our trading
    * engine at any one time, and so we avoid having to synchronize!
    */
    private static class TradingEngineActor extends AbstractActor {

    private static final String RUN = "RUN";

    // STATE
    private TradingEngine engine = new TradingEngine(DELAY, TIMEOUT, (type,
    data) -> handle(type, data), true);

    public TradingEngineActor() throws NamingException {

    // INBOX
    receive(ReceiveBuilder
    .match(SalesOrder.class,
    so -> {
    // BEHAVIOUR (delegated to engine)
    engine.addSalesOrder(so.getSeller().getName(),
    so.getProductId(),
    so.getRemainingQuantity(),
    so.getPrice(), so.getId());
    })
    .match(PurchaseOrder.class,
    po -> {
    // BEHAVIOUR (delegated to engine)
    engine.addPurchaseOrder(
    po.getBuyer().getName(),
    po.getProductId(),
    po.getRemainingQuantity(), po.getId());
    })
    .match(String.class, s -> RUN.equals(s), command -> {
    engine.run();
    })
    .matchAny(
    o -> System.err
    .println("received unknown message: " + o))
    .build());
    }

    private void handle(EventType type, Object data) {
    event(type, data);
    if (type.equals(EventType.STOPPED)) {
    self().tell(RUN, ActorRef.noSender()); // start another trading
    // engine!
    }
    }
    }

    }
    48 changes: 48 additions & 0 deletions TradingEngineThread.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,48 @@
    package ch.maxant.tradingengine.model;

    import javax.naming.NamingException;

    import ch.maxant.tradingengine.model.TradingEngine.Listener;
    import ch.maxant.tradingengine.model.TradingEngine.MarketPrice;
    import ch.maxant.tradingengine.model.TradingEngine.VolumeRecord;

    /**
    * a simple delegate which caches buyers and sellers, just like the node.js
    * child processes do.
    */
    public class TradingEngineThread extends Thread {

    private static int ID = 0;

    private final TradingEngine engine;

    public TradingEngineThread(long delay, long timeout, Listener listener)
    throws NamingException {
    super("engine-" + ID++);
    engine = new TradingEngine(delay, timeout, listener);
    }

    @Override
    public void run() {
    engine.run();
    }

    public PurchaseOrder addPurchaseOrder(String who, String productId,
    int quantity, int id) {

    return engine.addPurchaseOrder(who, productId, quantity, id);
    }

    public SalesOrder addSalesOrder(String who, String productId, int quantity,
    double price, int id) {
    return engine.addSalesOrder(who, productId, quantity, price, id);
    }

    public VolumeRecord getCurrentVolume(String productId) {
    return engine.getCurrentVolume(productId);
    }

    public MarketPrice getMarketPrice(String productId) {
    return engine.getCurrentMarketPrice(productId);
    }
    }