Source: utils/transform.js

/**
 * Utility functions for transforming and restructuring Monitor objects.
 * These internal functions operate on the underlying `meta` and `data` tables
 * and return plain `{ meta, data }` objects, enabling consistent post-processing.
 *
 * Internal functions:
 * - `internal_collapse()` – Collapses multiple time series into a single column using aggregation.
 * - `internal_combine()` – Merges two monitor objects, dropping duplicated IDs in the second.
 * - `internal_select()` – Subsets a monitor by selected deviceDeploymentIDs.
 * - `internal_filterByValue()` – Filters by a metadata field and matching value.
 * - `internal_dropEmpty()` – Removes time series columns with no valid data.
 * - `internal_trimDate()` – Trims time series to full local-time days.
 *
 * Helper functions:
 * - `arrayMean()` – Computes the mean of an array, skipping null/NaN/invalid values.
 * - `round1()` – Rounds all non-datetime columns in a data table to 1 decimal place.
 * - `parseDatetime()` – Parse a user-provided datetime into a Luxon DateTime.
 */

import * as aq from 'arquero';
const op = aq.op;

import { DateTime } from 'luxon';

import { arrayMean, round1, parseDatetime } from './helpers.js';

/**
 * Collapse a Monitor object into a single time series.
 *
 * Collapses data from all time series into a single time series using the
 * function provided in the `FUN` argument (typically 'mean'). The single time
 * series result will be located at the mean longitude and latitude.
 *
 * When `FUN === "quantile"`, the `FUN_arg` argument specifies the quantile
 * probability.
 *
 * Available function names are those defined at:
 * https://uwdata.github.io/arquero/api/op#aggregate-functions
 *
 * @param {Monitor} monitor - The Monitor instance to collapse.
 * @param {string} deviceID - The name of the resulting time series column.
 * @param {string} FUN - The aggregate function name (e.g. "mean", "sum", "quantile").
 * @param {number} FUN_arg - An optional argument for the aggregator (e.g. quantile prob).
 * @returns {{ meta: aq.Table, data: aq.Table }} A Monitor object with a single time series.
 */
export function internal_collapse(monitor, deviceID = "generatedID", FUN = "mean", FUN_arg = 0.8) {
  const meta = monitor.meta;
  const data = monitor.data;

  // ----- Create new_meta ---------------------------------------------------

  const longitude = arrayMean(meta.array("longitude"));
  const latitude = arrayMean(meta.array("latitude"));
  // TODO:  Could create new locationID based on geohash
  const locationID = "xxx";
  const deviceDeploymentID = `xxx_${deviceID}`;

  // Start with first row and override key fields
  let new_meta = meta.slice(0, 1).derive({
    locationID: aq.escape(locationID),
    locationName: aq.escape(deviceID),
    longitude: aq.escape(longitude),
    latitude: aq.escape(latitude),
    elevation: aq.escape(null),
    houseNumber: aq.escape(null),
    street: aq.escape(null),
    city: aq.escape(null),
    zip: aq.escape(null),
    deviceDeploymentID: aq.escape(deviceDeploymentID),
    deviceType: aq.escape(null),
    deploymentType: aq.escape(null),
  });

  // ----- Create new_data ---------------------------------------------------

  // NOTE:  Arquero provides no functionality for row-operations, nor for
  // NOTE:  transpose. So we have to perform the following operations:
  // NOTE:    - fold the data into a dataframe with timestamp and id columns
  // NOTE:    - pivot the data based on timestamp while summing data columns
  // NOTE:    - fold the result into a dataframe with timestamp and value columns

  const ids = monitor.getIDs();
  const datetime = monitor.getDatetime();

  let transformed = data
    .derive({ utcDatestamp: 'd => op.format_utcdate(d.datetime)' })
    .select(aq.not('datetime'));

  const datetimeColumns = transformed.array('utcDatestamp');

  // Build aggregation function string
  let valueExpression;
  if (FUN === 'count') {
    valueExpression = '() => op.count()';
  } else if (FUN === 'quantile') {
    valueExpression = `d => op.quantile(d.value, ${FUN_arg})`;
  } else {
    valueExpression = `d => op.${FUN}(d.value)`;
  }

  const new_data = transformed
    .fold(ids)
    .pivot(
      { key: 'd => d.utcDatestamp' },
      { value: valueExpression }
    )
    .fold(datetimeColumns)
    // NOTE:  We don't use op.parse_date() because we require luxon DateTime
    //.derive({ datetime: 'd => op.parse_date(d.key)' })
    .derive({
      datetime: aq.escape(d => DateTime.fromISO(d.key, { zone: 'utc' }))
    })
    .rename({ value: deviceID })
    .select(['datetime', deviceID]);

  return { meta: new_meta, data: round1(new_data) };
}

