init
This commit is contained in:
202
src/importer/importer.js
Normal file
202
src/importer/importer.js
Normal file
@@ -0,0 +1,202 @@
|
||||
const crypto = require("crypto");
|
||||
const { config } = require("../config");
|
||||
const {
|
||||
DATASETS,
|
||||
normalizeDatasetList,
|
||||
normalizeLanguageList,
|
||||
} = require("../datasets");
|
||||
const { connectToMongo } = require("../db/client");
|
||||
const { ensureIndexes } = require("../db/indexes");
|
||||
const { fetchQuestlogPage } = require("./questlogClient");
|
||||
|
||||
// Module-level singleton tracking the state of the current (or most recent)
// import run. Mutated by resetStatus/recordTotals/importDatasetLanguage/
// runImport and read back via getImportStatus.
const importStatus = {
  running: false, // true while runImport is in flight (set by resetStatus)
  startedAt: null, // ISO timestamp set when a run begins
  finishedAt: null, // ISO timestamp set when a run ends (success or failure)
  error: null, // message of the error that aborted the last run, if any
  current: null, // { dataset, language, page } of the page being fetched
  totals: {}, // per "dataset:language" counters (see recordTotals)
};
|
||||
|
||||
/**
 * Computes a deterministic SHA-1 hex digest of any JSON-serializable value.
 *
 * `JSON.stringify` alone is insertion-order dependent for object keys, so two
 * semantically identical records could previously hash differently (defeating
 * the "stable" contract this hash is used for as a fallback sourceId).
 * Canonicalize by recursively sorting object keys before hashing.
 *
 * @param {*} value - Any JSON-serializable value.
 * @returns {string} 40-character lowercase hex SHA-1 digest.
 */
function stableJsonHash(value) {
  const canonicalize = (input) => {
    if (Array.isArray(input)) {
      // Array order is meaningful; canonicalize elements only.
      return input.map(canonicalize);
    }
    if (input && typeof input === "object") {
      return Object.fromEntries(
        Object.keys(input)
          .sort()
          .map((key) => [key, canonicalize(input[key])]),
      );
    }
    return input;
  };
  return crypto
    .createHash("sha1")
    .update(JSON.stringify(canonicalize(value)))
    .digest("hex");
}
|
||||
|
||||
/**
 * Derives a stable string identifier for a source record.
 *
 * Tries the usual identifier fields in priority order and takes the first
 * truthy one; when none is present, falls back to a content hash so the
 * record still gets a deterministic id.
 *
 * @param {?object} record - Raw record from the upstream API.
 * @returns {string} The chosen identifier, stringified.
 */
function extractSourceId(record) {
  const candidates = [
    record?.compoundId,
    record?.id,
    record?._id,
    record?.slug,
    record?.name,
    record?.title,
  ];
  for (const candidate of candidates) {
    if (candidate) {
      return String(candidate);
    }
  }
  return String(stableJsonHash(record));
}
|
||||
|
||||
/**
 * Extracts a human-readable display name from a raw record.
 *
 * Picks the first truthy name-like field (falling back to id fields, then to
 * the empty string), then collapses internal whitespace runs to single spaces
 * and trims the ends.
 *
 * @param {?object} record - Raw record from the upstream API.
 * @returns {string} Normalized name, possibly empty.
 */
function extractName(record) {
  const candidates = [
    record?.name,
    record?.title,
    record?.displayName,
    record?.id,
    record?.compoundId,
  ];
  const chosen = candidates.find((candidate) => candidate) ?? "";
  return String(chosen).replace(/\s+/g, " ").trim();
}
|
||||
|
||||
/**
 * Shapes a raw upstream record into the document stored in MongoDB.
 *
 * @param {object} dataset - Dataset descriptor; `key`, `method` are persisted.
 * @param {string} language - Language code the record was fetched for.
 * @param {object} record - Raw record payload from the upstream API.
 * @returns {object} Document ready for upsert, including a denormalized
 *   `searchText` (name + raw JSON) and an `importedAt` timestamp.
 */
function buildStoredDocument(dataset, language, record) {
  const importedAt = new Date();
  const name = extractName(record);
  const sourceId = extractSourceId(record);
  const rawText = JSON.stringify(record);

  return {
    dataset: dataset.key,
    language,
    source: "questlog.gg",
    sourceMethod: dataset.method,
    sourceId,
    name,
    // Concatenate name and serialized payload for simple text search.
    searchText: `${name} ${rawText}`,
    raw: record,
    importedAt,
  };
}
|
||||
|
||||
/**
 * Upserts a page of records into the dataset's collection in one bulk write.
 *
 * Records are keyed by `(language, sourceId)` within the dataset's dedicated
 * collection, so re-importing refreshes existing documents in place.
 *
 * @param {object} db - Connected MongoDB database handle.
 * @param {object} dataset - Dataset descriptor providing `collection`.
 * @param {string} language - Language code for this page.
 * @param {object[]} records - Raw records to store.
 * @returns {Promise<{matched: number, modified: number, upserted: number}>}
 */
async function upsertRecords(db, dataset, language, records) {
  if (records.length === 0) {
    return { matched: 0, modified: 0, upserted: 0 };
  }

  const operations = [];
  for (const record of records) {
    const document = buildStoredDocument(dataset, language, record);
    operations.push({
      updateOne: {
        filter: { language, sourceId: document.sourceId },
        update: { $set: document },
        upsert: true,
      },
    });
  }

  // Unordered: one bad record should not abort the rest of the page.
  const result = await db
    .collection(dataset.collection)
    .bulkWrite(operations, { ordered: false });

  return {
    matched: result.matchedCount || 0,
    modified: result.modifiedCount || 0,
    upserted: result.upsertedCount || 0,
  };
}
|
||||
|
||||
/**
 * Re-initializes the shared importStatus for a fresh run.
 *
 * Note: despite the name, this also marks the run as STARTED
 * (`running: true`, `startedAt` stamped) — callers invoke it immediately
 * before kicking off the import loop.
 */
function resetStatus() {
  Object.assign(importStatus, {
    running: true,
    startedAt: new Date().toISOString(),
    finishedAt: null,
    error: null,
    current: null,
    totals: {},
  });
}
|
||||
|
||||
/**
 * Accumulates per-page import results into the shared status totals.
 *
 * Totals are bucketed under a `"dataset:language"` key; the bucket is
 * created lazily on first use.
 *
 * @param {string} datasetKey - Dataset identifier.
 * @param {string} language - Language code.
 * @param {{matched: number, modified: number, upserted: number}} pageResult -
 *   Counters returned by upsertRecords for this page.
 * @param {number} recordsCount - Number of records on this page.
 */
function recordTotals(datasetKey, language, pageResult, recordsCount) {
  const bucketKey = `${datasetKey}:${language}`;
  const bucket = importStatus.totals[bucketKey] ?? {
    pages: 0,
    records: 0,
    matched: 0,
    modified: 0,
    upserted: 0,
  };

  bucket.pages += 1;
  bucket.records += recordsCount;
  bucket.matched += pageResult.matched;
  bucket.modified += pageResult.modified;
  bucket.upserted += pageResult.upserted;

  importStatus.totals[bucketKey] = bucket;
}
|
||||
|
||||
/**
 * Imports every page of one dataset/language pair, sequentially.
 *
 * Pages are fetched one at a time starting from page 1; the loop stops on
 * the first empty page, when the upstream-reported page count is reached,
 * or when the configured page limit is hit.
 *
 * @param {object} db - Connected MongoDB database handle.
 * @param {object} dataset - Dataset descriptor.
 * @param {string} language - Language code to import.
 * @param {?number} maxPages - Optional cap on pages per dataset/language.
 */
