Files
api-duneawa/src/importer/importer.js
2026-05-10 20:19:07 +02:00

282 lines
6.4 KiB
JavaScript

const crypto = require("crypto");
const { config } = require("../config");
const {
DATASETS,
normalizeDatasetList,
normalizeLanguageList,
} = require("../datasets");
const { connectToMongo } = require("../db/client");
const { ensureIndexes } = require("../db/indexes");
const { fetchQuestlogDetail, fetchQuestlogPage } = require("./questlogClient");
// Module-level, in-memory status of the most recent import run. It is mutated
// by resetStatus(), recordTotals(), the import functions and runImport(), and
// copied out to callers by getImportStatus().
const importStatus = {
// True from resetStatus() until runImport() finishes or fails.
running: false,
// ISO-8601 timestamps (strings) of the run start / end; null when not set.
startedAt: null,
finishedAt: null,
// Message of the error that aborted the run, or null.
error: null,
// What is being imported right now: { dataset, language, page } and, while
// item details are being fetched, also { records }.
current: null,
// Per-run counters keyed by `${datasetKey}:${language}`.
totals: {},
};
// Maximum number of item-detail fetches that fetchItemDetailRecords() lets
// mapWithConcurrency() run at the same time.
const ITEM_DETAIL_CONCURRENCY = 6;
/**
 * Hash a value by SHA-1 over its `JSON.stringify` serialization.
 *
 * The digest follows the serialized text exactly, so two objects with the
 * same properties in a different order produce different hashes.
 *
 * @param {*} value - Any JSON-serializable value.
 * @returns {string} 40-character lowercase hex digest.
 */
function stableJsonHash(value) {
  const serialized = JSON.stringify(value);
  const hasher = crypto.createHash("sha1");
  hasher.update(serialized);
  return hasher.digest("hex");
}
/**
 * Derive the identifier used to match a record against stored documents.
 *
 * The first truthy value among `compoundId`, `id`, `_id`, `slug`, `name`
 * and `title` wins (falsy values such as `0` or `""` are skipped). When none
 * is usable, a hash of the whole record is used instead.
 *
 * @param {object} record - Raw record.
 * @returns {string} The identifier, always as a string.
 */
function extractSourceId(record) {
  const candidateKeys = ["compoundId", "id", "_id", "slug", "name", "title"];
  for (const key of candidateKeys) {
    const candidate = record?.[key];
    if (candidate) {
      return String(candidate);
    }
  }
  // Nothing identifying on the record: fall back to a content hash.
  return String(stableJsonHash(record));
}
/**
 * Pick a display name for a record and normalize its whitespace.
 *
 * The first truthy value among `name`, `title`, `displayName`, `id` and
 * `compoundId` is used; if none is truthy the result is the empty string.
 * Runs of whitespace are collapsed to a single space and the ends trimmed.
 *
 * @param {object} record - Raw record (may be null/undefined).
 * @returns {string} Normalized name, possibly empty.
 */
function extractName(record) {
  const candidateKeys = ["name", "title", "displayName", "id", "compoundId"];
  let label = "";
  for (const key of candidateKeys) {
    const candidate = record?.[key];
    if (candidate) {
      label = candidate;
      break;
    }
  }
  const collapsed = String(label).replace(/\s+/g, " ");
  return collapsed.trim();
}
/**
 * Wrap a raw record in the document shape written to the database.
 *
 * @param {object} dataset - Dataset descriptor; `key` and `method` are read.
 * @param {string} language - Language the record was fetched in.
 * @param {object} record - Raw record, stored unchanged under `raw`.
 * @returns {object} Document with dataset/language/source metadata, the
 *   derived `sourceId` and `name`, a `searchText` made of the name followed
 *   by the record's JSON text, the raw record, and an `importedAt` date.
 */