/**
 * Combines two Monitor objects by merging their metadata and time series data.
 * If any deviceDeploymentIDs in `monitorB` are already present in `monitorA`,
 * they will be dropped from `monitorB` before merging.
 *
 * Data are combined with 'join_full' to guarantee that all times from either
 * Monitor object will be retained.
 *
 * @param {Monitor} monitorA - The base Monitor instance.
 * @param {Monitor} monitorB - The Monitor instance to merge in.
 * @returns {{ meta: aq.Table, data: aq.Table }} A combined monitor object.
 */
export function internal_combine(monitorA, monitorB) {
  const idsA = new Set(monitorA.meta.array('deviceDeploymentID'));
  const idsB = monitorB.meta.array('deviceDeploymentID');

  // Identify overlapping and unique IDs
  const overlappingIDs = idsB.filter(id => idsA.has(id));
  const uniqueIDs = idsB.filter(id => !idsA.has(id));

  // Filter monitorB's meta and data to include only unique IDs
  const metaB = monitorB.meta
    .params({ ids: uniqueIDs })
    .filter('op.includes(ids, d.deviceDeploymentID)');

  const dataB = monitorB.data.select(['datetime', ...uniqueIDs]);

  // Combine everything
  const combinedMeta = monitorA.meta.concat(metaB);
  const combinedData = monitorA.data
    .join_full(dataB, "datetime")
    .orderby("datetime");

  return { meta: combinedMeta, data: round1(combinedData) };
}


/**
 * Subsets and reorders time series columns and corresponding metadata
 * for the specified deviceDeploymentIDs.
 *
 * Ensures that the returned `meta` rows appear in the same order as `ids`,
 * and that all specified columns are included in the `data` table.
 *
 * @param {Monitor} monitor - The Monitor instance containing metadata and data.
 * @param {string[]} ids - An array of deviceDeploymentIDs to select and order.
 * @returns {{ meta: aq.Table, data: aq.Table }} A subset of the monitor with selected columns.
 *
 * @throws {Error} If `ids` is not a non-empty array.
 */
export function internal_select(monitor, ids) {
  // Normalize to array if a single string is passed
  if (typeof ids === 'string') {
    ids = [ids];
  }

  if (!Array.isArray(ids) || ids.length === 0) {
    throw new Error('ids must be a non-empty string or array of deviceDeploymentIDs');
  }

  if (new Set(ids).size !== ids.length) {
    throw new Error('Duplicate deviceDeploymentID values are not allowed in select()');
  }

  // Reorder meta rows to match the order of `ids`
  const metaRows = ids.map(id => {
    const row = monitor.meta.objects().find(r => r.deviceDeploymentID === id);
    if (!row) {
      throw new Error(`deviceDeploymentID '${id}' not found in metadata`);
    }
    return row;
  });
  const meta = aq.from(metaRows);

  // Subset and reorder columns in the data table
  const data = monitor.data.select(['datetime', ...ids]);

  return { meta: meta, data: round1(data) };
}

/**
 * Filters a monitor object to include only records where a given metadata field equals the specified value.
 *
 * @param {Monitor} monitor - The Monitor instance containing metadata and data.
 * @param {string} columnName - Name of the metadata column to filter on.
 * @param {string|number} value - Value to match in the specified column.
 * @returns {{ meta: aq.Table, data: aq.Table }} A filtered monitor object.
 *
 * @throws {Error} If the specified column does not exist in monitor.meta.
 */
