私はnohupを使って何百から何千ものNodejsコマンドのリストを含むスクリプトを実行します。これらのnodejsサブプロセスは、mysqlとSalesforceのデータをCouchdbに同期します。
$ nohup ./mf-sync.staging-mfdb.sh 2>&1 > mf-sync.staging-mfdb.log &
$ mf-sync.staging-mfdb.sh
スクリプト:
#!/bin/bash
echo "Starting..."
echo "pid $$"
node /opt/node/mix-sync/mf-sync.js --mfi=100017 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100026 --source=101619
node /opt/node/mix-sync/mf-sync.js --mfi=100027 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100036 --source=101619
node /opt/node/mix-sync/mf-sync.js --mfi=100063 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100075 --source=101160
etc....
端末で子プロセスの実行が停止することを確認しました。
[rgoya@host ~]$ ps -e | grep sync
26 ? 00:00:00 async/mgr
30 ? 00:03:34 sync_supers
6333 ? 00:00:01 mf-sync
30097 ? 00:00:00 mf-sync.staging
[rgoya@host ~]$ ps -e | grep sync
26 ? 00:00:00 async/mgr
30 ? 00:03:34 sync_supers
6333 ? 00:00:01 mf-sync
30097 ? 00:00:00 mf-sync.staging
[rgoya@host ~]$ kill 6333
[rgoya@host ~]$ ps -e | grep sync
26 ? 00:00:00 async/mgr
30 ? 00:03:34 sync_supers
6423 ? 00:00:00 mf-sync
30097 ? 00:00:00 mf-sync.staging
[rgoya@host ~]$ ps -e | grep sync
26 ? 00:00:00 async/mgr
30 ? 00:03:34 sync_supers
6449 ? 00:00:01 mf-sync
30097 ? 00:00:00 mf-sync.staging
メモ:30097はプロセスのpidですnohup
。
子プロセスを終了する前と終了した後のログを確認すると、次のnodejs
コマンドが順次実行されることがわかります。詳細な出力フラグを使用して実行してみましたが、--debug
例外は見つかりませんでした。
補足説明
- Nodejsのメモリ制限は1GBです。
- Couchdbのデフォルトの最大接続数は2048です。
コンテンツ
mf-sync.js
。#!/usr/bin/env node process.title = 'mf-sync'; var path = require('path') , fs = require('fs') , _ = require('underscore'); // Parse command-line arguments var args = _.chain(process.argv).rest(2).map(function(arg) { arg = arg.replace('--', '').split('='); _.size(arg) === 1 && arg.push(true); return arg; }).object().value(); if (!args.mfi) throw new Error('MFI ID not specified'); if (!args.source) throw new Error('Source ID not specified'); // Output when using `--debug` flag var debug = function() { if (_.has(args, 'debug')) console.info.apply(this, arguments); }; // Simulation mode var simulate = _.has(args, 'simulate'); require('util').log('Synchronization for ' + ['mfi', args.source, args.mfi].join('/') + ' started'); simulate && console.warn('Simulation mode enabled. No changes will occurr.'); debug(args); // Load MySQL configuration var my = require('mysql'); var myConfig = require(path.join(__dirname, 'mysql.json')); var db = 'gold'; if (args.source == '101027') db = 'mfdb'; var mysql = my.createConnection(myConfig[db]); debug('MySQL', myConfig[db].database); // Load Salesforce configuration var sf = require('node-salesforce'); var sfConfig = require(path.join(__dirname, 'salesforce.json')); var salesforce = new sf.Connection(sfConfig); debug('Salesforce', sfConfig.username); // Load CouchDB configuration var cradle = require('cradle'); var couchConfig = require(path.join(__dirname, 'couchdb.json')); var couch = new(cradle.Connection)(couchConfig.mfdb.host, couchConfig.mfdb.port, couchConfig.mfdb.options).database(couchConfig.mfdb.name); debug('CouchDB', couchConfig.mfdb.name); // Add missing function to Underscore.js _.mixin({ compactObject: function(obj) { _.each(obj, function(v, k) { if (_.isNull(v) || _.isFunction(v)) delete obj[k]; }); return obj; } }); // Get MFI data from MySQL // ----------------------- var getMySQLData = function(mfi, callback) { mysql.connect(); // Get master MFI metadata debug('Getting master MFI metadata from `mfi`.'); mysql.query("SELECT * FROM mfi WHERE source_id = ? AND mfi_id = ?", [mfi.source_id, mfi.mfi_id], function(err, rows, fields) { if (err) throw new Error(err); _.defaults(mfi, _.chain(rows).first().omit(['parse', '_typeCast']).value()); }); // Define MFDB data tables var tables = { 'usd/false': ['balance_sheet_usd', 'calculation_usd', 'income_statement_usd', 'infrastructure', 'portfolio_report_usd', 'products_and_clients', 'social_performance'], 'usd/true': ['balance_sheet_adjusted_usd', 'calculation_adjusted_usd', 'income_statement_adjusted_usd', 'infrastructure_adjusted', 'portfolio_report_adjusted_usd', 'products_and_clients_adjusted', 'social_performance'], 'local/false': ['balance_sheet', 'calculation', 'income_statement', 'infrastructure', 'portfolio_report', 'products_and_clients', 'social_performance'], 'local/true': ['balance_sheet_adjusted', 'calculation_adjusted', 'income_statement_adjusted', 'infrastructure_adjusted', 'portfolio_report_adjusted', 'products_and_clients_adjusted', 'social_performance'] }; // Remove table name variance var baseTable = _.memoize(function(table) { return table.replace('_usd', '').replace('_adjusted', ''); }); var docs = {}; // Get all available MFDB data for the current `mfi_vid` debug('Getting all available MFDB data for the current `mfi_vid`.'); _.each(_.keys(tables), function(key) { _.each(tables[key], function(table) { debug('Querying', key, 'data from', table); mysql.query("SELECT t.* FROM ?? t INNER JOIN mfi ON t.source_id = mfi.source_id AND t.mfi_id = mfi.mfi_id AND t.mfi_vid = mfi.mfi_vid WHERE t.source_id = ? AND t.mfi_id = ? ORDER BY t.fiscal_year ASC, t.period_type DESC, t.as_of_date ASC", [table, mfi.source_id, mfi.mfi_id], function(err, rows, fields) { if (err) throw new Error(err); // Create full document data _.each(rows, function(row) { // Create doc._id var doc_id = ['mfi-period', mfi.source_id, mfi.mfi_id, key, row.fiscal_year, row.period_type, row.as_of_date.toISOString().substr(0, 10)].join('/'); debug('Processing', table, 'data for', doc_id); // Initialize document if (!docs[doc_id]) docs[doc_id] = { _id: doc_id, type: 'mfi-period', currency: key.split('/')[0], adjusted: key.split('/')[1] === 'true', fiscal_year: row.fiscal_year, period_type: row.period_type, as_of_date: row.as_of_date }; if (!docs[doc_id].currency_code && row.currency_code) docs[doc_id].currency_code = row.currency_code; // Extend MFDB data into document debug('Adding', table, 'data to', doc_id); row = _.chain(row).omit(['mfi_id', 'mfi_vid', 'source_id', 'period_type', 'as_of_date', 'fiscal_year', 'currency_code', 'currency_unit']).compactObject().value(); if (!_.isEmpty(row)) docs[doc_id][baseTable(table)] = row; }); }); }); }); // Get all scenario data to create dimension hierarchy var tree = {}; mysql.query("SELECT * FROM scenarios", function(err, rows) { debug('Processing scenario data into hierarchical tree.'); if (err) throw new Error(err); // Get all children scenarios for any given parent var getChildren = function(parent) { var children = _.chain(rows).where({parent: parent}).sortBy('weight').pluck('scenarios').object({}).tap(function(scenarios) { // Remove used scenarios from master list to decrease stack size _.each(_.keys(scenarios), function(scenario) { rows = _.without(rows, _.findWhere(rows, {scenarios: scenario})); }); }).value(); if (_.isEmpty(children)) return null; return children; } // Recursively get dimension hierarchy var getTree = function(hierarchy) { if (_.isEmpty(hierarchy)) return; _.each(_.keys(hierarchy), function(p) { hierarchy[p] = getChildren(p); if (!_.isEmpty(hierarchy[p])) getTree(hierarchy[p]); }); } tree = getChildren(''); getTree(tree); }); // Find path to nested object property var findPath = _.memoize(function(needles, haystack) { function constructPath(haystack, needle, path) { if (!_.isObject(haystack)) return false; if (typeof haystack !== 'object') return false; for (var key in haystack) { var value = haystack[key]; var currentPath = _.extend([], path); currentPath.push(key); if (key === needle) return currentPath; var foundPath = constructPath(value, needle, currentPath); if (foundPath) return foundPath; } } // Handle comma-separated nested hierarchies return _.chain(needles.split(',')).map(function(needle) { return constructPath(haystack, needle, []); }).flatten().compact().value(); }); // Assign value inside a nested object property var deepAssign = function(obj, path, val) { for (var i = 0 in path) { var key = path[i]; if (i == path.length - 1) { if (typeof obj[key] === 'object') obj[key].value = val; else obj[key] = val; } else if (typeof obj[key] !== 'object') { obj[key] = _.isUndefined(obj[key]) ? {} : {value: obj[key]}; } obj = obj[key]; } } // Sanitize dimension names var sanitizeDimensions = _.memoize(function(dimensions) { return _.map(dimensions, function(dimension) { dimension = dimension.replace(/mix_/g, '').replace(/Dimension/g, '').replace(/Member/g, ''); if (/:/.test(dimension)) return dimension.split(':')[1]; else return dimension; }); }); // Get dimension data for all available documents _.each(['usd', 'local'], function(currency) { var dimensions_table = currency === 'usd' ? 'dimensions_usd' : 'dimensions'; debug('Querying', currency, 'data from', dimensions_table); mysql.query("SELECT d.fiscal_year, d.period_type, d.as_of_date, d.scenarios, d.line_item_value, t.db_table, t.db_field FROM ?? d INNER JOIN mfi ON d.source_id = mfi.source_id AND d.mfi_id = mfi.mfi_id AND d.mfi_vid = mfi.mfi_vid LEFT JOIN Taxonomy t ON d.element_id = t.Elementid WHERE d.line_item_value IS NOT NULL AND t.db_table IS NOT NULL AND t.db_field IS NOT NULL AND d.source_id = ? AND d.mfi_id = ?", [dimensions_table, mfi.source_id, mfi.mfi_id], function(err, rows, fields) { debug('Processing all data from', dimensions_table); if (err) throw new Error(err); _.each(rows, function(row) { var dimension_path = findPath(row.scenarios, tree); if (_.isEmpty(dimension_path)) return console.warn('MISSING SCENARIO', row.scenarios); _.each(['true', 'false'], function(adjusted) { var doc_id = ['mfi-period', mfi.source_id, mfi.mfi_id, currency, adjusted, row.fiscal_year, row.period_type, row.as_of_date.toISOString().substr(0, 10)].join('/'); var path = sanitizeDimensions([row.db_table, row.db_field].concat(dimension_path)); docs[doc_id] && deepAssign(docs[doc_id], path, parseFloat(row.line_item_value)); }); }); }); }); mysql.end(function(err) { debug('Disconnected from MySQL', db); if (err) throw new Error(err); callback(mfi, docs); }); } // Get MFI metadata from Salesforce // -------------------------------- var getSalesforceData = function(mfi, docs, callback) { var remaining = 4; var done = function(mfi, docs) { if (--remaining === 0) { callback(mfi, docs); // Logout from Salesforce salesforce.logout(function(err) { debug('Logged out from Salesforce'); if (err) throw new Error(err); }); } } // Login into Salesforce debug('Login into Salesforce'); salesforce.login(sfConfig.username, sfConfig.password + sfConfig.security_token, function(err, userInfo) { if (err) throw new Error(err); // Get main MFI Metadata debug('Getting MFI metadata from Salesforce'); salesforce.query("SELECT Id, Name, Record_ID__c, mix_Diamonds__c, Date_Established__c, mix_Region__c, Country__c, Operations_Comprised_by_MF__c, Regulated__c, Current_Legal_Status__c, Profit_Status__c FROM Account WHERE Record_ID__c = '" + mfi.mfi_id + "'", function(err, result) { if (err) throw new Error(err); if (result.totalSize === 0) throw new Error('MFI does not exist'); var record = {}; _.chain(result.records).first().omit(['attributes', 'Id']).each(function(v, k) { // Make attributes lowercase record[k.toLowerCase()] = v; }); _.extend(mfi, record); mfi.mfi_name = mfi.name; done(mfi, docs); }); // Determine whether MFI contains Social Performance Profile data debug('Determining whether MFI contains SP Profile data.'); salesforce.query("SELECT Id, Record_ID__c FROM Account WHERE Record_ID__c = '" + mfi.mfi_id + "' AND Id IN (SELECT Organization__c FROM SP_Profile__c)", function(err, result) { if (err) throw new Error(err); mfi.sp_profile = !_.isEmpty(result.records); done(mfi, docs); }); // Get list of MFI Network Affiliations debug('Getting list of MFI Network Affiliations'); salesforce.query("SELECT Source_Organization__r.Name FROM Partnership__c WHERE Relationship__c = 'Network Affiliation' AND Status__c = 'Current' AND Target_Organization__r.Id = '" + mfi.organization_id + "'", function(err, result) { if (err) throw new Error(err); mfi.networks = _.chain(result.records).pluck('Source_Organization__r').pluck('Name').value(); done(mfi, docs); }); // Get annual diamonds debug('Getting annual diamonds.'); salesforce.query("SELECT Period__c, Diamond_Score__c FROM Data_Campaign_Status__c WHERE Organization__c = '" + mfi.organization_id + "'", function(err, result) { if (err) throw new Error(err); // Group diamonds by year var diamonds = _.chain(result.records).map(function(period) { return _.chain(period).pick(['Period__c', 'Diamond_Score__c']).values().value(); }).object().value(); // Add diamonds to corresponding periods _.chain(docs).filter(function(doc) { return doc.period_type === 'ANN'; }).each(function(doc) { doc.annual_diamonds = diamonds[doc.fiscal_year]; }); done(mfi, docs); }); }); } // Calculate Peer Group data // ------------------------- var calculatePeerGroupData = function(docs, callback) { // Safely get data point value var getVal = function(obj, group, prop) { if (_.has(obj, group) && _.has(obj[group], prop)) { return obj[group][prop].value || obj[group][prop]; } return undefined; } _.each(docs, function(doc, id) { var peer_groups = {}; // Age debug('Calculating peer group age for', doc._id); if (_.has(doc, 'date_established__c')) { var age = Math.abs(Date.parse(doc.as_of_date) - Date.parse(doc.date_established__c)) / (86400000 * 365.242199); if (age) { if (age < 4) peer_groups['age'] = 'New'; else if (age <= 8) peer_groups['age'] = 'Young'; else if (age > 8) peer_groups['age'] = 'Mature'; } } // Intermediation debug('Calculating peer group intermediation for', doc._id); var deposits = getVal(doc, 'balance_sheet', 'deposits'); var total_assets = getVal(doc, 'balance_sheet', 'total_assets'); if (!_.isUndefined(deposits) && !_.isUndefined(total_assets) && total_assets > 0) { var ratio = deposits / total_assets; if (ratio === 0) peer_groups['intermediation'] = 'Non FI'; else if (ratio < 0.2) peer_groups['intermediation'] = 'Low FI'; else if (ratio >= 0.2) peer_groups['intermediation'] = 'High FI'; } else if (total_assets === 0) { peer_groups['intermediation'] = 'Non FI'; } // Market debug('Calculating peer group market for', doc._id); var depth = getVal(doc, 'calculation', 'average_balance_borrower_per_capita') || getVal(doc, 'calculation', 'average_outstanding_balance_per_capita'); var average_loan_size = getVal(doc, 'calculation', 'average_balance_borrower') || getVal(doc, 'calculation', 'average_outstanding_balance'); if (!_.isUndefined(depth) || !_.isUndefined(average_loan_size)) { if (depth < .2 || average_loan_size < 150) peer_groups['market'] = 'Low End'; else if ((depth >= .2) && (depth < 1.5)) peer_groups['market'] = 'Broad'; else if ((depth >= 1.5) && (depth < 2.5)) peer_groups['market'] = 'High End'; else if ((depth >= 2.5)) peer_groups['market'] = 'Small Business'; } // Outreach debug('Calculating peer group outreach for', doc._id); var total_borrowers = getVal(doc, 'products_and_clients', 'total_borrowers'); if (total_borrowers < 10000) peer_groups['outreach'] = 'Small'; else if (total_borrowers < 30000) peer_groups['outreach'] = 'Medium'; else if (total_borrowers >= 30000) peer_groups['outreach'] = 'Large'; // Scale debug('Calculating peer group scale for', doc._id); if (_.has(doc, 'mix_region__c')) { var gross_loan_portfolio = getVal(doc, 'balance_sheet', 'gross_loan_portfolio'); if (gross_loan_portfolio < 2000000 || (gross_loan_portfolio < 4000000 && doc.mix_region__c == 'Latin America and The Caribbean')) peer_groups['scale'] = 'Small'; else if (gross_loan_portfolio < 8000000 || (gross_loan_portfolio < 15000000 && doc.mix_region__c == 'Latin America and The Caribbean')) peer_groups['scale'] = 'Medium'; else if (gross_loan_portfolio > 8000000) peer_groups['scale'] = 'Large'; } // Sustainability debug('Calculating peer group sustainability for', doc._id); var operational_self_sufficiency = getVal(doc, 'calculation', 'operational_self_sufficiency'); if (!_.isUndefined(operational_self_sufficiency)) { if (doc.adjusted) peer_groups['sustainability'] = operational_self_sufficiency < 1 ? 'Non-FSS' : 'FSS'; else peer_groups['sustainability'] = operational_self_sufficiency < 1 ? 'Non-OSS' : 'OSS'; } if (!_.isEmpty(peer_groups)) docs[id].peer_groups = peer_groups; }); callback(docs); } // Send data to CouchDB // -------------------- var updateCouchDB = function(docs, callback) { // Update master MFI record debug('Updating master MFI record'); var mfi = docs.shift(); couch.get(mfi._id, function(err, doc) { if (err) { if (err.error === 'not_found') { require('util').log('Inserting ' + mfi._id); !simulate && couch.save(mfi._id, mfi, function(err, res) { debug('Inserted', res); if (err) throw new Error(err); }); } else throw new Error(err); } else if (doc._rev) { require('util').log('Updating ' + mfi._id); !simulate && couch.save(mfi._id, doc._rev, mfi, function(err, res) { debug('Updated', res); if (err) throw new Error(err); }); } }); // Get list of existing IDs in CouchDB debug('Getting list of existing IDs in CouchDB'); couch.all({startkey: ['mfi-period', args.source, args.mfi].join('/'), endkey: ['mfi-period', args.source, args.mfi, '~'].join('/')}, function(err, ids) { if (err) throw new Error(err); // Remove outdated documents from CouchDB _.chain(ids).pluck('id').difference(_.pluck(docs, '_id')).map(function(id) { return _.findWhere(ids, {id: id}); }).each(function(doc) { require('util').log('Removing ' + doc.id); couch.remove(doc.id, doc.value.rev, function(err, res) { debug('Removed', res); if (err) throw new Error(err); }); }); // Insert/update all documents for this MFI _.each(docs, function(doc) { var update = _.findWhere(ids, {id: doc._id}); if (update) { require('util').log('Updating ' + doc._id); !simulate && couch.save(doc._id, update.value.rev, doc, function(err, res) { debug('Updated', res); if (err) throw new Error(err); }); } else { require('util').log('Inserting ' + doc._id); !simulate && couch.save(doc._id, doc, function(err, res) { debug('Inserted', res); if (err) throw new Error(err); }); } }); callback(); }); } // Initialize MFI document var mfi = { _id: 'mfi/' + args.source + '/' + args.mfi, type: 'mfi', source_id: args.source, mfi_id: args.mfi, updated: new Date() }; getMySQLData(mfi, function(mfi, docs) { getSalesforceData(mfi, docs, function(mfi, docs) { // Merge MFI metadata into each period _.each(docs, function(doc, id) { docs[id] = _.extend(_.clone(mfi), doc); }); calculatePeerGroupData(docs, function(docs) { // Convert to array for bulk updating docs = _.union([mfi], _.values(docs)); updateCouchDB(docs, function() { require('util').log('Synchronization for ' + ['mfi', args.source, args.mfi].join('/') + ' finished'); }); }); }); });
質問
私は知りたいです:
- これらのサブプロセスが停止するのはなぜですか? (凍結された項目が実行され、停止された項目と異なるという証拠が見つかりません。)
- 手動で終了する必要がないように、数分間停止するサブプロセスを停止するスクリプトをどのように作成できますか?
答え1
一部のリソースが枯渇していると推測する必要があります(既に推測している可能性があります)。たぶん、最大オープンファイル、mysql、またはSalesforceです。全く知らない。
ただし、この問題を解決する1つの方法は、mf-syncをモジュールに配置し、多くのmf-syncエントリを使用するのではなく、キューを使用してこのmf-syncエントリを実行する制御ノードスクリプトを使用することです。スクリプトは、キューの制御された配置と少し似ています。次のことを試してくださいhttps://github.com/learnboost/kue
これを行う方法が少し狂ったようです。ただし、ファイルが十分でない場合は、制限を増やしても問題がない可能性があります。 http://www.cyberciti.biz/faq/linux-increase-the-maximum-number-of-open-files/
あるいは、実際にこれをシェルで実行するには、batch
コマンドを使用してキューを処理できます。 http://pubs.opengroup.org/onlinepubs/009695399/utilities/batch.html
それとも、これはもっと良いかもしれません。http://pebblesinthesand.wordpress.com/2008/05/22/a-srcipt-for-running-processes-in-parallel-in-bash/