Node.js - RxJS: Basing a Filter on Another Observable


I have a list of words with pronunciation data in a text file. I would like to have the user enter a word, and then have the program check to see whether I have data on that word in the file. I'd like to do this in RxJS, which I'm new to.

The code below is the closest I can get to what I want. Within the main stream I have a filter call that creates a dependent stream, called 'checkStream'. What I don't understand is how to use the results of that dependent stream in the filter method of the main stream. Currently that filter method fails, but I still get the data onto the screen by console logging it.

If there is data in the text file for the word, checkStream will end up being an observable containing the data I want to retrieve and show to the user. I want to somehow pipe that data down to the consumer of the main stream, but I don't understand how to do that.

I'd appreciate any help you can provide. Any intuition would be useful too.

// Asker's attempt: read words from stdin and look each one up in ./dict.txt.
// Identifier casing has been restored so the snippet is valid JavaScript;
// the structural problem the question asks about is intentionally left in
// place and marked below.
var Rx = require('rx');
var RxNode = require('rx-node');
var fs = require('fs');
var split = require('split');

// Main stream: each chunk read from stdin becomes one trimmed input word.
RxNode.fromReadableStream(process.stdin)
    .map((inputData) => {
        var inputWord = inputData.toString().trim();
        return inputWord;
    })
    .map((inputWord) => {
        // Dependent stream: the dictionary file is re-read for every word.
        var checkStream = fs.createReadStream('./dict.txt');

        // Each line of the file is parsed as JSON and compared (lower-cased
        // and trimmed) against the word the user typed.
        RxNode.fromReadableStream(checkStream.pipe(split()))
            .map((lineFromFile) => {
                return JSON.parse(lineFromFile);
            })
            .filter((parsedDataToCheck) => {
                return parsedDataToCheck.word.toLowerCase().trim() === inputWord;
            })
            // The match is only observable here, inside the inner
            // subscription; nothing carries it back to the main stream.
            .subscribe((dataThatMatches) => { console.log(dataThatMatches) });

        // OPEN PROBLEM (the subject of the question): `dataToReturn` is never
        // defined in this scope, so this line throws a ReferenceError. The
        // inner observable would have to be returned and flattened instead.
        return dataToReturn;
    })
    .subscribe(function(dataToReturn) {
        console.log(dataToReturn);
    });

Maybe something like this:

// Suggested solution: for every word typed on stdin, scan ./dict.txt and emit
// the first dictionary entry whose `word` field matches. The inner observable
// is returned from flatMap so its result flows to the outer subscriber.
var Rx = require('rx');
var RxNode = require('rx-node');
var fs = require('fs');
var split = require('split');

RxNode.fromReadableStream(process.stdin)
    // Each stdin chunk becomes one trimmed input word.
    .map(inputData => inputData.toString().trim())
    // flatMap subscribes to the per-word lookup and merges its result into
    // the main stream.
    .flatMap(inputWord => {
        // The dictionary is re-read from disk for every input word.
        var checkStream = fs.createReadStream('./dict.txt');

        return RxNode.fromReadableStream(checkStream.pipe(split()))
            // split() emits an empty string after a trailing newline; skip
            // blank lines so JSON.parse is never handed ''.
            .filter(lineFromFile => lineFromFile.trim() !== '')
            .map(lineFromFile => JSON.parse(lineFromFile))
            // find emits only the first matching entry, then completes.
            // NOTE(review): in RxJS 4, find appears to emit `undefined` when
            // nothing matches, so unknown words would log `undefined`; confirm.
            .find(parsedDataToCheck =>
                parsedDataToCheck.word.toLowerCase().trim() === inputWord
            );
    })
    .subscribe(dataToReturn => {
        console.log(dataToReturn);
    });

Note that it is possible for the results for the input words from stdin to be reordered after filtering, due to the asynchronous fs reads.