Revisions

  1. @hboylan revised this gist Nov 1, 2018. 1 changed file with 9 additions and 13 deletions.
    22 changes: 9 additions & 13 deletions lambda-s3-read-write-by-line.js
    @@ -61,8 +61,7 @@ exports.handler = function execute(event, context, callback) {

         // close stream on limit
         if (event.limit && event.limit <= totalLineCount) {
    -      readStream.close()
    -      return
    +      return readStream.close()
         }

         // process line
    @@ -75,21 +74,18 @@ exports.handler = function execute(event, context, callback) {
       })

       // clean up on close
    -  readStream.on('close', () => {
    +  readStream.on('close', async () => {

         // end write stream
         writeStream.end()

         // wait for upload
    -    uploadPromise
    -      .then(uploadResponse => {
    -
    -        // return processing insights
    -        callback(null, {
    -          totalLineCount,
    -          uploadResponse
    -        })
    -      })
    -      .catch(callback)
    +    const uploadResponse = await uploadPromise
    +
    +    // return processing insights
    +    callback(null, {
    +      totalLineCount,
    +      uploadResponse
    +    })
       })
     }
  2. @hboylan renamed this gist Nov 1, 2018. 1 changed file with 0 additions and 0 deletions.
  3. @hboylan created this gist Nov 1, 2018.
    95 changes: 95 additions & 0 deletions lambda-read-write-s3-by-line.js
    @@ -0,0 +1,95 @@
    const stream = require('stream')
    const readline = require('readline')
    const AWS = require('aws-sdk')
    const S3 = new AWS.S3()

    // read S3 file by line
    function createReadline(Bucket, Key) {

      // s3 read stream
      const input = S3
        .getObject({
          Bucket,
          Key
        })
        .createReadStream()

      // node readline with stream
      return readline
        .createInterface({
          input,
          terminal: false
        })
    }

    // write S3 file
    function createWriteStream(Bucket, Key) {
      const writeStream = new stream.PassThrough()
      const uploadPromise = S3
        .upload({
          Bucket,
          Key,
          Body: writeStream
        })
        .promise()
      return { writeStream, uploadPromise }
    }

    // perform processing on line
    function processLine(line) {
      // do something
      return line
    }

    // event.inputBucket: source file bucket
    // event.inputKey: source file key
    // event.outputBucket: target file bucket
    // event.outputKey: target file key
    // event.limit: maximum number of lines to read
    exports.handler = function execute(event, context, callback) {
      console.log(JSON.stringify(event, null, 2))
      var totalLineCount = 0

      // create input stream from S3
      const readStream = createReadline(event.inputBucket, event.inputKey)

      // create output stream to S3
      const { writeStream, uploadPromise } = createWriteStream(event.outputBucket, event.outputKey)

      // read each line
      readStream.on('line', line => {

        // close stream on limit
        if (event.limit && event.limit <= totalLineCount) {
          readStream.close()
          return
        }

        // process line
        else {
          line = processLine(line)
          writeStream.write(`${line}\n`)
        }

        totalLineCount++
      })

      // clean up on close
      readStream.on('close', () => {

        // end write stream
        writeStream.end()

        // wait for upload
        uploadPromise
          .then(uploadResponse => {

            // return processing insights
            callback(null, {
              totalLineCount,
              uploadResponse
            })
          })
          .catch(callback)
      })
    }
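
For reference, the handler above reads its source and target locations from the invocation event (inputBucket, inputKey, outputBucket, outputKey, and an optional limit). Below is a minimal local invocation sketch, not part of the gist itself: the bucket names, object keys, and limit value are hypothetical placeholders, and it assumes the file is saved under the revised name lambda-s3-read-write-by-line.js.

    // Minimal local invocation sketch (not part of the gist).
    // Bucket names, object keys, and the limit below are hypothetical placeholders.
    const { handler } = require('./lambda-s3-read-write-by-line')

    const event = {
      inputBucket: 'my-input-bucket',    // hypothetical source bucket
      inputKey: 'data/input.txt',        // hypothetical source object key
      outputBucket: 'my-output-bucket',  // hypothetical target bucket
      outputKey: 'data/output.txt',      // hypothetical target object key
      limit: 1000                        // optional: stop after 1000 lines
    }

    // invoke the exported handler directly; the context argument is unused here
    handler(event, {}, (err, result) => {
      if (err) return console.error('processing failed:', err)
      console.log('lines processed:', result.totalLineCount)
      console.log('uploaded to:', result.uploadResponse.Location) // Location comes from the S3 upload response
    })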