async function importDatasetLanguage(db, dataset, language, maxPages) {
  for (let page = 1; ; page += 1) {
    // Expose progress for getImportStatus while the fetch is in flight.
    importStatus.current = { dataset: dataset.key, language, page };
    const payload = await fetchQuestlogPage(dataset, language, page);

    if (payload.records.length === 0) {
      break;
    }

    const pageResult = await upsertRecords(db, dataset, language, payload.records);
    recordTotals(dataset.key, language, pageResult, payload.records.length);

    const atLastKnownPage = Boolean(payload.pageCount) && page >= payload.pageCount;
    const atConfiguredLimit = Boolean(maxPages) && page >= maxPages;
    if (atLastKnownPage || atConfiguredLimit) {
      break;
    }
  }
}
|
||||
|
||||
/**
 * Runs a full import across the requested datasets and languages.
 *
 * Rejects with a 409-status error if a run is already in progress. On
 * success resolves with a status snapshot; on failure records the error
 * message on the shared status and rethrows.
 *
 * @param {{datasets?: *, languages?: *, maxPages?: number}} [options]
 * @returns {Promise<object>} Final import status snapshot.
 * @throws {Error} `status: 409` when an import is already running, or any
 *   error raised by connection, indexing, or page import.
 */
async function runImport(options = {}) {
  if (importStatus.running) {
    const error = new Error("An import is already running");
    error.status = 409;
    throw error;
  }

  const datasetKeys = normalizeDatasetList(options.datasets);
  const languages = normalizeLanguageList(options.languages);
  const maxPages = options.maxPages || config.importer.maxPages;

  resetStatus();

  // Shared finalization for both outcomes (stamps end time, clears the flag).
  const markFinished = () => {
    importStatus.finishedAt = new Date().toISOString();
    importStatus.running = false;
  };

  try {
    const db = await connectToMongo();
    await ensureIndexes(db);

    for (const datasetKey of datasetKeys) {
      const dataset = DATASETS[datasetKey];
      for (const language of languages) {
        // Sequential on purpose: one dataset/language pair at a time.
        await importDatasetLanguage(db, dataset, language, maxPages);
      }
    }

    markFinished();
    importStatus.current = null;
    return getImportStatus();
  } catch (error) {
    markFinished();
    // `current` is deliberately left pointing at the failing page.
    importStatus.error = error.message;
    throw error;
  }
}
|
||||
|
||||
/**
 * Returns a point-in-time snapshot of the import status.
 *
 * Previously this returned live references to the internal `current` and
 * `totals` objects, so a snapshot taken mid-run kept changing underneath the
 * caller and callers could mutate the importer's internal state. Copy both
 * (totals one level deep — its values are flat counter objects) so the
 * returned value is a true snapshot.
 *
 * @returns {{running: boolean, startedAt: ?string, finishedAt: ?string,
 *            error: ?string, current: ?object, totals: object}}
 */
function getImportStatus() {
  const totals = {};
  for (const [key, counters] of Object.entries(importStatus.totals)) {
    totals[key] = { ...counters };
  }

  return {
    running: importStatus.running,
    startedAt: importStatus.startedAt,
    finishedAt: importStatus.finishedAt,
    error: importStatus.error,
    current: importStatus.current ? { ...importStatus.current } : null,
    totals,
  };
}
|
||||
|
||||
/**
 * Kicks off an import without making the caller wait for it.
 *
 * Fire-and-forget: rejections are absorbed here on purpose — runImport
 * already records failures on importStatus.error, and an unhandled
 * rejection would otherwise crash the process.
 *
 * @param {object} [options] - Same options accepted by runImport.
 * @returns {Promise<undefined|object>} Resolves when the run settles.
 */
function startImport(options = {}) {
  return runImport(options).catch(() => undefined);
}
|
||||
|
||||
// Public API: runImport awaits completion and rethrows failures;
// startImport is fire-and-forget (errors land on the status object);
// getImportStatus reports progress of the current/last run.
module.exports = {
  getImportStatus,
  runImport,
  startImport,
};
|
||||
Reference in New Issue
Block a user