Skip to content

Instantly share code, notes, and snippets.

@vineboneto
Created April 4, 2024 05:24
Show Gist options
  • Select an option

  • Save vineboneto/d7a8f34709f39bf484779d77be338ddf to your computer and use it in GitHub Desktop.

Select an option

Save vineboneto/d7a8f34709f39bf484779d77be338ddf to your computer and use it in GitHub Desktop.
import posgtres from "postgres";
import fs from "fs";
import { Readable, Transform } from "stream";
const sql = posgtres({
host: "localhost",
user: "postgres",
password: "1234",
port: 5432,
});
async function dbtofile() {
console.time();
const readable = await sql`copy mytemp (id, content) to stdout`.readable();
const write = fs.createWriteStream("output.csv");
const transform = new Transform({
transform(chunk, endonding, callback) {
const [id, ...fields] = chunk.toString().trim().split(/\s+/);
const newData = `${id}, ${fields.join(" ")}\n`;
callback(null, newData);
},
highWaterMark: 1024, // Limit chunk size to 1KB
});
transform.pipe(write);
readable.pipe(transform);
await new Promise((resolve, reject) => {
write.on("finish", () => {
console.log("Escrita concluída.");
resolve(null);
});
write.on("error", (err) => {
console.error(err);
resolve(null);
});
});
transform.destroy();
readable.destroy();
write.destroy();
sql.end();
console.timeEnd();
}
async function filetodb() {
await sql`delete from mytemp_copy`;
const write = await sql`copy mytemp_copy (id, content) from stdin`.writable();
const readable = fs.createReadStream("output.csv", {
encoding: "utf8",
highWaterMark: 1024,
});
const transform = new Transform({
transform(chunk, encoding, callback) {
const lines = ((this.partialLine || "") + chunk).split("\n");
lines.forEach((line, index) => {
// Ignora a última linha se não estiver completa (não terminada com uma quebra de linha)
if (index === lines.length - 1 && !line.endsWith("\n")) {
this.partialLine = line; // Salva a parte da linha para o próximo chunk
} else {
const [id, content] = line.split(", ");
this.push(`${+id}\t${content.trim()}\n`);
}
});
callback();
},
});
readable.pipe(transform);
transform.pipe(write);
try {
await new Promise((resolve, reject) => {
readable.on("end", resolve);
readable.on("error", reject);
write.on("finish", resolve);
write.on("error", reject);
});
console.log(await sql`select count(*) from mytemp_copy`);
} finally {
readable.destroy();
sql.end();
}
}
await filetodb();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment