nohup bashスクリプトのnodejsサブプロセスロック

nohup bashスクリプトのnodejsサブプロセスロック

私はnohupを使って何百から何千ものNodejsコマンドのリストを含むスクリプトを実行します。これらのnodejsサブプロセスは、mysqlとSalesforceのデータをCouchdbに同期します。

$ nohup ./mf-sync.staging-mfdb.sh 2>&1 > mf-sync.staging-mfdb.log &
$ mf-sync.staging-mfdb.sh

スクリプト:

#!/bin/bash
# Batch driver: runs one mf-sync.js invocation per --mfi/--source pair.
# The commands below run one after another, so a node process that never
# exits keeps every later command from starting.
echo "Starting..."
# Print the PID of this shell script itself
echo "pid $$"
node /opt/node/mix-sync/mf-sync.js --mfi=100017 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100026 --source=101619
node /opt/node/mix-sync/mf-sync.js --mfi=100027 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100036 --source=101619
node /opt/node/mix-sync/mf-sync.js --mfi=100063 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100075 --source=101160
etc....

端末で子プロセスの実行が停止することを確認しました。

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6333 ?        00:00:01 mf-sync
30097 ?        00:00:00 mf-sync.staging

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6333 ?        00:00:01 mf-sync
30097 ?        00:00:00 mf-sync.staging

[rgoya@host ~]$ kill 6333

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6423 ?        00:00:00 mf-sync
30097 ?        00:00:00 mf-sync.staging

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6449 ?        00:00:01 mf-sync
30097 ?        00:00:00 mf-sync.staging

メモ:30097はnohupで起動したプロセスのpidです。

子プロセスをkillする前と後のログを確認すると、killの後は次のnodejsコマンドが順に実行されていることがわかります。詳細出力フラグ(--debug)を付けて実行してみましたが、例外は見つかりませんでした。