function buildStoredDocument(dataset, language, record) {
  const sourceId = extractSourceId(record);
  const name = extractName(record);
  // The JSON text of the record is appended so every field is searchable.
  const searchText = `${name} ${JSON.stringify(record)}`;
  const importedAt = new Date();

  return {
    dataset: dataset.key,
    language,
    source: "questlog.gg",
    sourceMethod: dataset.method,
    sourceId,
    name,
    searchText,
    raw: record,
    importedAt,
  };
}
/**
 * Upsert a batch of records into the dataset's collection.
 *
 * Each record becomes one `updateOne` with `upsert: true`, matched on
 * `{ language, sourceId }`, and all of them are sent in a single unordered
 * `bulkWrite`. An empty batch returns zero counts without touching `db`.
 *
 * @param {object} db - Database handle exposing `collection(name)`.
 * @param {object} dataset - Dataset descriptor; `collection` names the target.
 * @param {string} language - Language of the records.
 * @param {object[]} records - Raw records to store.
 * @returns {Promise<{matched: number, modified: number, upserted: number}>}
 */
async function upsertRecords(db, dataset, language, records) {
  if (records.length === 0) {
    return { matched: 0, modified: 0, upserted: 0 };
  }

  const collection = db.collection(dataset.collection);

  const toUpsertOperation = (record) => {
    const document = buildStoredDocument(dataset, language, record);
    const filter = { language, sourceId: document.sourceId };
    return {
      updateOne: { filter, update: { $set: document }, upsert: true },
    };
  };

  const writeResult = await collection.bulkWrite(
    records.map((record) => toUpsertOperation(record)),
    { ordered: false },
  );

  // Missing counters in the driver result are reported as 0.
  const matched = writeResult.matchedCount || 0;
  const modified = writeResult.modifiedCount || 0;
  const upserted = writeResult.upsertedCount || 0;
  return { matched, modified, upserted };
}
/**
 * Map `values` through an async `iteratee`, running at most `limit` calls at
 * the same time, and return the results in input order.
 *
 * Fail-fast: as soon as one call rejects (or throws), no worker picks up any
 * further item. Calls that are already in flight are not cancelled, but no
 * new ones are started, so a failed batch does not keep issuing requests.
 *
 * @param {Array} values - Items to process.
 * @param {number} limit - Maximum number of concurrent calls. Values that are
 *   not a positive number (0, negative, NaN) are treated as 1 so the items
 *   are still processed instead of silently yielding an array of `undefined`.
 * @param {(value: *, index: number) => *} iteratee - Called per item; may
 *   return a promise.
 * @returns {Promise<Array>} Results, `results[i]` belonging to `values[i]`.
 * @throws Rejects with the first error raised by `iteratee`.
 */
