From 4e4049d90db952f5d9dea46a6977a5ec0029b954 Mon Sep 17 00:00:00 2001 From: pang Date: Mon, 19 Aug 2019 10:41:17 +0200 Subject: [PATCH] Optimized estimation engine --- .../cyclops/consume/command/Forecast.java | 55 +++- .../consume/command/GlobalForecast.java | 254 ++---------------- 2 files changed, 62 insertions(+), 247 deletions(-) diff --git a/UDR/src/main/java/ch/icclab/cyclops/consume/command/Forecast.java b/UDR/src/main/java/ch/icclab/cyclops/consume/command/Forecast.java index e29461a..5bc06ab 100644 --- a/UDR/src/main/java/ch/icclab/cyclops/consume/command/Forecast.java +++ b/UDR/src/main/java/ch/icclab/cyclops/consume/command/Forecast.java @@ -34,7 +34,7 @@ public class Forecast extends Command{ private String target; private long forecastSize; - private class GenerateBill { + private static class GenerateBill { String command; Long time_from; Long time_to; @@ -48,15 +48,18 @@ private class GenerateBill { } @Override Status execute() { + return compute(account, target, forecastSize); + } + public static Status compute(String user, String model, long length){ Long time_from = System.currentTimeMillis(); - Long time_to = time_from + forecastSize * 86402000; + Long time_to = time_from + length * 86402000; Status status = new Status(); //Retrieve usage records DbAccess db = new DbAccess(); SelectQuery select = db.createSelectFrom(Usage.TABLE); //Filter by specified account - if(!(account.equals("global"))) { - select.addConditions(Usage.ACCOUNT_FIELD.eq(account)); + if(!(user.equals("global"))) { + select.addConditions(Usage.ACCOUNT_FIELD.eq(user)); } //Sort by newest first select.addOrderBy(Usage.TIME_FIELD.desc()); @@ -70,8 +73,13 @@ Status execute() { } // Generate forecast by usage type int generated = 0; + int accountsize = 1; + if(user.equals("global")){ + accountsize = countAccounts(usages); + } + for(String key:usage_map.keySet()){ - generated += generateForecastFromUsage(usage_map.get(key)); + generated += generateForecastFromUsage(usage_map.get(key), accountsize, user, model, length); } // Only continue if any forecast records were generated if(generated > 0) { @@ -90,14 +98,32 @@ Status execute() { } //Generate Bill estimates - GenerateBill generateBill = new GenerateBill(time_from, time_to, account + "-arima-" + target); + GenerateBill generateBill = new GenerateBill(time_from, time_to, user + "-arima-" + model); Messenger.publish(generateBill, "Billing"); - status.setSuccessful("Forecast estimation for account " + account + " complete"); + status.setSuccessful("Forecast estimation for account " + user + " complete"); } else{status.setServerError("Not enough records found to generate forecast");} return status; } - private int generateForecastFromUsage(List usages){ + private static int countAccounts(List usages){ + List accounts = new ArrayList<>(); + for(Usage usage:usages){ + if(!accounts.isEmpty()){ + int i = 0; + for (String account:accounts){ + if (account.equals(usage.getAccount()))i++; + } + if(i==0){ + accounts.add(usage.getAccount()); + } + } + else{ + accounts.add(usage.getAccount()); + } + } + return accounts.size(); + } + private static int generateForecastFromUsage(List usages, int num, String user, String model, long length){ if (usages.size()>0) { String metric = usages.get(0).getMetric(); @@ -111,17 +137,20 @@ private int generateForecastFromUsage(List usages){ metrics[i] = usages.get(i).getUsage(); } //Generate forecast with ARIMA model - ARIMAForecast forecast = new ARIMAForecast(); - double[] forecastData = forecast.getForecast(metrics, 9, 0, 9, 0, 0, 0, 0, (int)(long)forecastSize); + //ARIMAForecast forecast = new ARIMAForecast(); + //double[] forecastData = forecast.getForecast(metrics, 9, 0, 9, 0, 0, 0, 0, (int)(long)forecastSize*24*num); + double mean = Arrays.stream(metrics).average().orElse(Double.NaN); + double[] forecastData = new double[(int)(long)length*24*num]; + Arrays.fill(forecastData, mean); //Generate Usage records from forecast int k = 0; for (double entry : forecastData) { DbAccess dbn = new DbAccess(); InsertQuery insert = dbn.createInsertInto(Usage.TABLE); - insert.addValue(Usage.DATA_FIELD, String.format("{\"target\":\"%s\"}", target)); - insert.addValue(Usage.ACCOUNT_FIELD, account + "-arima-" + target); + insert.addValue(Usage.DATA_FIELD, String.format("{\"target\":\"%s\"}", model)); + insert.addValue(Usage.ACCOUNT_FIELD, user + "-arima-" + model); insert.addValue(Usage.METRIC_FIELD, metric); - insert.addValue(Usage.TIME_FIELD, System.currentTimeMillis() + k * 86400000); + insert.addValue(Usage.TIME_FIELD, System.currentTimeMillis() + k * 3600000); insert.addValue(Usage.USAGE_FIELD, entry); insert.addValue(Usage.UNIT_FIELD, unit); dbn.executeInsertStatement(insert); diff --git a/UDR/src/main/java/ch/icclab/cyclops/consume/command/GlobalForecast.java b/UDR/src/main/java/ch/icclab/cyclops/consume/command/GlobalForecast.java index 328f43e..1660cb1 100644 --- a/UDR/src/main/java/ch/icclab/cyclops/consume/command/GlobalForecast.java +++ b/UDR/src/main/java/ch/icclab/cyclops/consume/command/GlobalForecast.java @@ -21,6 +21,7 @@ import ch.icclab.cyclops.timeseries.DbAccess; import ch.icclab.cyclops.timeseries.forecast.ARIMAForecast; import ch.icclab.cyclops.util.loggers.TimeSeriesLogger; +import ch.icclab.cyclops.consume.command.Forecast; import org.jooq.InsertQuery; import org.jooq.SelectQuery; import java.util.*; @@ -29,7 +30,7 @@ * Created: 17/06/2019 * Description: Generate total revenue forecast command */ -public class GlobalForecast extends Command{ +public class GlobalForecast extends Command { //Name of the target pricing model private String target; //Number of days in the future that the forecast will predict @@ -41,6 +42,7 @@ private class GenerateBill { Long time_from; Long time_to; String request; + GenerateBill(Long time_from, Long time_to, String request) { this.command = getClass().getSimpleName(); this.time_from = time_from; @@ -48,6 +50,7 @@ private class GenerateBill { this.request = request; } } + @Override Status execute() { Long time_from = System.currentTimeMillis(); @@ -61,244 +64,27 @@ Status execute() { List usages; usages = db.fetchUsingSelectStatement(select, Usage.class); // Generate the forecast - int generated = generateForecastRecords(usages); - // Only continue if any forecast records were generated - if (generated > 0) { - //Generate virtual UDRs from the virtual Usage records - GenerateUDRs generateUDRs = new GenerateUDRs(); - generateUDRs.setTime_from(time_from); - generateUDRs.setTime_to(time_to); - generateUDRs.setCommand(generateUDRs.getClass().getSimpleName()); - assert Loader.getSettings() != null; - Messenger.publish(generateUDRs, Loader.getSettings().getPublisherCredentials().getRoutingKeyPublishUDRCommand()); - - // Wait for CDRs to be generated - for (String account : keysAsArray) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - } - //Generate Bill estimates - - GenerateBill generateBill = new GenerateBill(time_from, time_to, account + "-2D-arima-" + target); - Messenger.publish(generateBill, "Billing"); - status.setSuccessful("Forecast estimation complete"); - - } + List users = countAccounts(usages); + for(String user:users){ + status = Forecast.compute(user,target,forecastSize); } return status; } - - /** - * Generate usage and activity patterns and create forecast - * @param usages List of usage records retrieved from the historical record - * @return Number of forecast records generated - */ - private int generateForecastRecords(List usages){ - // Generate usage patterns - HashMap>> usage_patterns = generateUsagePatterns(usages); - // Generate daily activity pattern - HashMap daily_activity = generateActivityPattern(usages); - // Daily activity forecast - // Create a list of user counts - List forecastInput = new ArrayList<>(); - for (Map.Entry entry : daily_activity.entrySet()) forecastInput.add(entry.getValue()); - //Generate activity forecast - List activityForecast = generateActivityForecast(forecastInput); - // Generate daily Usage forecast - // Counter of forecasts generated - return generateTotalForecast(usage_patterns,activityForecast); - } - - /** - * Organize usage records recovered from DB by account and metric type - * @param usages Records retrieved from DB - * @return A map of usage patterns per account - */ - private HashMap>> generateUsagePatterns(List usages){ - // Group usages by user and type - HashMap>> usage_patterns = new HashMap<>(); - // For each usage record - for(Usage usage:usages){ - //if this account does not exist in the map - if(!usage_patterns.containsKey(usage.getAccount())){ - // Make a new usage map for this account and add the current usage - HashMap> new_usage = new HashMap<>(); - List temp = new_usage.computeIfAbsent(usage.getMetric(), k -> new ArrayList<>()); - temp.add(usage); - usage_patterns.put(usage.getAccount(),new_usage); - } - //if the account already exists in the map - else{ - // Retrieve the usage map for this user - HashMap> new_usage = usage_patterns.get(usage.getAccount()); - // Add this usage under the appropriate usage type - List temp = new_usage.computeIfAbsent(usage.getMetric(), k -> new ArrayList<>()); - temp.add(usage); - usage_patterns.put(usage.getAccount(),new_usage); - } - } - return usage_patterns; - } - - /** - * List the number of active accounts per day in the DB - * @param usages Records retrieved from DB - * @return A map of active accounts for each date of historical activity - */ - private HashMap generateActivityPattern(List usages){ - // Group usages by date - HashMap> daily_usage = groupByDate(usages); - // Find active accounts per date - HashMap> daily_accounts = accountsPerDate(daily_usage); - // Count active accounts per date - HashMap daily_activity = new HashMap<>(); - //for each date - for(Map.Entry> date : daily_accounts.entrySet()){ - // add the number of active accounts - daily_activity.put(date.getKey(),date.getValue().size()); - } - return daily_activity; - } - - /** - * Group records by date - * @param usages Records retrieved from DB - * @return Map of usages per date - */ - private HashMap> groupByDate(List usages){ - // Group usages by date - HashMap> daily_usage = new HashMap<>(); - // For each usage record - for(Usage usage : usages){ - // Get date of current record - Date usage_date = new Date(usage.getTime()); - // add record under the date - List temp = daily_usage.computeIfAbsent(usage_date, k -> new ArrayList<>()); - temp.add(usage); - } - return daily_usage; - } - - /** - * List accounts that were active each date - * @param daily_usage Usages grouped by date - * @return Map of accounts active per date - */ - private HashMap> accountsPerDate(HashMap> daily_usage){ - // Find active accounts per date - HashMap> daily_accounts = new HashMap<>(); - //for each date - for(Map.Entry> date : daily_usage.entrySet()){ - // Create entry for this date in new map - daily_accounts.put(date.getKey(),new ArrayList<>()); - // for each record under this date - for(Usage usage : date.getValue()){ - //if the account of this record had no other activity this date - if(daily_accounts.get(date.getKey()).isEmpty()){ - List temp = daily_accounts.get(date.getKey()); - temp.add(usage.getAccount()); + private List countAccounts(List usages) { + List accounts = new ArrayList<>(); + for (Usage usage : usages) { + if (!accounts.isEmpty()) { + int i = 0; + for (String account : accounts) { + if (account.equals(usage.getAccount())) i++; } - for(String str: daily_accounts.get(date.getKey())){ - if(!str.trim().contains(usage.getAccount())){ - // add this account to active accounts for this date - List temp = daily_accounts.get(date.getKey()); - temp.add(usage.getAccount()); - } - } - } - } - return daily_accounts; - } - /** - * Generate a forecast usage record from the current pattern - * @param usages Usages of the current user for the current metric type - * @param counter Number of days after today that the record will be timestamped with - * @return 1 or 0 to add to count of generated records - */ - private int generateForecastFromUsage(List usages, int counter, String account){ - if (usages.size()>0) { - - String metric = usages.get(0).getMetric(); - String unit = usages.get(0).getUnit(); - if (usages.size() < 28) { - TimeSeriesLogger.log("Insufficient data for metric " + metric); - return 0; - } - double[] metrics = new double[usages.size()]; - for (int i = 0; i < usages.size(); i++) { - metrics[i] = usages.get(i).getUsage(); - } - //Generate forecast with ARIMA model - ARIMAForecast forecast = new ARIMAForecast(); - double[] forecastData = forecast.getForecast(metrics, 9, 0, 9, 0, 0, 0, 0, 1); - //Generate Usage records from forecast - for (double entry : forecastData) { - DbAccess dbn = new DbAccess(); - InsertQuery insert = dbn.createInsertInto(Usage.TABLE); - insert.addValue(Usage.DATA_FIELD, String.format("{\"target\":\"%s\"}", target)); - insert.addValue(Usage.ACCOUNT_FIELD, account + "-2D-arima-" + target); - insert.addValue(Usage.METRIC_FIELD, metric); - insert.addValue(Usage.TIME_FIELD, System.currentTimeMillis() + counter * 86400000); - insert.addValue(Usage.USAGE_FIELD, entry); - insert.addValue(Usage.UNIT_FIELD, unit); - dbn.executeInsertStatement(insert); - } - return 1; - } - else{return 0;} - } - - /** - * Forecast the number of users that will be active for each of the days of the forecast duration - * @param input List of active users per day in the historical record - * @return List of active users per day as predicted by the model - */ - private List generateActivityForecast(List input){ - List output = new ArrayList<>(); - if(input.size() >= 28){ - double[] metrics = new double[input.size()]; - for (int i = 0; i < input.size(); i++) metrics[i] = input.get(i); - ARIMAForecast forecast = new ARIMAForecast(); - double[] forecastData = forecast.getForecast(metrics, 9, 0, 9, 0, 0, 0, 0, (int)(long)forecastSize); - for(double count : forecastData) output.add((int) count); - return output; - } - return output; - } - - /** - * Generate the usage forecast - * @param usage_patterns The usage pattern of every known user by metric type - * @param activityForecast The forecast number of users active for each day in the forecast duration - * @return Number of generated records - */ - private int generateTotalForecast(HashMap>> usage_patterns, List activityForecast){ - // Counter of forecasts generated - int generated = 0; - // List of accounts with usage patterns - keysAsArray = new ArrayList<>(usage_patterns.keySet()); - Random r = new Random(); - //For each day in the requested forecast length - for(int i=0;i> user_pattern = usage_patterns.get(key); - //for each usage type in the pattern - for(String type : user_pattern.keySet()){ - List usageForecastInput = user_pattern.get(type); - //generate a single usage record for the next day in the forecast - //based on the existing pattern for this type - // for i days in the future - generated += generateForecastFromUsage(usageForecastInput,i,key); + if (i == 0) { + accounts.add(usage.getAccount()); } + } else { + accounts.add(usage.getAccount()); } } - return generated; + return accounts; } -} +} \ No newline at end of file