補足説明

  • Nodejsのメモリ制限は1GBです。
  • Couchdbのデフォルトの最大接続数は2048です。
  • mf-sync.jsの内容:

    #!/usr/bin/env node
    process.title = 'mf-sync';
    
    var path = require('path')
    ,   fs = require('fs')
    ,   _ = require('underscore');
    
    // Parse command-line arguments
    // Every argument after `node <script>` is read as `--key=value`; a bare
    // `--flag` (no `=`) is stored with the value `true`.
    var args = _.object(_.map(_.rest(process.argv, 2), function(option) {
        var pair = option.replace('--', '').split('=');
        if (_.size(pair) === 1) pair.push(true);
        return pair;
    }));
    
    // Both identifiers are mandatory; abort immediately when one is missing
    if (!args.mfi) throw new Error('MFI ID not specified');
    if (!args.source) throw new Error('Source ID not specified');
    
    // Output when using `--debug` flag
    // Forwards all of its arguments to `console.info`; does nothing when the
    // script was started without `--debug`.
    var debug = function() {
        if (!_.has(args, 'debug')) return;
        console.info.apply(this, arguments);
    };
    
    // Simulation mode
    // Enabled by passing `--simulate` on the command line.
    var simulate = _.has(args, 'simulate');
    
    // Announce the run, warn about simulation mode and dump the parsed arguments
    var syncLabel = ['mfi', args.source, args.mfi].join('/');
    require('util').log('Synchronization for ' + syncLabel + ' started');
    if (simulate) console.warn('Simulation mode enabled. No changes will occurr.');
    debug(args);
    
    // Load MySQL configuration
    // `mysql.json` holds one connection profile per database name; source
    // 101027 uses the `mfdb` profile and every other source uses `gold`.
    var my = require('mysql');
    var myConfig = require(path.join(__dirname, 'mysql.json'));
    var db = (args.source == '101027') ? 'mfdb' : 'gold';
    var mysql = my.createConnection(myConfig[db]);
    debug('MySQL', myConfig[db].database);
    
    // Load Salesforce configuration
    // The connection is only constructed here; no login call is made in this section.
    var sf = require('node-salesforce');
    var sfConfig = require(path.join(__dirname, 'salesforce.json'));
    var salesforce = new sf.Connection(sfConfig);
    debug('Salesforce', sfConfig.username);
    
    // Load CouchDB configuration
    // Host, port, options and database name all come from the `mfdb` entry of `couchdb.json`.
    var cradle = require('cradle');
    var couchConfig = require(path.join(__dirname, 'couchdb.json'));
    var couchTarget = couchConfig.mfdb;
    var couchServer = new(cradle.Connection)(couchTarget.host, couchTarget.port, couchTarget.options);
    var couch = couchServer.database(couchTarget.name);
    debug('CouchDB', couchTarget.name);
    
    // Add missing function to Underscore.js
    // `_.compactObject(obj)` deletes, in place, every property whose value is
    // `null` or a function and returns the same object.
    _.mixin({
        compactObject: function(obj) {
            _.each(obj, function(value, key) {
                var disposable = _.isNull(value) || _.isFunction(value);
                if (disposable) delete obj[key];
            });
            return obj;
        }
    });
    
    // Assign value inside a nested object property
    //
    // Walks `path` (an array of keys) down from `obj`, creating intermediate
    // objects as needed, and stores `val` at the end of the path:
    //   - if the final slot already holds an object, `val` goes into its `.value`;
    //   - otherwise the final slot is overwritten with `val`;
    //   - a scalar found on the way down is wrapped as `{value: scalar}` so
    //     that nested keys can be attached next to it.
    // `obj` is mutated in place; nothing is returned.
    var deepAssign = function(obj, path, val) {
        // Plain index loop: a `for...in` over an array yields string indices and
        // the `for (var i = 0 in path)` form is a syntax error in strict mode.
        for (var i = 0; i < path.length; i++) {
            var key = path[i];
            // `typeof null` is 'object', so null must be excluded explicitly
            var isContainer = obj[key] !== null && typeof obj[key] === 'object';
            if (i === path.length - 1) {
                if (isContainer) obj[key].value = val;
                else obj[key] = val;
            } else if (!isContainer) {
                obj[key] = (obj[key] === undefined || obj[key] === null) ? {} : {value: obj[key]};
            }
            obj = obj[key];
        }
    };
    
    // Get MFI data from MySQL
    // -----------------------
    // Fills `mfi` with the master row from the `mfi` table, builds one
    // `mfi-period` document per currency/adjusted/period combination from the
    // MFDB data tables, and overlays dimension break-downs on those documents.
    //
    // mfi      - object carrying at least `source_id` and `mfi_id`; extended in place.
    // callback - called as `callback(mfi, docs)` from the `mysql.end` callback;
    //            `docs` is an object keyed by document `_id`.
    //
    // Any query error is re-thrown as `new Error(err)`.
    // NOTE(review): the query callbacks below read state written by earlier
    // callbacks (`docs`, `tree`), so this presumably relies on the mysql driver
    // running queued queries in order — confirm against the driver docs.
    var getMySQLData = function(mfi, callback) {
        mysql.connect();
    
        // Get master MFI metadata
        debug('Getting master MFI metadata from `mfi`.');
        mysql.query("SELECT * FROM mfi WHERE source_id = ? AND mfi_id = ?", [mfi.source_id, mfi.mfi_id], function(err, rows, fields) {
            if (err) throw new Error(err);
            // Only fills in keys `mfi` does not have yet; driver helper members are dropped
            _.defaults(mfi, _.chain(rows).first().omit(['parse', '_typeCast']).value());
        });
    
        // Define MFDB data tables
        // Keys are `<currency>/<adjusted>`; values are the tables queried for that combination
        var tables = {
            'usd/false': ['balance_sheet_usd', 'calculation_usd', 'income_statement_usd', 'infrastructure', 'portfolio_report_usd', 'products_and_clients', 'social_performance'],
            'usd/true': ['balance_sheet_adjusted_usd', 'calculation_adjusted_usd', 'income_statement_adjusted_usd', 'infrastructure_adjusted', 'portfolio_report_adjusted_usd', 'products_and_clients_adjusted', 'social_performance'],
            'local/false': ['balance_sheet', 'calculation', 'income_statement', 'infrastructure', 'portfolio_report', 'products_and_clients', 'social_performance'],
            'local/true': ['balance_sheet_adjusted', 'calculation_adjusted', 'income_statement_adjusted', 'infrastructure_adjusted', 'portfolio_report_adjusted', 'products_and_clients_adjusted', 'social_performance']
        };
        // Remove table name variance
        // (e.g. `balance_sheet_adjusted_usd` becomes `balance_sheet`)
        var baseTable = _.memoize(function(table) {
            return table.replace('_usd', '').replace('_adjusted', '');
        });
    
        var docs = {};
        // Get all available MFDB data for the current `mfi_vid`
        debug('Getting all available MFDB data for the current `mfi_vid`.');
        _.each(_.keys(tables), function(key) {
            _.each(tables[key], function(table) {
                debug('Querying', key, 'data from', table);
                mysql.query("SELECT t.* FROM ?? t INNER JOIN mfi ON t.source_id = mfi.source_id AND t.mfi_id = mfi.mfi_id AND t.mfi_vid = mfi.mfi_vid WHERE t.source_id = ? AND t.mfi_id = ? ORDER BY t.fiscal_year ASC, t.period_type DESC, t.as_of_date ASC", [table, mfi.source_id, mfi.mfi_id], function(err, rows, fields) {
                    if (err) throw new Error(err);
    
                    // Create full document data
                    _.each(rows, function(row) {
                        // Create doc._id
                        var doc_id = ['mfi-period', mfi.source_id, mfi.mfi_id, key, row.fiscal_year, row.period_type, row.as_of_date.toISOString().substr(0, 10)].join('/');
                        debug('Processing', table, 'data for', doc_id);
    
                        // Initialize document
                        if (!docs[doc_id]) docs[doc_id] = {
                            _id: doc_id,
                            type: 'mfi-period',
                            currency: key.split('/')[0],
                            adjusted: key.split('/')[1] === 'true',
                            fiscal_year: row.fiscal_year,
                            period_type: row.period_type,
                            as_of_date: row.as_of_date
                        };
                        // The first row that carries a currency code wins
                        if (!docs[doc_id].currency_code && row.currency_code) docs[doc_id].currency_code = row.currency_code;
    
                        // Extend MFDB data into document
                        debug('Adding', table, 'data to', doc_id);
                        row = _.chain(row).omit(['mfi_id', 'mfi_vid', 'source_id', 'period_type', 'as_of_date', 'fiscal_year', 'currency_code', 'currency_unit']).compactObject().value();
                        if (!_.isEmpty(row)) docs[doc_id][baseTable(table)] = row;
                    });
                });
            });
        });
    
        // Get all scenario data to create dimension hierarchy
        var tree = {};
        mysql.query("SELECT * FROM scenarios", function(err, rows) {
            debug('Processing scenario data into hierarchical tree.');
            if (err) throw new Error(err);
    
            // Get all children scenarios for any given parent
            // Returns an object keyed by child scenario name, or null when there are none
            var getChildren = function(parent) {
                var children = _.chain(rows).where({parent: parent}).sortBy('weight').pluck('scenarios').object({}).tap(function(scenarios) {
                    // Remove used scenarios from master list to decrease stack size
                    _.each(_.keys(scenarios), function(scenario) {
                        rows = _.without(rows, _.findWhere(rows, {scenarios: scenario}));
                    });
                }).value();
                if (_.isEmpty(children)) return null;
                return children;
            }
    
            // Recursively get dimension hierarchy
            var getTree = function(hierarchy) {
                if (_.isEmpty(hierarchy)) return;
                _.each(_.keys(hierarchy), function(p) {
                    hierarchy[p] = getChildren(p);
                    if (!_.isEmpty(hierarchy[p])) getTree(hierarchy[p]);
                });
            }
    
            // Roots are the scenarios whose parent is the empty string
            tree = getChildren('');
            getTree(tree);
        });
    
        // Find path to nested object property
        // Returns the flattened list of keys leading to each needle, skipping needles not found.
        // Memoized on `needles` only, which is sufficient here because `haystack`
        // is always the same `tree`.
        var findPath = _.memoize(function(needles, haystack) {
            function constructPath(haystack, needle, path) {
                if (!_.isObject(haystack)) return false;
                if (typeof haystack !== 'object') return false;
                for (var key in haystack) {
                    var value = haystack[key];
                    var currentPath = _.extend([], path);
                    currentPath.push(key);
                    if (key === needle) return currentPath;
                    var foundPath = constructPath(value, needle, currentPath);
                    if (foundPath) return foundPath;
                }
            }
            // Handle comma-separated nested hierarchies
            return _.chain(needles.split(',')).map(function(needle) {
                return constructPath(haystack, needle, []);
            }).flatten().compact().value();
        });
        // Sanitize dimension names
        // Strips `mix_`, `Dimension` and `Member`, and keeps only the part after a `:` when present
        var sanitizeDimensions = _.memoize(function(dimensions) {
            return _.map(dimensions, function(dimension) {
                dimension = dimension.replace(/mix_/g, '').replace(/Dimension/g, '').replace(/Member/g, '');
                if (/:/.test(dimension)) return dimension.split(':')[1];
                else return dimension;
            });
        });
    
        // Get dimension data for all available documents
        _.each(['usd', 'local'], function(currency) {
            var dimensions_table = currency === 'usd' ? 'dimensions_usd' : 'dimensions';
            debug('Querying', currency, 'data from', dimensions_table);
            mysql.query("SELECT d.fiscal_year, d.period_type, d.as_of_date, d.scenarios, d.line_item_value, t.db_table, t.db_field FROM ?? d INNER JOIN mfi ON d.source_id = mfi.source_id AND d.mfi_id = mfi.mfi_id AND d.mfi_vid = mfi.mfi_vid LEFT JOIN Taxonomy t ON d.element_id = t.Elementid WHERE d.line_item_value IS NOT NULL AND t.db_table IS NOT NULL AND t.db_field IS NOT NULL AND d.source_id = ? AND d.mfi_id = ?", [dimensions_table, mfi.source_id, mfi.mfi_id], function(err, rows, fields) {
                debug('Processing all data from', dimensions_table);
                if (err) throw new Error(err);
                _.each(rows, function(row) {
                    var dimension_path = findPath(row.scenarios, tree);
                    if (_.isEmpty(dimension_path)) return console.warn('MISSING SCENARIO', row.scenarios);
                    // The same dimension value is written to the adjusted and the
                    // unadjusted document, but only where that document already exists
                    _.each(['true', 'false'], function(adjusted) {
                        var doc_id = ['mfi-period', mfi.source_id, mfi.mfi_id, currency, adjusted, row.fiscal_year, row.period_type, row.as_of_date.toISOString().substr(0, 10)].join('/');
                        var path = sanitizeDimensions([row.db_table, row.db_field].concat(dimension_path));
                        docs[doc_id] && deepAssign(docs[doc_id], path, parseFloat(row.line_item_value));
                    });
                });
            });
        });
    
        // Close the connection and hand the collected data to the caller
        mysql.end(function(err) {
            debug('Disconnected from MySQL', db);
            if (err) throw new Error(err);
            callback(mfi, docs);
        });
    }
    
    // Get MFI metadata from Salesforce
    // --------------------------------
    // Logs into Salesforce and issues four queries whose results are merged
    // into `mfi` (account metadata, SP profile flag, network affiliations) and
    // into the annual period documents in `docs` (diamond scores).
    //
    // callback - called once as `callback(mfi, docs)` after the fourth query
    //            has reported back; the Salesforce logout is started right after.
    //
    // Any Salesforce error is re-thrown as `new Error(err)`.
    var getSalesforceData = function(mfi, docs, callback) {
        // Number of queries that have not reported back yet
        var pending = 4;
        var done = function(mfi, docs) {
            pending -= 1;
            if (pending !== 0) return;
    
            callback(mfi, docs);
    
            // Logout from Salesforce
            salesforce.logout(function(err) {
                debug('Logged out from Salesforce');
                if (err) throw new Error(err);
            });
        }
    
        // Login into Salesforce
        debug('Login into Salesforce');
        salesforce.login(sfConfig.username, sfConfig.password + sfConfig.security_token, function(err, userInfo) {
            if (err) throw new Error(err);
    
            // Get main MFI Metadata
            debug('Getting MFI metadata from Salesforce');
            var accountQuery = "SELECT Id, Name, Record_ID__c, mix_Diamonds__c, Date_Established__c, mix_Region__c, Country__c, Operations_Comprised_by_MF__c, Regulated__c, Current_Legal_Status__c, Profit_Status__c FROM Account WHERE Record_ID__c = '" + mfi.mfi_id + "'";
            salesforce.query(accountQuery, function(err, result) {
                if (err) throw new Error(err);
                if (result.totalSize === 0) throw new Error('MFI does not exist');
                var account = _.omit(_.first(result.records), ['attributes', 'Id']);
                _.each(account, function(value, field) {
                    // Make attributes lowercase
                    mfi[field.toLowerCase()] = value;
                });
                mfi.mfi_name = mfi.name;
                done(mfi, docs);
            });
    
            // Determine whether MFI contains Social Performance Profile data
            debug('Determining whether MFI contains SP Profile data.');
            var profileQuery = "SELECT Id, Record_ID__c FROM Account WHERE Record_ID__c = '" + mfi.mfi_id + "' AND Id IN (SELECT Organization__c FROM SP_Profile__c)";
            salesforce.query(profileQuery, function(err, result) {
                if (err) throw new Error(err);
                mfi.sp_profile = !_.isEmpty(result.records);
                done(mfi, docs);
            });
    
            // Get list of MFI Network Affiliations
            debug('Getting list of MFI Network Affiliations');
            var networkQuery = "SELECT Source_Organization__r.Name FROM Partnership__c WHERE Relationship__c = 'Network Affiliation' AND Status__c = 'Current' AND Target_Organization__r.Id = '" + mfi.organization_id + "'";
            salesforce.query(networkQuery, function(err, result) {
                if (err) throw new Error(err);
                var organizations = _.pluck(result.records, 'Source_Organization__r');
                mfi.networks = _.pluck(organizations, 'Name');
                done(mfi, docs);
            });
    
            // Get annual diamonds
            debug('Getting annual diamonds.');
            var diamondQuery = "SELECT Period__c, Diamond_Score__c FROM Data_Campaign_Status__c WHERE Organization__c = '" + mfi.organization_id + "'";
            salesforce.query(diamondQuery, function(err, result) {
                if (err) throw new Error(err);
                // Group diamonds by year
                var diamonds = _.object(_.map(result.records, function(period) {
                    return _.values(_.pick(period, ['Period__c', 'Diamond_Score__c']));
                }));
                // Add diamonds to corresponding periods
                var annualDocs = _.filter(docs, function(doc) {
                    return doc.period_type === 'ANN';
                });
                _.each(annualDocs, function(doc) {
                    doc.annual_diamonds = diamonds[doc.fiscal_year];
                });
                done(mfi, docs);
            });
        });
    }
    
    // Calculate Peer Group data
    // -------------------------
    // Derives the peer-group classification (age, intermediation, market,
    // outreach, scale, sustainability) of every document in `docs` and stores
    // it as `peer_groups` on the document. A group whose inputs are missing is
    // left out; a document for which no group could be derived gets no
    // `peer_groups` property at all.
    //
    // docs     - object (or array) of period documents; mutated in place.
    // callback - called synchronously as `callback(docs)` once all documents are processed.
    var calculatePeerGroupData = function(docs, callback) {
        // Safely get data point value
        // A data point is either a plain value or an object carrying the value
        // in its `value` property next to dimension break-downs. The `value`
        // property is checked for presence rather than truthiness so that a
        // legitimate 0 is returned as 0 instead of falling back to the wrapper.
        var getVal = function(obj, group, prop) {
            if (!_.has(obj, group) || !_.has(obj[group], prop)) return undefined;
            var entry = obj[group][prop];
            if (_.isObject(entry) && _.has(entry, 'value')) return entry.value;
            return entry;
        }
    
        _.each(docs, function(doc, id) {
            var peer_groups = {};
    
            // Age
            // Years between establishment and the reporting date
            debug('Calculating peer group age for', doc._id);
            if (_.has(doc, 'date_established__c')) {
                var age = Math.abs(Date.parse(doc.as_of_date) - Date.parse(doc.date_established__c)) / (86400000 * 365.242199);
                // Skipped when a date is unparseable (NaN) or the age is exactly 0
                if (age) {
                    if (age < 4) peer_groups['age'] = 'New';
                    else if (age <= 8) peer_groups['age'] = 'Young';
                    else if (age > 8) peer_groups['age'] = 'Mature';
                }
            }
    
            // Intermediation
            // Based on deposits as a share of total assets
            debug('Calculating peer group intermediation for', doc._id);
            var deposits = getVal(doc, 'balance_sheet', 'deposits');
            var total_assets = getVal(doc, 'balance_sheet', 'total_assets');
            if (!_.isUndefined(deposits) && !_.isUndefined(total_assets) && total_assets > 0) {
                var ratio = deposits / total_assets;
                if (ratio === 0) peer_groups['intermediation'] = 'Non FI';
                else if (ratio < 0.2) peer_groups['intermediation'] = 'Low FI';
                else if (ratio >= 0.2) peer_groups['intermediation'] = 'High FI';
            }
            else if (total_assets === 0) {
                peer_groups['intermediation'] = 'Non FI';
            }
    
            // Market
            debug('Calculating peer group market for', doc._id);
            var depth = getVal(doc, 'calculation', 'average_balance_borrower_per_capita') || getVal(doc, 'calculation', 'average_outstanding_balance_per_capita');
            var average_loan_size = getVal(doc, 'calculation', 'average_balance_borrower') || getVal(doc, 'calculation', 'average_outstanding_balance');
            if (!_.isUndefined(depth) || !_.isUndefined(average_loan_size)) {
                if (depth < .2 || average_loan_size < 150) peer_groups['market'] = 'Low End';
                else if ((depth >= .2) && (depth < 1.5)) peer_groups['market'] = 'Broad';
                else if ((depth >= 1.5)  && (depth < 2.5)) peer_groups['market'] = 'High End';
                else if ((depth >= 2.5)) peer_groups['market'] = 'Small Business';
            }
    
            // Outreach
            // Every comparison is false for a missing value, so no group is assigned then
            debug('Calculating peer group outreach for', doc._id);
            var total_borrowers = getVal(doc, 'products_and_clients', 'total_borrowers');
            if (total_borrowers < 10000) peer_groups['outreach'] = 'Small';
            else if (total_borrowers < 30000) peer_groups['outreach'] = 'Medium';
            else if (total_borrowers >= 30000) peer_groups['outreach'] = 'Large';
    
            // Scale
            // Latin America and The Caribbean uses higher thresholds (4M / 15M instead of 2M / 8M)
            debug('Calculating peer group scale for', doc._id);
            if (_.has(doc, 'mix_region__c')) {
                var gross_loan_portfolio = getVal(doc, 'balance_sheet', 'gross_loan_portfolio');
                if (gross_loan_portfolio < 2000000 || (gross_loan_portfolio < 4000000 && doc.mix_region__c == 'Latin America and The Caribbean')) peer_groups['scale'] = 'Small';
                else if (gross_loan_portfolio < 8000000 || (gross_loan_portfolio < 15000000 && doc.mix_region__c == 'Latin America and The Caribbean')) peer_groups['scale'] = 'Medium';
                // `>=` so that a portfolio of exactly 8,000,000 is classified
                // instead of falling through every branch
                else if (gross_loan_portfolio >= 8000000) peer_groups['scale'] = 'Large';
            }
    
            // Sustainability
            // Adjusted documents are labelled FSS / Non-FSS, unadjusted ones OSS / Non-OSS
            debug('Calculating peer group sustainability for', doc._id);
            var operational_self_sufficiency = getVal(doc, 'calculation', 'operational_self_sufficiency');
            if (!_.isUndefined(operational_self_sufficiency)) {
                if (doc.adjusted) peer_groups['sustainability'] = operational_self_sufficiency < 1 ? 'Non-FSS' : 'FSS';
                else peer_groups['sustainability'] = operational_self_sufficiency < 1 ? 'Non-OSS' : 'OSS';
            }
    
            if (!_.isEmpty(peer_groups)) docs[id].peer_groups = peer_groups;
        });
    
        callback(docs);
    }
    
    // Send data to CouchDB
    // --------------------
    // Writes the master MFI record and all of its period documents to CouchDB
    // and removes period documents that exist in CouchDB but are no longer
    // part of `docs`.
    //
    // docs     - array whose first element is the master MFI document, followed
    //            by the period documents. The first element is shifted off, so
    //            the caller's array is modified.
    // callback - called without arguments once all writes have been issued
    //            (it does not wait for them to complete).
    //
    // In simulation mode every operation is logged but nothing is written or
    // removed. Any CouchDB error is re-thrown as `new Error(err)`.
    var updateCouchDB = function(docs, callback) {
        // Update master MFI record
        debug('Updating master MFI record');
        var mfi = docs.shift();
        couch.get(mfi._id, function(err, doc) {
            if (err) {
                // A missing document is expected on first sync: insert it
                if (err.error === 'not_found') {
                    require('util').log('Inserting ' + mfi._id);
                    !simulate && couch.save(mfi._id, mfi, function(err, res) {
                        debug('Inserted', res);
                        if (err) throw new Error(err);
                    });
                } else throw new Error(err);
            } else if (doc._rev) {
                // Existing document: the current revision is passed along with the update
                require('util').log('Updating ' + mfi._id);
                !simulate && couch.save(mfi._id, doc._rev, mfi, function(err, res) {
                    debug('Updated', res);
                    if (err) throw new Error(err);
                });
            }
        });
    
        // Get list of existing IDs in CouchDB
        // The key range covers every `mfi-period/<source>/<mfi>/...` document
        debug('Getting list of existing IDs in CouchDB');
        couch.all({startkey: ['mfi-period', args.source, args.mfi].join('/'), endkey: ['mfi-period', args.source, args.mfi, '~'].join('/')}, function(err, ids) {
            if (err) throw new Error(err);
    
            // Remove outdated documents from CouchDB
            _.chain(ids).pluck('id').difference(_.pluck(docs, '_id')).map(function(id) {
                return _.findWhere(ids, {id: id});
            }).each(function(doc) {
                require('util').log('Removing ' + doc.id);
                // Guarded like every save in this function, so that simulation
                // mode never deletes anything
                !simulate && couch.remove(doc.id, doc.value.rev, function(err, res) {
                    debug('Removed', res);
                    if (err) throw new Error(err);
                });
            });
    
            // Insert/update all documents for this MFI
            _.each(docs, function(doc) {
                var update = _.findWhere(ids, {id: doc._id});
                if (update) {
                    require('util').log('Updating ' + doc._id);
                    !simulate && couch.save(doc._id, update.value.rev, doc, function(err, res) {
                        debug('Updated', res);
                        if (err) throw new Error(err);
                    });
                } else {
                    require('util').log('Inserting ' + doc._id);
                    !simulate && couch.save(doc._id, doc, function(err, res) {
                        debug('Inserted', res);
                        if (err) throw new Error(err);
                    });
                }
            });
    
            callback();
        });
    }
    
    // Initialize MFI document
    // Seed record for this run; the MySQL and Salesforce steps add their data to it.
    var mfi = {
        _id: 'mfi/' + args.source + '/' + args.mfi,
        type: 'mfi',
        source_id: args.source,
        mfi_id: args.mfi,
        updated: new Date()
    };
    
    // Pipeline: MySQL -> Salesforce -> peer groups -> CouchDB
    getMySQLData(mfi, function(record, periods) {
        getSalesforceData(record, periods, function(enriched, periodDocs) {
            // Merge MFI metadata into each period
            // (values already present on the period win over the MFI metadata)
            _.each(periodDocs, function(period, id) {
                periodDocs[id] = _.extend(_.clone(enriched), period);
            });
            calculatePeerGroupData(periodDocs, function(classified) {
                // Convert to array for bulk updating
                var batch = _.union([enriched], _.values(classified));
                updateCouchDB(batch, function() {
                    var syncName = ['mfi', args.source, args.mfi].join('/');
                    require('util').log('Synchronization for ' + syncName + ' finished');
                });
            });
        });
    });
    

