/**
* Utility functions for transforming and restructuring Monitor objects.
* These internal functions operate on the underlying `meta` and `data` tables
* and return plain `{ meta, data }` objects, enabling consistent post-processing.
*
* Internal functions:
* - `internal_collapse()` – Collapses multiple time series into a single column using aggregation.
* - `internal_combine()` – Merges two monitor objects, dropping duplicated IDs in the second.
* - `internal_select()` – Subsets and reorders a monitor by selected deviceDeploymentIDs.
* - `internal_filterByValue()` – Filters by a metadata field and matching value.
* - `internal_filterDatetime()` – Filters time series to an explicit datetime range.
* - `internal_dropEmpty()` – Removes time series columns with no valid data.
* - `internal_trimDate()` – Trims time series to full local-time days.
*
* Helper functions (imported from './helpers.js'):
* - `arrayMean()` – Computes the mean of an array, skipping null/NaN/invalid values.
* - `round1()` – Rounds all non-datetime columns in a data table to 1 decimal place.
* - `parseDatetime()` – Parses a user-provided datetime into a Luxon DateTime.
*/
import * as aq from 'arquero';
const op = aq.op;
import { DateTime } from 'luxon';
import { arrayMean, round1, parseDatetime } from './helpers.js';
/**
* Collapse a Monitor object into a single time series.
*
* Collapses data from all time series into a single time series using the
* function provided in the `FUN` argument (typically 'mean'). The single time
* series result will be located at the mean longitude and latitude.
*
* When `FUN === "quantile"`, the `FUN_arg` argument specifies the quantile
* probability.
*
* Available function names are those defined at:
* https://uwdata.github.io/arquero/api/op#aggregate-functions
*
* @param {Monitor} monitor - The Monitor instance to collapse.
* @param {string} deviceID - The name of the resulting time series column.
* @param {string} FUN - The aggregate function name (e.g. "mean", "sum", "quantile").
* @param {number} FUN_arg - An optional argument for the aggregator (e.g. quantile prob).
* @returns {{ meta: aq.Table, data: aq.Table }} A Monitor object with a single time series.
*/
export function internal_collapse(monitor, deviceID = "generatedID", FUN = "mean", FUN_arg = 0.8) {
const meta = monitor.meta;
const data = monitor.data;
// ----- Create new_meta ---------------------------------------------------
const longitude = arrayMean(meta.array("longitude"));
const latitude = arrayMean(meta.array("latitude"));
// TODO: Could create new locationID based on geohash
const locationID = "xxx";
const deviceDeploymentID = `xxx_${deviceID}`;
// Start with first row and override key fields
let new_meta = meta.slice(0, 1).derive({
locationID: aq.escape(locationID),
locationName: aq.escape(deviceID),
longitude: aq.escape(longitude),
latitude: aq.escape(latitude),
elevation: aq.escape(null),
houseNumber: aq.escape(null),
street: aq.escape(null),
city: aq.escape(null),
zip: aq.escape(null),
deviceDeploymentID: aq.escape(deviceDeploymentID),
deviceType: aq.escape(null),
deploymentType: aq.escape(null),
});
// ----- Create new_data ---------------------------------------------------
// NOTE: Arquero provides no functionality for row-wise operations or for
// NOTE: transpose. So we have to perform the following operations:
// NOTE: - fold the data into a long table with timestamp and id columns
// NOTE: - pivot on timestamp while aggregating the values with FUN
// NOTE: - fold the result back into a table with timestamp and value columns
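// Illustrative sketch of the shape changes (hypothetical ids 'id_a'/'id_b',
// timestamps t1/t2):
//   fold(ids)           -> rows of { utcDatestamp, key: <id>, value }
//   pivot(utcDatestamp) -> a single row whose columns are t1, t2, ... holding FUN(values)
//   fold(t1, t2, ...)   -> rows of { key: <timestamp>, value: FUN(values) }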
const ids = monitor.getIDs();
const datetime = monitor.getDatetime();
let transformed = data
.derive({ utcDatestamp: 'd => op.format_utcdate(d.datetime)' })
.select(aq.not('datetime'));
const datetimeColumns = transformed.array('utcDatestamp');
// Build aggregation function string
let valueExpression;
if (FUN === 'count') {
valueExpression = '() => op.count()';
} else if (FUN === 'quantile') {
valueExpression = `d => op.quantile(d.value, ${FUN_arg})`;
} else {
valueExpression = `d => op.${FUN}(d.value)`;
}
const new_data = transformed
.fold(ids)
.pivot(
{ key: 'd => d.utcDatestamp' },
{ value: valueExpression }
)
.fold(datetimeColumns)
// NOTE: We don't use op.parse_date() because we require luxon DateTime
//.derive({ datetime: 'd => op.parse_date(d.key)' })
.derive({
datetime: aq.escape(d => DateTime.fromISO(d.key, { zone: 'utc' }))
})
.rename({ value: deviceID })
.select(['datetime', deviceID]);
return { meta: new_meta, data: round1(new_data) };
}
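// Example usage (illustrative only; assumes `monitor` is a populated Monitor instance):
//
//   const { meta, data } = internal_collapse(monitor, 'regional_average', 'mean');
//   // `data` now has two columns: 'datetime' and 'regional_average'.
//
//   // 80th-percentile collapse:
//   const p80 = internal_collapse(monitor, 'regional_p80', 'quantile', 0.8);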
/**
* Combines two Monitor objects by merging their metadata and time series data.
* If any deviceDeploymentIDs in `monitorB` are already present in `monitorA`,
* they will be dropped from `monitorB` before merging.
*
* Data are combined with 'join_full' to guarantee that all times from either
* Monitor object will be retained.
*
* @param {Monitor} monitorA - The base Monitor instance.
* @param {Monitor} monitorB - The Monitor instance to merge in.
* @returns {{ meta: aq.Table, data: aq.Table }} A combined monitor object.
*/
export function internal_combine(monitorA, monitorB) {
const idsA = new Set(monitorA.meta.array('deviceDeploymentID'));
const idsB = monitorB.meta.array('deviceDeploymentID');
// Identify IDs in monitorB that are not already present in monitorA
const uniqueIDs = idsB.filter(id => !idsA.has(id));
// Filter monitorB's meta and data to include only unique IDs
const metaB = monitorB.meta
.params({ ids: uniqueIDs })
.filter('op.includes(ids, d.deviceDeploymentID)');
const dataB = monitorB.data.select(['datetime', ...uniqueIDs]);
// Combine everything
const combinedMeta = monitorA.meta.concat(metaB);
const combinedData = monitorA.data
.join_full(dataB, "datetime")
.orderby("datetime");
return { meta: combinedMeta, data: round1(combinedData) };
}
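// Example usage (illustrative only; assumes both monitors share a pollutant and
// an hourly time axis):
//
//   const combined = internal_combine(monitorA, monitorB);
//   // combined.meta has one row per unique deviceDeploymentID;
//   // combined.data has the union of timestamps, ordered by datetime.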
/**
* Subsets and reorders time series columns and corresponding metadata
* for the specified deviceDeploymentIDs.
*
* Ensures that the returned `meta` rows appear in the same order as `ids`,
* and that all specified columns are included in the `data` table.
*
* @param {Monitor} monitor - The Monitor instance containing metadata and data.
* @param {string[]} ids - An array of deviceDeploymentIDs to select and order.
* @returns {{ meta: aq.Table, data: aq.Table }} A subset of the monitor with selected columns.
*
* @throws {Error} If `ids` is empty, contains duplicates, or includes an ID not found in the metadata.
*/
export function internal_select(monitor, ids) {
// Normalize to array if a single string is passed
if (typeof ids === 'string') {
ids = [ids];
}
if (!Array.isArray(ids) || ids.length === 0) {
throw new Error('ids must be a non-empty string or array of deviceDeploymentIDs');
}
if (new Set(ids).size !== ids.length) {
throw new Error('Duplicate deviceDeploymentID values are not allowed in select()');
}
// Reorder meta rows to match the order of `ids`
const metaObjects = monitor.meta.objects();
const metaRows = ids.map(id => {
const row = metaObjects.find(r => r.deviceDeploymentID === id);
if (!row) {
throw new Error(`deviceDeploymentID '${id}' not found in metadata`);
}
return row;
});
const meta = aq.from(metaRows);
// Subset and reorder columns in the data table
const data = monitor.data.select(['datetime', ...ids]);
return { meta: meta, data: round1(data) };
}
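// Example usage (illustrative only; the IDs below are hypothetical):
//
//   const subset = internal_select(monitor, ['id_0042', 'id_0007']);
//   // subset.meta rows and subset.data columns follow the requested order:
//   // ['datetime', 'id_0042', 'id_0007']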
/**
* Filters a monitor object to include only records where a given metadata field equals the specified value.
*
* @param {Monitor} monitor - The Monitor instance containing metadata and data.
* @param {string} columnName - Name of the metadata column to filter on.
* @param {string|number} value - Value to match in the specified column.
* @returns {{ meta: aq.Table, data: aq.Table }} A filtered monitor object.
*
* @throws {Error} If the column does not exist in monitor.meta, the value cannot be
* parsed as a number for a numeric column, or the column type is unsupported.
*/
export function internal_filterByValue(monitor, columnName, value) {
if (!monitor.meta.columnNames().includes(columnName)) {
throw new Error(`Column '${columnName}' not found in metadata`);
}
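// NOTE: Column type is inferred from the value in the first metadata row.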
const colType = typeof monitor.meta.get(columnName);
let filterExpression;
if (colType === 'number') {
const parsedValue = parseFloat(value);
if (isNaN(parsedValue)) {
throw new Error(`Value '${value}' could not be parsed as a number`);
}
filterExpression = `d => op.equal(d.${columnName}, ${parsedValue})`;
} else if (colType === 'string') {
const escaped = value.toString().replace(/'/g, "\\'");
filterExpression = `d => op.equal(d.${columnName}, '${escaped}')`;
} else {
throw new Error(`Unsupported column type for filtering: ${colType}`);
}
const meta = monitor.meta.filter(filterExpression);
const ids = meta.array('deviceDeploymentID');
const data = monitor.data.select(['datetime', ...ids]);
return { meta: meta, data: round1(data) };
}
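// Example usage (illustrative only; assumes a 'stateCode' column exists in monitor.meta):
//
//   const oregon = internal_filterByValue(monitor, 'stateCode', 'OR');
//   // oregon.data retains 'datetime' plus only those time series whose
//   // metadata row has stateCode === 'OR'.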
/**
* Drops time series from the monitor that contain only missing values.
*
* A value is considered missing if it is null, undefined, or NaN, as determined by
* Arquero's `op.valid()` aggregate.
* The resulting monitor object includes only the deviceDeploymentIDs with at least one valid observation.
*
* @param {Monitor} monitor - The Monitor instance containing metadata and data.
* @returns {{ meta: aq.Table, data: aq.Table }} A new monitor object with empty time series removed.
*/
export function internal_dropEmpty(monitor) {
const data = monitor.data;
const ids = data.columnNames().filter(c => c !== 'datetime');
// Count valid (non-null, non-undefined, non-NaN) values for each time series column
const countRow = data
// arquero pattern to compute column-wise aggregations
.rollup(Object.fromEntries(ids.map(id => [id, `d => op.valid(d['${id}'])`])))
.object(0); // Get the single row as an object
// Keep only the IDs with at least one valid value
const validIDs = Object.entries(countRow)
.filter(([_, count]) => count > 0)
.map(([id]) => id);
const filteredData = data.select(['datetime', ...validIDs]);
const filteredMeta = monitor.meta
.params({ ids: validIDs })
.filter((d, $) => op.includes($.ids, d.deviceDeploymentID));
return { meta: filteredMeta, data: round1(filteredData) };
}
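// Example usage (illustrative only):
//
//   const { meta, data } = internal_dropEmpty(monitor);
//   // Any deviceDeploymentID whose column is entirely missing is removed from
//   // both `data` and the corresponding row of `meta`.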
/**
* Trims time-series data to full local-time days (00:00–23:00) and optionally
* removes a leading or trailing full day that contains no data.
*
* @param {Monitor} monitor - The Monitor instance with datetime-sorted, hourly-interval data.
* @param {string} timezone - An IANA timezone string (e.g., "America/New_York").
* @param {boolean} [trimEmptyDays=true] - Whether to remove a fully-missing day at either edge.
* @returns {{ meta: aq.Table, data: aq.Table }} A subset of the monitor with trimmed data.
*
* @throws {Error} If the datetime column is missing, empty, or timezone is invalid.
*/
export function internal_trimDate(monitor, timezone, trimEmptyDays = true) {
const datetime = monitor.data.array('datetime');
if (!datetime || datetime.length === 0) {
throw new Error('No datetime values found in monitor.data');
}
// Validate timezone
const test = datetime[0].setZone(timezone);
if (!test.isValid || test.zoneName !== timezone) {
throw new Error(`Invalid or unrecognized timezone: '${timezone}'`);
}
// Convert first and last timestamps to local time
const startLocal = datetime[0].setZone(timezone);
const endLocal = datetime[datetime.length - 1].setZone(timezone);
// Compute number of hours to trim at start and end
const startTrim = startLocal.hour === 0 ? 0 : 24 - startLocal.hour;
const endTrim = endLocal.hour === 23 ? 0 : endLocal.hour + 1;
let start = startTrim;
let end = datetime.length - endTrim;
if (trimEmptyDays) {
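// NOTE: Only the first and last full local day are checked; 'missing' here
// NOTE: means null or undefined.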
const dataCols = monitor.data.columnNames().filter(c => c !== 'datetime');
const firstDay = monitor.data.slice(start, start + 24);
const allInvalidStart = dataCols.every(col =>
firstDay.array(col).every(v => v == null)
);
if (allInvalidStart) start += 24;
const lastDay = monitor.data.slice(end - 24, end);
const allInvalidEnd = dataCols.every(col =>
lastDay.array(col).every(v => v == null)
);
if (allInvalidEnd) end -= 24;
}
const trimmed = monitor.data.slice(start, end);
return { meta: monitor.meta, data: round1(trimmed) };
}
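// Example usage (illustrative only):
//
//   const trimmed = internal_trimDate(monitor, 'America/Los_Angeles');
//   // trimmed.data begins at local midnight and ends at local 23:00, with a
//   // fully-missing first or last day dropped.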
/**
* Filter a Monitor's time-series data to an explicit datetime range.
*
* Assumptions:
* - `monitor.data.datetime` contains Luxon DateTime objects in UTC.
* - Rows in `monitor.data` are sorted in ascending datetime order.
*
* Inputs:
* - `startdate` and `enddate` may be:
* * Luxon DateTime objects (any zone) – `timezone` is optional.
* * Strings or native Date objects – a valid IANA `timezone` is required.
*
* Behavior:
* - For string/Date inputs, `parseDatetime()` interprets them in `timezone`.
* - For date-only strings (e.g. "2025-02-10"), `enddate` is promoted to
* the *end of that local day* so that whole-day ranges are inclusive.
* - Both parsed datetimes are converted to UTC and used to find the
* inclusive index range within `monitor.data`.
*
* Note:
* - This function does NOT construct a new Monitor instance. It returns
* plain `{ meta, data }` tables; the public `Monitor#filterDatetime()`
* method is responsible for wrapping the result into a Monitor and
* validating it.
*
* @param {Monitor} monitor - The source monitor object.
* @param {DateTime|string|Date} startdate - Start of the range.
* @param {DateTime|string|Date} enddate - End of the range.
* @param {string} [timezone] - IANA timezone when using string/Date inputs.
* @returns {{ meta: aq.Table, data: aq.Table }} Subset of the monitor.
*
* @throws {Error} If datetime column is missing/empty, inputs cannot be parsed,
* timezone is missing when required, or the range is inverted.
*/
export function internal_filterDatetime(monitor, startdate, enddate, timezone) {
const datetime = monitor.data.array('datetime');
if (!datetime || datetime.length === 0) {
throw new Error('No datetime values found in monitor.data');
}
// Use shared helper to normalize inputs to Luxon DateTime in the
// appropriate zone. `isEnd = true` promotes date-only strings to
// end-of-day so whole-day ranges are inclusive.
const startDT = parseDatetime(startdate, timezone, false);
const endDT = parseDatetime(enddate, timezone, true);
// Convert to UTC for comparison with monitor.data.datetime (assumed UTC).
const startUtc = startDT.toUTC();
const endUtc = endDT.toUTC();
if (endUtc < startUtc) {
throw new Error('enddate must be greater than or equal to startdate.');
}
const len = datetime.length;
const startMs = startUtc.toMillis();
const endMs = endUtc.toMillis();
// Precompute millisecond values once
const times = new Array(len);
for (let i = 0; i < len; i++) {
times[i] = datetime[i].toMillis();
}
// Optimized binary search to find bounds in O(log n)
// First index i where times[i] >= startMs
const findFirstGE = (arr, target) => {
let lo = 0;
let hi = arr.length; // exclusive
while (lo < hi) {
const mid = (lo + hi) >> 1;
if (arr[mid] < target) {
lo = mid + 1;
} else {
hi = mid;
}
}
return lo;
};
// Last index i where times[i] <= endMs
const findLastLE = (arr, target) => {
let lo = 0;
let hi = arr.length; // exclusive
while (lo < hi) {
const mid = (lo + hi) >> 1;
if (arr[mid] <= target) {
lo = mid + 1;
} else {
hi = mid;
}
}
return lo - 1; // last <= target
};
const startIdx = findFirstGE(times, startMs);
const endIdx = findLastLE(times, endMs);
// If no overlap, return an empty data table with the same schema.
if (startIdx >= len || endIdx < 0 || startIdx > endIdx) {
return {
meta: monitor.meta,
data: monitor.data.slice(0, 0)
};
}
// Final subsetting step: slice rows by index (Arquero's slice end is exclusive).
const subset = monitor.data.slice(startIdx, endIdx + 1);
return { meta: monitor.meta, data: subset };
}
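// Example usage (illustrative only; the dates and timezone are hypothetical):
//
//   // Date-only strings: the end date is promoted to end-of-day, so this returns
//   // all hours from Feb 10 00:00 through Feb 12 23:00 local time.
//   const feb = internal_filterDatetime(monitor, '2025-02-10', '2025-02-12', 'America/Chicago');
//
//   // Luxon DateTime inputs need no timezone argument:
//   const recent = internal_filterDatetime(monitor, DateTime.utc(2025, 2, 10), DateTime.utc(2025, 2, 12));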