This pseudocode:
getMongoCollectionStream().fromMongoDocumentToCouchbaseDocuments().writeCBdocToCouchbase()
translates into:
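import com.couchbase.client.java.document.RawJsonDocument;
import com.mongodb.rx.client.MongoClient;
import com.mongodb.rx.client.MongoClients;
import com.mongodb.rx.client.MongoDatabase;
import rx.Observable;
import rx.functions.Func1;

MongoClient client = MongoClients.create(connectionString);
MongoDatabase db = client.getDatabase(dbName);
// Stream every document of the collection as an Observable.
Observable<org.bson.Document> mongoDocs = db.getCollection(collectionName).find().toObservable();
mongoDocs
    .map(new Func1<org.bson.Document, RawJsonDocument>() {
        public RawJsonDocument call(org.bson.Document mongoDoc) {
            // Tag the document with its type and key it by the hex form of its ObjectId.
            mongoDoc.put(typeField, type);
            return RawJsonDocument.create(
                    mongoDoc.getObjectId("_id").toHexString(), mongoDoc.toJson());
        }
    })
    // upsert returns an Observable, so flatMap (not map) is needed to chain the write.
    .flatMap(new Func1<RawJsonDocument, Observable<RawJsonDocument>>() {
        public Observable<RawJsonDocument> call(RawJsonDocument doc) {
            return asyncBucket.upsert(doc);
        }
    });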
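
The conversion step in the pipeline above can be tried out without either database driver. The following is only a minimal sketch under stated assumptions: plain `Map` objects stand in for the MongoDB and Couchbase documents, a `LinkedHashMap` stands in for the bucket, and the names `MigrationSketch`, `KeyedDoc`, `toKeyedDoc` and `migrate` are hypothetical and do not come from either SDK. It shows the same three stages: read each document, tag it with a type field and key it by its `_id`, then write it with overwrite semantics.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class MigrationSketch {

    // Hypothetical stand-in for a Couchbase document: a key plus its content.
    static final class KeyedDoc {
        final String id;
        final Map<String, Object> content;

        KeyedDoc(String id, Map<String, Object> content) {
            this.id = id;
            this.content = content;
        }
    }

    // Mirrors the map step: copy the source document, add the type field,
    // and reuse the "_id" value as the key of the target document.
    static KeyedDoc toKeyedDoc(Map<String, Object> mongoDoc, String typeField, String type) {
        Map<String, Object> copy = new LinkedHashMap<>(mongoDoc);
        copy.put(typeField, type);
        return new KeyedDoc(String.valueOf(copy.get("_id")), copy);
    }

    // Mirrors the whole pipeline: read, convert, then write into a map keyed by id.
    static Map<String, Map<String, Object>> migrate(List<Map<String, Object>> source,
                                                    String typeField, String type) {
        Map<String, Map<String, Object>> bucket = new LinkedHashMap<>();
        source.stream()
              .map(doc -> toKeyedDoc(doc, typeField, type))
              .forEach(d -> bucket.put(d.id, d.content)); // put overwrites, like an upsert
        return bucket;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("_id", "5f1a2b3c4d5e6f7a8b9c0d1e"); // made-up id, for illustration only
        doc.put("name", "example");
        System.out.println(migrate(Collections.singletonList(doc), "type", "user"));
    }
}
```

Unlike the RxJava version, this sketch is synchronous and copies the source document instead of mutating it; it is meant to illustrate the shape of the transformation, not the asynchronous write path.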