export function internal_filterByValue(monitor, columnName, value) {
  if (!monitor.meta.columnNames().includes(columnName)) {
    throw new Error(`Column '${columnName}' not found in metadata`);
  }

  const colType = typeof monitor.meta.get(columnName);
  let filterExpression;

  if (colType === 'number') {
    const parsedValue = parseFloat(value);
    if (isNaN(parsedValue)) {
      throw new Error(`Value '${value}' could not be parsed as a number`);
    }
    filterExpression = `d => op.equal(d.${columnName}, ${parsedValue})`;
  } else if (colType === 'string') {
    const escaped = value.toString().replace(/'/g, "\\'");
    filterExpression = `d => op.equal(d.${columnName}, '${escaped}')`;
  } else {
    throw new Error(`Unsupported column type for filtering: ${colType}`);
  }

  const meta = monitor.meta.filter(filterExpression);
  const ids = meta.array('deviceDeploymentID');
  const data = monitor.data.select(['datetime', ...ids]);

  return { meta: meta, data: round1(data) };
}

/**
 * Drops time series from the monitor that contain only missing values.
 *
 * A value is considered missing if it is null, undefined, NaN, or an invalid string (e.g. 'NA').
 * The resulting monitor object includes only the deviceDeploymentIDs with at least one valid observation.
 *
 * @param {Monitor} monitor - The Monitor instance containing metadata and data.
 * @returns {{ meta: aq.Table, data: aq.Table }} A new monitor object with empty time series removed.
 */
export function internal_dropEmpty(monitor) {
  const data = monitor.data;
  const ids = data.columnNames().filter(c => c !== 'datetime');

  // Count valid (non-null, non-NaN, non-'NA') values for each time series column
  const countRow = data
    // arquero pattern to compute column-wise aggregations
    .rollup(Object.fromEntries(ids.map(id => [id, "d => op.valid(d['" + id + "'])"])))
    .object(0); // Get the single row as an object


  // Keep only the IDs with at least one valid value
  const validIDs = Object.entries(countRow)
    .filter(([_, count]) => count > 0)
    .map(([id]) => id);

  const filteredData = data.select(['datetime', ...validIDs]);

  const filteredMeta = monitor.meta
    .params({ ids: validIDs })
    .filter((d, $) => op.includes($.ids, d.deviceDeploymentID));

  return { meta: filteredMeta, data: round1(filteredData) };
}

/**
 * Trims time-series data to full local-time days (00:00–23:00),
 * and optionally removes full days with no data at the start or end.
 *
 * @param {Monitor} monitor - The Monitor instance with datetime-sorted, hourly-interval data.
 * @param {string} timezone - An IANA timezone string (e.g., "America/New_York").
 * @param {boolean} [trimEmptyDays=true] - Whether to remove fully-missing days at edges.
 * @returns {{ meta: aq.Table, data: aq.Table }} A subset of the monitor with trimmed data.
 *
 * @throws {Error} If the datetime column is missing, empty, or timezone is invalid.
 */
export function internal_trimDate(monitor, timezone, trimEmptyDays = true) {
  const datetime = monitor.data.array('datetime');
  if (!datetime || datetime.length === 0) {
    throw new Error('No datetime values found in monitor.data');
  }

  // Validate timezone
  const test = datetime[0].setZone(timezone);
  if (!test.isValid || test.zoneName !== timezone) {
    throw new Error(`Invalid or unrecognized timezone: '${timezone}'`);
  }

  // Convert first and last timestamps to local time
  const startLocal = datetime[0].setZone(timezone);
  const endLocal = datetime[datetime.length - 1].setZone(timezone);

  // Compute number of hours to trim at start and end
  const startTrim = startLocal.hour === 0 ? 0 : 24 - startLocal.hour;
  const endTrim = endLocal.hour === 23 ? 0 : endLocal.hour + 1;

  let start = startTrim;
  let end = datetime.length - endTrim;

  if (trimEmptyDays) {
    const dataCols = monitor.data.columnNames().filter(c => c !== 'datetime');

    const firstDay = monitor.data.slice(start, start + 24);
    const allInvalidStart = dataCols.every(col =>
      firstDay.array(col).every(v => v == null)
    );
    if (allInvalidStart) start += 24;

    const lastDay = monitor.data.slice(end - 24, end);
    const allInvalidEnd = dataCols.every(col =>
      lastDay.array(col).every(v => v == null)
    );
    if (allInvalidEnd) end -= 24;
  }

  const trimmed = monitor.data.slice(start, end);
  return { meta: monitor.meta, data: round1(trimmed) };
}

/**
 * Filter a Monitor's time-series data to an explicit datetime range.
 *
 * Assumptions:
 * - `monitor.data.datetime` contains Luxon DateTime objects in UTC.
 * - Rows in `monitor.data` are sorted in ascending datetime order.
 *
 * Inputs:
 * - `startdate` and `enddate` may be:
 *    * Luxon DateTime objects (any zone) – `timezone` is optional.
 *    * Strings or native Date objects – a valid IANA `timezone` is required.
 *
 * Behavior:
 * - For string/Date inputs, `parseDatetime()` interprets them in `timezone`.
 * - For date-only strings (e.g. "2025-02-10"), `enddate` is promoted to
 *   the *end of that local day* so that whole-day ranges are inclusive.
 * - Both parsed datetimes are converted to UTC and used to find the
 *   inclusive index range within `monitor.data`.
 *
 * Note:
 * - This function does NOT construct a new Monitor instance. It returns
 *   plain `{ meta, data }` tables; the public `Monitor#filterDatetime()`
 *   method is responsible for wrapping the result into a Monitor and
 *   validating it.
 *
 * @param {Monitor} monitor - The source monitor object.
 * @param {DateTime|string|Date} startdate - Start of the range.
 * @param {DateTime|string|Date} enddate   - End of the range.
 * @param {string} [timezone] - IANA timezone when using string/Date inputs.
 * @returns {{ meta: aq.Table, data: aq.Table }} Subset of the monitor.
 *
 * @throws {Error} If datetime column is missing/empty, inputs cannot be parsed,
 *                 timezone is missing when required, or the range is inverted.
 */
export function internal_filterDatetime(monitor, startdate, enddate, timezone) {
  const datetime = monitor.data.array('datetime');

  if (!datetime || datetime.length === 0) {
    throw new Error('No datetime values found in monitor.data');
  }

  // Use shared helper to normalize inputs to Luxon DateTime in the
  // appropriate zone. `isEnd = true` promotes date-only strings to
  // end-of-day so whole-day ranges are inclusive.
  const startDT = parseDatetime(startdate, timezone, false);
  const endDT   = parseDatetime(enddate, timezone, true);

  // Convert to UTC for comparison with monitor.data.datetime (assumed UTC).
  const startUtc = startDT.toUTC();
  const endUtc   = endDT.toUTC();

  if (endUtc < startUtc) {
    throw new Error('enddate must be greater than or equal to startdate.');
  }

  const len = datetime.length;
  const startMs = startUtc.toMillis();
  const endMs   = endUtc.toMillis();

  // Precompute millisecond values once
  const times = new Array(len);
  for (let i = 0; i < len; i++) {
    times[i] = datetime[i].toMillis();
  }

  // Optimized binary search to find bounds in O(log n)

  // First index i where times[i] >= startMs
  const findFirstGE = (arr, target) => {
    let lo = 0;
    let hi = arr.length; // exclusive
    while (lo < hi) {
      const mid = (lo + hi) >> 1;
      if (arr[mid] < target) {
        lo = mid + 1;
      } else {
        hi = mid;
      }
    }
    return lo;
  };

  // Last index i where times[i] <= endMs
  const findLastLE = (arr, target) => {
    let lo = 0;
    let hi = arr.length; // exclusive
    while (lo < hi) {
      const mid = (lo + hi) >> 1;
      if (arr[mid] <= target) {
        lo = mid + 1;
      } else {
        hi = mid;
      }
    }
    return lo - 1; // last <= target
  };

  const startIdx = findFirstGE(times, startMs);
  const endIdx   = findLastLE(times, endMs);

  // If no overlap, return an empty data table with the same schema.
  if (startIdx >= len || endIdx < 0 || startIdx > endIdx) {
    return {
      meta: monitor.meta,
      data: monitor.data.slice(0, 0)
    };
  }

  // Final subsetting step: slice rows by index (Arquero's slice end is exclusive).
  const subset = monitor.data.slice(startIdx, endIdx + 1);

  return { meta: monitor.meta, data: subset };
}