var trumpet = require('trumpet') var { through, duplex, concat, pipe } = require('mississippi') var mimeTypes = { 'application/atom': 'atom', 'application/atom+xml': 'atom', 'text/atom': 'atom', 'text/atom+xml': 'atom', 'application/rss': 'rss', 'application/rss+xml': 'rss', 'text/rss': 'rss', 'text/rss+xml': 'rss', 'application/rdf': 'rdf', 'application/rdf+xml': 'rdf', 'text/rdf': 'rdf', 'text/rdf+xml': 'rdf', 'application/feed+json': 'json-feed', 'application/json': 'json-feed' } module.exports = function find (f) { var tr = trumpet() var snk = through.obj((data, _, done) => { try { done(null, f(data)) } catch (error) { done(error) } }) var i = 0 var mid = through.obj() mid.setMaxListeners(0) mid.on('pipe', () => (i += 1)) mid.on('unpipe', () => { if ((i -= 1) === 0 && tr._readableState.ended) mid.end() }) tr.once('end', () => { if (i === 0 && !mid._readableState.ended) mid.end() }) pipe( mid, through.obj((data, _, done) => { if (mimeTypes[data.type] == null) return done() done(null, data) }), concat(list => snk.end(list)), error => (error != null) && snk.emit('error', error) ) tr.selectAll('link', link => { var data = {} var src = through.obj() link.getAttribute('href', href => (data.href = href)) link.getAttribute('rel', rel => (data.rel = rel)) link.getAttribute('type', type => (data.type = type)) link.getAttribute('title', title => (data.title = title)) pipe( link.createReadStream(), concat(() => src.end(data)), error => { if (error != null) mid.emit('error', error) // else src.end(data) } ) src.pipe(mid, { end: false }) }) return duplex.obj(tr, snk) }