質問

私は知りたいです:

  1. これらのサブプロセスが停止するのはなぜですか?(停止してしまう項目が、正常に実行されて終了する項目と異なるという証拠は見つかっていません。)
  2. 手動で終了させなくて済むように、数分間停止したままのサブプロセスを強制終了するスクリプトは、どのように作成できますか?

答え1

何らかのリソースが枯渇しているのだと推測せざるを得ません(すでにそう推測されているかもしれません)。たとえば、オープンできるファイル数の上限、mysql、あるいはSalesforceなどです。確かなことはわかりません。

ただし、この問題を解決する1つの方法は、mf-syncをモジュールに配置し、多くのmf-syncエントリを使用するのではなく、キューを使用してこのmf-syncエントリを実行する制御ノードスクリプトを使用することです。スクリプトは、キューの制御された配置と少し似ています。次のことを試してくださいhttps://github.com/learnboost/kue

今のやり方は少し無茶に思えます。ただし、単にオープンできるファイル数が足りないだけであれば、上限を引き上げるだけで解決するかもしれません。 http://www.cyberciti.biz/faq/linux-increase-the-maximum-number-of-open-files/

あるいは、実際にこれをシェルで実行するには、batchコマンドを使用してキューを処理できます。 http://pubs.opengroup.org/onlinepubs/009695399/utilities/batch.html

それとも、これはもっと良いかもしれません。http://pebblesinthesand.wordpress.com/2008/05/22/a-srcipt-for-running-processes-in-parallel-in-bash/

関連情報