async function mapWithConcurrency(values, limit, iteratee) {
  const results = new Array(values.length);
  const requestedWorkers = Math.max(1, Math.floor(Number(limit)) || 1);
  const workerCount = Math.min(requestedWorkers, values.length);
  let nextIndex = 0;
  let failed = false;

  async function worker() {
    // Claiming an index and advancing the cursor happen synchronously, so
    // two workers can never take the same item.
    while (!failed && nextIndex < values.length) {
      const currentIndex = nextIndex;
      nextIndex += 1;
      try {
        results[currentIndex] = await iteratee(
          values[currentIndex],
          currentIndex,
        );
      } catch (error) {
        // Tell the sibling workers to stop taking new items.
        failed = true;
        throw error;
      }
    }
  }

  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
/**
 * Work out the id used to request an item's detail record.
 *
 * A truthy `id` is used as-is; otherwise a truthy `compoundId` is used with
 * a leading `item-` prefix removed.
 *
 * @param {object} record - List record (may be null/undefined).
 * @returns {string|undefined} The id as a string, or `undefined` when the
 *   record carries neither field.
 */
function extractItemDetailId(record) {
  const directId = record?.id;
  if (directId) {
    return String(directId);
  }

  const compoundId = record?.compoundId;
  return compoundId ? String(compoundId).replace(/^item-/, "") : undefined;
}
/**
 * Fetch the detail record for every item in `records`, at most
 * `ITEM_DETAIL_CONCURRENCY` requests at a time.
 *
 * @param {object[]} records - Item list records.
 * @param {string} language - Language to request the details in.
 * @returns {Promise<object[]>} Detail records in the same order as `records`.
 * @throws {Error} When a record has no usable detail id, or when a fetch
 *   fails.
 */
async function fetchItemDetailRecords(records, language) {
  const detailDataset = DATASETS.items.detail;

  const fetchDetailFor = async (record) => {
    const id = extractItemDetailId(record);
    if (id) {
      return fetchQuestlogDetail(detailDataset.method, id, language);
    }
    throw new Error(
      `Could not determine Questlog item detail id for ${JSON.stringify(record)}`,
    );
  };

  return mapWithConcurrency(records, ITEM_DETAIL_CONCURRENCY, (record) =>
    fetchDetailFor(record),
  );
}
/**
 * Fetch and store the detail records for one page of items.
 *
 * Updates `importStatus.current` to point at the detail dataset before the
 * fetch starts.
 *
 * @param {object} db - Database handle.
 * @param {string} language - Language being imported.
 * @param {number} page - Page number the list records came from (status only).
 * @param {object[]} records - Item list records of that page.
 * @returns {Promise<{matched: number, modified: number, upserted: number}>}
 *   Write counts for the detail records.
 */
async function importItemDetails(db, language, page, records) {
  const detailDataset = DATASETS.items.detail;
  const recordCount = records.length;

  importStatus.current = {
    dataset: detailDataset.key,
    language,
    page,
    records: recordCount,
  };

  const fetchedDetails = await fetchItemDetailRecords(records, language);
  const writeCounts = await upsertRecords(
    db,
    detailDataset,
    language,
    fetchedDetails,
  );
  return writeCounts;
}
/**
 * Put `importStatus` into its "run just started" state: running, start time
 * set to now, and everything left over from a previous run cleared.
 */
function resetStatus() {
  const startedAt = new Date().toISOString();
  Object.assign(importStatus, {
    running: true,
    startedAt,
    finishedAt: null,
    error: null,
    current: null,
    totals: {},
  });
}
/**
 * Add one page's counts to the running totals for a dataset/language pair.
 *
 * @param {string} datasetKey - Dataset key.
 * @param {string} language - Language of the page.
 * @param {{matched: number, modified: number, upserted: number}} pageResult
 *   Write counts for the page.
 * @param {number} recordsCount - Number of records the page contained.
 */
function recordTotals(datasetKey, language, pageResult, recordsCount) {
  const totalsKey = `${datasetKey}:${language}`;

  let bucket = importStatus.totals[totalsKey];
  if (!bucket) {
    // First page seen for this dataset/language in the current run.
    bucket = { pages: 0, records: 0, matched: 0, modified: 0, upserted: 0 };
  }

  bucket.pages += 1;
  bucket.records += recordsCount;
  bucket.matched += pageResult.matched;
  bucket.modified += pageResult.modified;
  bucket.upserted += pageResult.upserted;

  importStatus.totals[totalsKey] = bucket;
}
/**
 * Import every page of one dataset in one language.
 *
 * Pages are fetched starting at 1. Each non-empty page is upserted and
 * counted; when the dataset has a `detail` descriptor, the page's item
 * details are imported and counted as well. The loop ends on the first empty
 * page, when the page number reaches the payload's `pageCount`, or when it
 * reaches `maxPages` (a falsy `maxPages` means no configured limit).
 *
 * @param {object} db - Database handle.
 * @param {object} dataset - Dataset descriptor.
 * @param {string} language - Language to import.
 * @param {number} [maxPages] - Optional cap on the number of pages.
 * @returns {Promise<void>}
 */
async function importDatasetLanguage(db, dataset, language, maxPages) {
  for (let page = 1; ; page += 1) {
    importStatus.current = { dataset: dataset.key, language, page };

    const payload = await fetchQuestlogPage(dataset, language, page);
    if (payload.records.length === 0) {
      return;
    }

    const listCounts = await upsertRecords(
      db,
      dataset,
      language,
      payload.records,
    );
    recordTotals(dataset.key, language, listCounts, payload.records.length);

    if (dataset.detail) {
      const detailCounts = await importItemDetails(
        db,
        language,
        page,
        payload.records,
      );
      recordTotals(
        dataset.detail.key,
        language,
        detailCounts,
        payload.records.length,
      );
    }

    // Stop at the last page the API reports, if it reports one.
    if (payload.pageCount && page >= payload.pageCount) {
      return;
    }
    // Stop at the configured page limit, if there is one.
    if (maxPages && page >= maxPages) {
      return;
    }
  }
}
/**
 * Run a full import and resolve with the final status snapshot.
 *
 * Only one import may run at a time. The selected datasets are imported one
 * after another, each in every selected language. `importStatus` is updated
 * as the run progresses; on failure the error message is recorded there and
 * the error is rethrown (`importStatus.current` is left as it was at the time
 * of the failure).
 *
 * @param {object} [options]
 * @param {*} [options.datasets] - Passed to `normalizeDatasetList`.
 * @param {*} [options.languages] - Passed to `normalizeLanguageList`.
 * @param {number} [options.maxPages] - Page cap; falls back to
 *   `config.importer.maxPages` when falsy.
 * @returns {Promise<object>} Status snapshot taken after the run finished.
 * @throws {Error} With `status === 409` when an import is already running;
 *   otherwise whatever error aborted the run.
 */
async function runImport(options = {}) {
  if (importStatus.running) {
    throw Object.assign(new Error("An import is already running"), {
      status: 409,
    });
  }

  // Resolve the options before touching the status, so a bad option does not
  // wipe the previous run's status.
  const selectedDatasets = normalizeDatasetList(options.datasets);
  const selectedLanguages = normalizeLanguageList(options.languages);
  const pageLimit = options.maxPages || config.importer.maxPages;

  resetStatus();

  const markFinished = () => {
    importStatus.finishedAt = new Date().toISOString();
    importStatus.running = false;
  };

  try {
    const db = await connectToMongo();
    await ensureIndexes(db);

    for (const datasetKey of selectedDatasets) {
      const dataset = DATASETS[datasetKey];
      for (const language of selectedLanguages) {
        await importDatasetLanguage(db, dataset, language, pageLimit);
      }
    }

    markFinished();
    importStatus.current = null;
    return getImportStatus();
  } catch (error) {
    markFinished();
    importStatus.error = error.message;
    throw error;
  }
}
/**
 * Return a snapshot of the current import status.
 *
 * The snapshot is a new object, but it is a shallow copy: `current` and
 * `totals` are the same objects the importer holds.
 *
 * @returns {{running: boolean, startedAt: ?string, finishedAt: ?string,
 *   error: ?string, current: ?object, totals: object}}
 */
function getImportStatus() {
  const { running, startedAt, finishedAt, error, current, totals } =
    importStatus;
  return { running, startedAt, finishedAt, error, current, totals };
}
/**
 * Start an import without surfacing its failure to the caller.
 *
 * The returned promise never rejects: it resolves with the final status on
 * success and with `undefined` on any failure. Failures that happen after
 * the run has started are still visible through `importStatus.error`;
 * failures raised before that (for example "already running") are not
 * recorded anywhere.
 *
 * @param {object} [options] - Same options as `runImport`.
 * @returns {Promise<object|undefined>}
 */
function startImport(options = {}) {
  return runImport(options).catch(() => undefined);
}
// Public API of the importer: the status snapshot reader, the awaitable
// import run, and the non-rejecting variant of it.
module.exports = {
getImportStatus,
runImport,
startImport,
};