Skip to content
Snippets Groups Projects
Commit b86d30d5 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

COB-49: this cexec replacement seems not to be used anywhere.... making it obsolete... removing it.

parent cf9c7899
No related branches found
No related tags found
1 merge request!6Import cobalt2 into lofar4
......@@ -5561,7 +5561,6 @@ SubSystems/Online_Cobalt/validation/cobalt/network/interface_arp_ignore_setting.
SubSystems/Online_Cobalt/validation/cobalt/network/interface_links.test -text
SubSystems/Online_Cobalt/validation/cobalt/network/interface_mtu_settings.test -text
SubSystems/Online_Cobalt/validation/cobalt/network/resolvconf.test eol=lf
SubSystems/Online_Cobalt/validation/intercluster/c3/cexec -text
SubSystems/Online_Cobalt/validation/intercluster/connectivity/cobalt2cep.test eol=lf
SubSystems/Online_Cobalt/validation/intercluster/connectivity/cobalt2cobalt.test eol=lf
SubSystems/Online_Cobalt/validation/intercluster/ethernet/iperf-cobalt2locus.bw-req -text
......
# $Id$
import re, sys
from c3_except import *
# class used to represent cluster taken from command line
# clusters is the list of cluster names used as a key in the
# nodes associative array. To iterate through them would look as such:
# for cname in c3_cluster_list.clusters:
# for node in c3_cluster_list[cname]:
# print node
class c3_cluster_list:
def __init__(self):
self.clusters = []
self.node = {}
self.username = {}
class c3_command_line:
# matches any non whitespace word
any_token = re.compile( r"\s*(?P<word>\S+)" )
# matches a - or -- and then the option name
option = re.compile( r"\s*(?P<word>--?\w+)" )
# matches a cluster name
c_name = re.compile( r"\s+(?P<username>[\w_\-]+@)?(?P<name>[\w_\-]+)?:" );
# matches a single number
number = re.compile( r"\s*(?P<num>\d+)" );
# matches a range quaifier "-"
range_qual = re.compile( r"\s*-" );
# matches a single node qualifier ","
single = re.compile( r"\s*," );
# initiliaze line as an enpty string
line = ""
# initilaize internal line as the text to be parsed
def __init__( self, command_line ):
self.line = command_line
# returns a single option, a - or -- followed by string
def get_opt( self ):
match = self.option.match( self.line )
if match:
line_out = ""
line_out = match.group( "word" )
self.line = self.line[match.end():]
else:
raise end_of_option( None, None )
return line_out
# returns a single token, used for cases such as --file FILENAME to get FILENAME
def get_opt_string( self ):
match = self.any_token.match( self.line )
if match:
line_out = ""
line_out = match.group( "word" )
self.line = self.line[match.end():]
elif len( self.line ) == 0:
raise end_of_opt_string( "option needs a string", None)
else:
raise bad_string( "option requires a string", None )
return line_out
# all parsing is done externally through this command
def get_clusters( self ):
cluster_obj = c3_cluster_list()
# check if any clusters are specified on the command line, if not
# then set execution to default cluster (name not known at this point
# /default is an invalid cluster name and hence a place holder in this
# context
match = self.c_name.match( self.line )
if not match:
cluster_obj.clusters.append( "/default" )
cluster_obj.node["/default"]=[]
cluster_obj.node["/default"].append( "" )
cluster_obj.username["/default"]="/default"
# while there are still cluster blocks on the command line
while match:
# string parsed text from line
self.line = self.line[match.end():]
# if a name is specified get it, else : is specified set to default cluster
if match.group( "name" ):
index = match.group( "name" )
else:
index = "/default"
#if an alternate username is specified use it.
if match.group( "username" ):
cluster_obj.username[index]=match.group( "username" )[:-1]
else:
cluster_obj.username[index]="/default"
# add name to cluster list and initialize node list
cluster_obj.clusters.append( index )
match = self.number.match( self.line )
cluster_obj.node[index] = []
cluster_obj.node[index].append( "" )
node_index = 0
# if a range has been specified on command line parse it
# this process gets the first number from the list and stores it in a temp
# var. Then checks if it is part of a range or a single number. If it is a range
# it processes once and then checks for a single number, if it is a single number
# it processes single numbers until wither the end of the node position specification
# or a range qualifier is found, if a range is found the process loops. In this way the
# would be parsed correctly: "1,3,4-8,10-20,25"
while match:
# strip parsed text from line
self.line = self.line[match.end():]
# get starting number from match
cluster_obj.node[index][node_index] = int( match.group( "num") )
# if a match is specified parse it
match = self.range_qual.match( self.line )
if match:
# strip parsed text from line
self.line = self.line[match.end():]
match = self.number.match( self.line )
# set start and end of range
start_range = cluster_obj.node[index][node_index] + 1
if match:
end_range = int( match.group( "num" ) ) + 1
else:
self.line = '-' + self.line
return cluster_obj
# loop from start to end and add node position to node list
for counter in range( start_range, end_range ):
node_index = node_index + 1
cluster_obj.node[index].append( "" )
cluster_obj.node[index][node_index] = counter
# strip parsed text from line
self.line = self.line[match.end():]
# check for single numbers (2,5,6)
match = self.single.match( self.line )
if match:
# srip parsed text from line
self.line = self.line[match.end():]
# add node poistion to node list
cluster_obj.node[index].append( "" )
node_index = node_index + 1
match = self.number.match( self.line )
match = self.c_name.match( self.line )
try:
if self.line[0] != ' ':
raise bad_cluster_name ( None, None )
except IndexError:
pass
return cluster_obj
# returns the rest of line (usually for the end of the command)
def rest_of_command( self ):
return self.line
# vim:tabstop=4:shiftwidth=4:noexpandtab:textwidth=76
# $Id$
import os
import os.path
import c3_version
#This file holds all the variables used by the other utilities
C3_OKEXIT = 0
C3_ERRLOCAL = 1
C3_ERRREMOTE = 2
def_path = ''
try:
def_path = os.environ[ 'C3_PATH' ]
except KeyError:
if os.path.isfile('/usr/bin/cexec') and os.access('/usr/bin/cexec', os.X_OK):
def_path = '/usr/bin'
else:
def_path = '/opt/c3-' + `c3_version.c3_version_major`
# vim:tabstop=4:shiftwidth=4:noexpandtab:textwidth=76
# $Id$
class c3_except:
description = ""
last = ""
def __init__(self, string, name):
self.description = string
self.last = name
class parse_error( c3_except ):
pass
class bad_cluster_name( parse_error ):
pass
class no_more_clusters( c3_except ):
pass
class invalid_head_node( parse_error ):
pass
class invalid_cluster_block( parse_error ):
pass
class no_head_node( c3_except ):
pass
class invalid_node( parse_error ):
pass
class end_of_cluster( c3_except ):
pass
class internel_error( c3_except ):
pass
class indirect_cluster( parse_error ):
pass
class not_in_range( parse_error ):
pass
class end_of_option( c3_except ):
pass
class bad_string( parse_error ):
pass
class end_of_opt_string( bad_string ):
pass
# vim:tabstop=4:shiftwidth=4:noexpandtab:textwidth=76
# $Id$
import sys, re
from c3_except import *
# this is the internal representation of a node
# as of right now it is just simply it's name
# (ip or alias) and it's status. dead = 0 is alive
# dead = 1 id offline
class node_obj:
name = ""
dead = 0
class cluster_def:
"parses the cluster definition file"
###################################################################
# this is the regular expressions used parse the file #
###################################################################
# beginning of cluster definition (name of the cluster)
# matches cluster_name {
cluster_name = re.compile ( r"""
\s*cluster\s+ #cluster keyword
(?P<c_name> #cluster name
[\w_\-]+ #may contain an alphanumeric, underscore, and dash
)\s*{[\t\v ]*(\#.*)?\n""", re.VERBOSE | re.IGNORECASE
)
# extracts the name of the head node
# matches external:internal
# with the internal name optional
head_node = re.compile ( r"""
\s* #get rid of whitespace
(?P<extname> #external head node name goes first
[\w\-\.]+
)?
(: #if internal name spcified
(?P<intname> #extract internal name
[\w\-\.]+
)
){0,1}[\t\v" "]*(\#.*)?\n""", re.VERBOSE
)
# extracts the name of compute nodes
# matches dead nodename
# with dead being optional and including
# ranges
compute_node = re.compile ( r"""
\s* #get rid of whitespace
(?P<dead_node>
dead[\t\v" "]+
)?
(?P<comname> #get name of current node
[\w\-.]+ #non range part of name
)
(?P<range>
\[(?P<start>\d+)\-(?P<stop>\d+)\] #get range (optional)
){0,1}[\t\v" "]*(\#.*)?\n""", re.VERBOSE
)
# exclude nodes from a range
# matches exclude [num1-num2]
# with num1 and num2 being integers
exclude = re.compile ( r"""
\s*exclude\s*
((?P<single>\s+\d+)|(\[(?P<start>\d+)\-(?P<stop>\d+)\]))
[\t\v" "]*(\#.*)?\n
""", re.VERBOSE | re.IGNORECASE
)
# matches brackets { }
start_bracket = re.compile( """\s*{""" )
end_bracket = re.compile ( """\s*}[\t\v" "]*""" )
# matches any non whitespace character
any_token = re.compile ("\s*\S+")
# matches a comment line
comment = re.compile( r"[ \t\r\f\v]*#.*\n" )
#########################################################
# variables needed for execution #
#########################################################
# filename of cluster config file
file = ""
# this is a string used to hold the config file
line = ""
# string to hole the current cluster name
c_name = ""
#strings to hold the head node names
head_int = ""
head_ext = ""
#list to hold ranges for nodes
node_list = []
#used to show place where error occured
last_cluster = "first cluster in list"
last_machine = "first node in list"
#########################################################
# #
# this is the constructor, it takes the filename of the #
# config file to parse, the second init throws an error #
# if no file name is given #
# #
#########################################################
def __init__(self, filename):
self.file = filename
inputfile = open( filename, "r" )
# generate a string containing the file
line_in = inputfile.readline()
while line_in:
self.line = self.line + line_in
line_in = inputfile.readline()
inputfile.close()
#########################################################
# resets the internal variables after an error #
#########################################################
def reset_vars(self):
self.line = ""
self.c_name = ""
self.head_int = ""
self.head_ext = ""
self.node_list = []
self.last_cluster = "first cluster in list"
self.last_machine = "first node in list"
#########################################################
# re-initializes the file (begins at the first cluster #
# again) #
#########################################################
def reread_file(self):
self.reset_vars()
self.__init__(self.file)
#########################################################
# strips comments from front of file #
#########################################################
def strip_comments( self ):
match = self.comment.match( self.line )
while match:
self.line = self.line[match.end():]
match = self.comment.match( self.line )
#########################################################
# scans to next cluster in the file, if called for the #
# first time goes to first cluster #
# doesn't return anything, just sets internal variable #
#########################################################
def get_next_cluster(self):
self.strip_comments()
match = None
try:
while not match: #loop untill a cluster tag is found
match = self.cluster_name.match( self.line )
if match: #if cluster tag found
# get cluster name
self.c_name = match.group( "c_name" )
self.line = self.line[match.end():]
self.strip_comments()
self.last_cluster = self.c_name
try:
match = self.head_node.match( self.line )
if not match.group( "extname" ):
# this indicates that it is an "indirect" cluster
# the internal node is actually the external link
# it was done this way because with normal operation
# this would be an impossible state
self.head_ext = None
self.head_int = match.group( "intname" )
else: # "direct" cluster
self.head_ext = match.group( "extname" )
self.head_int = match.group( "intname" )
if not self.head_int:
self.head_int = self.head_ext
except AttributeError: # parse error on the head node
name = self.c_name
self.reset_vars()
raise invalid_head_node( "invalid head node specification", name)
self.line = self.line[match.end():]
self.strip_comments()
else: # cluster tag not found
# strip a single token from self.line
match = self.any_token.match( self.line );
# an open bracket here would mean that a valid cluster tag was not found
# but a new cluster block was trying to be formed
if self.start_bracket.match( match.group() ):
name = self.last_cluster
self.reset_vars()
raise invalid_cluster_block( "invalid cluster definition", name)
self.line = self.line[match.end():]
self.strip_comments()
match=None
except AttributeError: # invalid cluster definition
name = self.last_cluster
self.reset_vars()
raise no_more_clusters( "No more valid cluster blocks", name )
#########################################################
# returns the external name of the current cluster #
# being parsed #
#########################################################
def get_external_head_node(self):
if self.head_ext == "":
raise no_head_node( "no head node set.", "no cluster read yet." )
return self.head_ext;
#########################################################
# returns the internal name of the current cluster #
# being parsed #
#########################################################
def get_internal_head_node(self):
if self.head_int == "":
raise no_head_node( "no head node set.", "no cluster read yet." )
return self.head_int;
#########################################################
# returns the name of the next node in the files if #
# called for the first time returns the first node name #
# returns a node_obj with the appropriate values filed #
# in #
#########################################################
def get_next_node(self):
self.strip_comments()
# the only time it is possible for this to occur is
# with indirect clusters
if self.head_ext == None:
name = "cluster " + self.c_name
raise indirect_cluster( "indirect clusters don't have nodes", name )
node_out = node_obj()
# when a range is specified a queue is built with the nodes
# so if a queue is present then we know that a range has
# been specified and must be used up before we parse another
# line in the file
if self.node_list:
node_out = self.node_list.pop(0)
self.last_machine = node_out.name
return node_out
match = self.compute_node.match( self.line )
if match: # if a compute node is found
self.line = self.line[match.end():]
self.strip_comments()
if match.group( "dead_node" ): # check if it is a dead node
if not match.group( "range" ): # dead node qualifier invalid with a range
node_out.name = match.group( "comname" )
node_out.dead = 1
else: # return the given node with a dead set to true
name = self.last_machine + " in " + self.c_name
self.reset_vars()
raise invalid_node( "dead specifier can not have a range", name )
else: # either a range or single node specified
if match.group( "range" ): # if range
# retrieve starting and stopping ranges
start_add_range = int( match.group( "start" ) )
stop_add_range = int( match.group( "stop" ) ) + 1
# start is always zero - start_add_range - start_add_range
# this is done so that the indexing starts at zero
start = 0
stop = stop_add_range - start_add_range
for index in range(start, stop): # populate list
self.node_list.append( node_obj() )
self.node_list[index].name = match.group( "comname" ) + str( index + start_add_range )
self.node_list[index].dead = 0
match = self.exclude.match( self.line )
# multiple exclude lines after a range are valid, hence the while loop
while match and self.node_list:
try:
if match.group( "single" ): # excluding a single machine
index = int( match.group( "single" ) )
if index < 0:
raise IndexError
self.node_list[index - start_add_range].dead = 1
else: # excluding a range
start_ex_range = int( match.group( "start" ) )
stop_ex_range = int( match.group( "stop" ) )
# list index starts at zero so the exclude index and
# list index must co-incide
start = start_ex_range - start_add_range
stop = (stop_ex_range - start_add_range) + 1
if (start < 0) or (start > stop):
raise IndexError
for index in range (start, stop):
self.node_list[index].dead = 1
self.line = self.line[match.end():]
self.strip_comments()
match = self.exclude.match( self.line) # check for second exclude
except IndexError:
name = self.last_machine + " in " + self.c_name
raise not_in_range( "index in exclude is not in range", name)
node_out = self.node_list.pop(0)
else: # single node specifier
node_out.name = match.group( "comname" )
node_out.dead = 0
else: # either there are no more nodes ( closing bracket is found )
# or there was a parse error on the node specification line
if self.end_bracket.match( self.any_token.match(self.line).group() ):
raise end_of_cluster( "no more nodes in config file", None )
name = self.last_machine + " in " + self.c_name
self.reset_vars()
raise invalid_node( "invalid specification for a node", name )
self.last_machine = node_out.name
return node_out
#########################################################
# returns the name of the current cluster being read #
#########################################################
def get_cluster_name(self):
if self.c_name == "":
raise no_cluster_name( "read in a cluster before useing", "no cluster read yet" )
return self.c_name
# vim:tabstop=4:shiftwidth=4:noexpandtab:textwidth=76
# $Id$
import os, sys, time
from select import select
from subprocess import *
# this class is used so that I can use strong authentication without
# much hassel and easily implement timeouts. As I am only sending a frame or
# two at a time it is not really noticable. This also allows an abstraction layer
# so that I can later change to raw socets/PVM/ or whatever without a major code
# re-write: in no way is this meant for speed :)
# one thing to note: these differ from real sockets API in one major way. since the server
# uses popen to initiate a connextion the clients stdin/stdout is mapped as the socket, thus
# once a client sock has been established all prints will also go to the server, thus you could
# really mess up communications by doing this - be carefull if you use them.
class c3_sock:
# I use these to abstract whether or not I am writing to stdout/stdin (client)
# or a pipe (server)
output_pipe = None
input_pipe = None
# default timeout of 10 seconds
timeout = 10
# self explanitory
def set_timeout( self, new_timeout ):
self.timeout = new_timeout
# general form if a messsage is length:message
# Simply pack the message and write to a pipe
def send( self, string_to_send ):
length = len( string_to_send )
string_to_send = str(length) + ':' + string_to_send
self.output_pipe.write( string_to_send )
self.output_pipe.flush()
# these sockets timeout on a receive (think of this as more along the
# lines of UDP instead of TCP). Thus if you have a client that will sit
# a while processing you need some form of stayalive message (I dont
# do anything that complicated in C3 so I didn't implement them)
def recieve( self ):
buffer = ""
# read a single character in until you get the size terminator
char_in = self.input_pipe.read( 1 )
time_start = time.time()
time_elapsed = 0
while char_in != ':':
buffer = buffer + char_in
char_in = self.input_pipe.read( 1 )
# as you can see timeouts work but are kinda cheesy, use a nonblocking
# read and increment a time counter if nothing is retuned, raise and exception
# if a threshold is crosed.
time_elapsed = time.time() - time_start
if char_in != "":
time_start = time.time()
if char_in == "" and time_elapsed > self.timeout:
raise 'time_out'
# try and read the message from the pipe in as large a chunk as possible, loop
# untill stated length is reached
length = int (buffer)
buffer = self.input_pipe.read( length )
time_start = time.time()
time_elapsed = 0
while len(buffer) < length:
new_buffer = buffer + self.input_pipe.read( length )
if len(new_buffer) > len(buffer):
time_start = time.time()
time_elapsed = 0
buffer = new_buffer
elif time_elapsed > self.timeout:
raise 'time_out'
else:
time_elapsed = time.time() - time_start
# return message received through pipe
return buffer
def close( self ):
self.__del__()
# server sock just defines two pipes (input, output) and initializes a command
# this this socket is a socket to a command. Think "ssh node10 client_code"
# where client code communicates through stdin and stdout.
# when it's done simply close pipes
class server_sock( c3_sock ):
def __init__(self, command):
#self.output_pipe, self.input_pipe = os.popen2( command )
# XXX: Changed to subprocess module (see ticket:16)
p = Popen(command, shell=True, stdin=PIPE, stdout=PIPE, close_fds=True)
(self.output_pipe, self.input_pipe) = (p.stdin, p.stdout)
def __del__( self ):
self.output_pipe.close()
self.input_pipe.close()
# the client sock takes stdin/stdout and map them to another pipe, this is so
# the same send/receive function can be used. Since the socket uses only
# stdin/stdout to communicate only one socket can be running at a time per
# instance of the PROGRAM. stdout and stdin are set to None to catch this error.
# on socket closing stdin and stdout are set back to thier original values.
class client_sock( c3_sock ):
def __init__(self):
if sys.stdout == None and sys.stdin == None:
raise 'single client only'
self.output_pipe = sys.stdout
self.input_pipe = sys.stdin
sys.stdout = None
sys.stdin = None
def __del__( self ):
sys.stdin = self.input_pipe
sys.stdout = self.output_pipe
# vim:tabstop=4:shiftwidth=4:noexpandtab:textwidth=76
# This file has been automatically generated, do not modify
c3_version="5.1.3"
c3_version_major="5"
#!/usr/bin/env python
# $Id: cexec 209 2011-02-02 23:38:27Z tjn $
import c3_config
import c3_version
try:
import sys, os
from subprocess import *
sys.path.append( c3_config.def_path )
import c3_com_obj, c3_file_obj, socket, c3_sock, re
######## constants ####################################
help_info = """Usage: cexec [OPTIONS] [MACHINE DEFINTIONS] command
--help -h = Display help message.
--file -f = Alternate cluster configuration file. If one
is not supplied then /etc/c3.conf will be used.
-i = Interactive mode. Ask once before executing.
--head = Execute command on head node. Does not
execute on compute nodes if specified.
--pipe -p = Formats the output in a pipe friendly fashion.
--all = Execute command on all the nodes on all the
clusters in the configuration file, ignores
the [MACHINE_DEFINITONS] block.
--dryrun -n = Does not send commands to machines.
--version -v = Display the version number
Machine definitions are in the form of
clusterName: start-end, node number"""
backslash = re.compile( r"\\" )
#######################################################
######## check for arguments #########################
if len(sys.argv) == 1:
print help_info
sys.exit(c3_config.C3_OKEXIT)
#######################################################
######### et default options #########################
to_print = 0 #not used in parallel version
interactive = 0 #prompt before execution
head_node = 0 #execute only on head node
filename = "/etc/c3.conf" #default config file
all = 0 #only execute on specified clusters
pipe_output = 0
defusername = ""
options_to_pass = ""
pidlist = []
#######################################################
######### internal variables ##########################
cluster_from_file = {}
file_set = 0
dryrun = 0
returncode = 0
#######################################################
######### parse command line ##########################
# first create one large string of the command
command_line_list = sys.argv[1:]
command_line_string = ''
for item in command_line_list:
command_line_string = '%s %s' % (command_line_string, item)
# object used to parse command line
c3_command = c3_com_obj.c3_command_line( command_line_string )
# get first option
try:
option = c3_command.get_opt()
while option: # while more options
if (option == '-f') or (option == '--file'): # alternate config file
if not file_set:
filename = c3_command.get_opt_string()
file_set = 1
else:
print "only one file name can be specified."
sys.exit(c3_config.C3_ERRLOCAL)
elif option == '-h' or option == "--help": # print help info
print help_info
sys.exit(c3_config.C3_OKEXIT)
elif option == '-i': # ask once before executing command
interactive = 1
elif option == '--head': # execute only the head node
head_node = 1
elif option == "--all": #execute on all clusters in the list
all=1
elif option == '--pipe' or option == '-p':
pipe_output = 1
options_to_pass = options_to_pass + " --pipe"
elif option == '--dryrun' or option == '-n':
dryrun = 1
elif option == '--version' or option == '-v':
# Get C3's version
print c3_version.c3_version
sys.exit(c3_config.C3_OKEXIT)
else: # a catch all, option supplied not valid
print "option ", option, " is not valid"
sys.exit(c3_config.C3_ERRLOCAL)
option = c3_command.get_opt()
except c3_com_obj.end_of_option: #quit parsing options
pass
# create cluster object from command line
clusters = c3_com_obj.c3_cluster_list()
if all == 0:
clusters = c3_command.get_clusters()
else:
c3_command.get_clusters()
# get command to execute
command = c3_command.rest_of_command()
#######################################################
######### test if ssh or rsh ##########################
try:
transport = os.environ[ 'C3_RSH' ]
except KeyError:
transport = 'ssh'
#######################################################
######### set filename ##########################
if not file_set:
try:
filename = os.environ[ 'C3_CONF' ]
except KeyError:
filename = '/etc/c3.conf'
#######################################################
######### set default user name #######################
try:
defusername = os.environ[ 'C3_USER' ]
except KeyError:
defusername = os.environ[ 'USER' ]
######### make cluster list object from file ##########
########################################################
########################################################
# fixed to only read clusters specified
# and to set default cluster correctly
try: # open file & initialize file parser
file = c3_file_obj.cluster_def( filename )
except IOError:
print "error opening file: ", filename
sys.exit(c3_config.C3_ERRLOCAL)
######################################################
# there are two ways needed to biuld the cluster objects.
# one is a subset of available clusters and the other
# is to execute on all the clusters with the --all switch
# when building the clusters object It is not known which
# clusters are valid names so they must be populated while
# reading from the file
#######################################################
try:
file.get_next_cluster() # set the default cluster (first cluster in list)
try:
if clusters.clusters[0] == "/default":
clusters.clusters[0] = file.get_cluster_name()
clusters.node[file.get_cluster_name()] = clusters.node["/default"]
del clusters.node["/default"]
clusters.username[file.get_cluster_name()] = clusters.username["/default"]
except IndexError: #will be thrown if --all switch is used
pass
while(1): #throws exception when no more clusters
c_name = file.get_cluster_name() #name of cluster being parsed
if all == 1:
clusters.clusters.append( c_name )
clusters.node[c_name] = []
clusters.node[c_name].append( "" )
if c_name in clusters.clusters:
cluster_from_file[c_name] = {}
cluster_from_file[c_name]['external'] = file.get_external_head_node()
cluster_from_file[c_name]['internal'] = file.get_internal_head_node()
cluster_from_file[c_name]['nodes'] = [] #list of node names from file
if cluster_from_file[c_name]['external']: #if a direct cluster
index = 0
try:
while(1): # build list of nodes
node_obj = file.get_next_node()
cluster_from_file[c_name]['nodes'].append( c3_file_obj.node_obj() )
cluster_from_file[c_name]['nodes'][index] = node_obj
index = index + 1
except c3_file_obj.end_of_cluster:
pass
file.get_next_cluster() #repeat untill no more clusters
except c3_file_obj.no_more_clusters:
pass
except c3_file_obj.parse_error, error:
print error.description
print "somewhere around ", error.last
sys.exit(c3_config.C3_ERRLOCAL)
#######################################################
######### execute command on each node in cluster
# there are two main groups, local and remote clusters
# in each of those groups there are direct and indirect
# modes, that is every node specified or a "link". A link
# on a local cluster is of course invalid.
# right now the only way I know how to check if a cluster
# is local is to use a "gethostbyname" and compare it
# to the head node names (both internel and externel).
# right now that is acceptable as many tools require
# the function to work correctly (ssh being one of them)
# my want to think about a better way.
while interactive: # if interactive execution, prompt once before executing
answer = raw_input( "execute the following command(y or n): " + command + " :" )
if re.compile( r".*n(o)?.*", re.IGNORECASE ).match( answer ):
sys.exit(c3_config.C3_OKEXIT)
if re.compile( r".*y(es)?.*", re.IGNORECASE).match( answer ):
interactive = 0
pipe_list_outer = []
pid_list_outer = []
for cluster in clusters.clusters:
pipe_list_outer.append( os.pipe() )
pid = os.fork()
if pid == 0:
if not pipe_output:
line = "************************* " + cluster + " *************************\n"
os.write( pipe_list_outer[-1][1], line )
############ get machine names #############################
try:
local_ip = socket.gethostbyname( socket.gethostname() )
except socket.error:
print "Can not resolve local hostname"
os._exit(c3_config.C3_ERRLOCAL)
try:
int_ip = socket.gethostbyname( cluster_from_file[cluster]['internal'] )
except socket.error:
int_ip = ""
except KeyError:
print "Cluster ", cluster, " is not in the configuration file."
os.write( pipe_list_outer[-1][1], '\0' )
os._exit(c3_config.C3_ERRLOCAL)
try:
ext_ip = socket.gethostbyname( cluster_from_file[cluster]['external'] )
ext_ip_name = cluster_from_file[cluster]['external']
except socket.error:
ext_ip = ""
except TypeError:
ext_ip = int_ip
ext_ip_name = cluster_from_file[cluster]['internal']
############################################################
pipe_list = []
try:
if clusters.username[cluster] == "/default":
username = defusername
else:
username = clusters.username[cluster]
except KeyError: #if --all is specified
username = defusername
if head_node: #if only execute on head node , do so
#will execute on local cluster with ssh also
if not dryrun:
if not pipe_output:
line = "--------- " + cluster + "---------\n"
os.write( pipe_list_outer[-1][1], line )
string_to_execute = transport + " " + "-l " + username + " " + ext_ip + " \'" + command + "\'"
if dryrun:
print string_to_execute
else:
##fd = os.popen(string_to_execute)
#proc = popen2.Popen4(string_to_execute)
#string_in = proc.fromchild.read()
#temp = proc.wait()
p = Popen(string_to_execute, shell=True,
stdout=PIPE, stderr=STDOUT, close_fds=True)
string_in = p.communicate()[0]
temp = p.wait()
if( temp != 0 ):
returncode = c3_config.C3_ERRREMOTE
if pipe_output:
regex = re.compile(r"^.", re.MULTILINE)
string_in = regex.sub(cluster + " " + ext_ip_name + ": " + "\g<0>", string_in)
os.write(pipe_list_outer[-1][1], string_in + "\0" )
elif cluster_from_file[cluster]['external']: # if a direct cluster
if ext_ip == local_ip or int_ip == local_ip: # if a local cluster
if ext_ip == "": # error conditions (just don't execute current cluster)
print "Can not resolve ", cluster_from_file[cluster]['external']
elif int_ip == "":
print "Can not resolve ", cluster_from_file[cluster]['internal']
elif clusters.node[cluster][0] != "" : #range specified on command line
for node in clusters.node[cluster]: #for each cluster specified on the command line
try:
if not cluster_from_file[cluster]['nodes'][node].dead: #if machine is not dead
pipe_list.append( os.pipe() )
pid = os.fork() # execute command on each node in it's own process
if pid == 0:
node_name = cluster_from_file[cluster]['nodes'][node].name
if not pipe_output:
os.write( pipe_list[-1][1], "--------- " + node_name + "---------\n" )
string_to_execute = transport + " " + "-l " + username + " " + node_name + " \'" + command + " \'"
if dryrun:
print string_to_execute
else:
##fd = os.popen(string_to_execute)
#proc = popen2.Popen4(string_to_execute)
#string_in = proc.fromchild.read()
#temp = proc.wait()
p = Popen(string_to_execute, shell=True,
stdout=PIPE, stderr=STDOUT, close_fds=True)
string_in = p.communicate()[0]
temp = p.wait()
if( temp != 0 ):
returncode = c3_config.C3_ERRREMOTE
if pipe_output:
regex = re.compile(r"^.", re.MULTILINE)
string_in = regex.sub(cluster + " " + node_name + ": " + "\g<0>", string_in)
if len (string_in) == 0:
string_in = cluster + " " + node_name+ ":\n"
os.write( pipe_list[-1][1], string_in )
os.write( pipe_list[-1][1], '\0' )
os._exit(returncode)
pidlist.append(pid)
except IndexError:
pass
if dryrun:
os._exit(c3_config.C3_OKEXIT)
else: # no range specified on command line, do all nodes
for node in cluster_from_file[cluster]['nodes']: # for each node in cluster
if not node.dead: #if node not dead
pipe_list.append( os.pipe() )
pid = os.fork() # execute command in own process
if pid == 0:
node_name = node.name
if not pipe_output:
os.write( pipe_list[-1][1], "--------- " + node_name + "---------\n" )
string_to_execute = transport + " " + "-l " + username + " " + node_name + " \'" + command + " \'"
if dryrun:
print string_to_execute
os._exit(c3_config.C3_OKEXIT)
##fd = os.popen(string_to_execute)
#proc = popen2.Popen4(string_to_execute)
#string_in = proc.fromchild.read()
#temp = proc.wait()
p = Popen(string_to_execute, shell=True,
stdout=PIPE, stderr=STDOUT, close_fds=True)
string_in = p.communicate()[0]
temp = p.wait()
if( temp != 0 ):
returncode = c3_config.C3_ERRREMOTE
if pipe_output:
regex = re.compile(r"^.", re.MULTILINE)
string_in = regex.sub(cluster + " " + node_name + ": " + "\g<0>", string_in)
if len (string_in) == 0:
string_in = cluster + " " + node_name+ ":\n"
os.write( pipe_list[-1][1], string_in )
os.write( pipe_list[-1][1], '\0' )
os._exit(returncode)
pidlist.append(pid)
for pipe in pipe_list:
line = os.read( pipe[0], 1024)
while line[-1] != '\0':
line = line + os.read( pipe[0], 1024)
line=line[:-1]
os.write( pipe_list_outer[-1][1], line )
os.write( pipe_list_outer[-1][1], '\0' )
else: # remote cluster
if ext_ip == "": # error condition
print "Can not resolve ", cluster_from_file[cluster]['external']
sys.stdout.flush()
else:
# generate temprorary config file
cluster_def_string = "cluster " + cluster + " {\n"
cluster_def_string = cluster_def_string + cluster_from_file[cluster]['external'] + ":" + cluster_from_file[cluster]['internal'] + "\n"
if clusters.node[cluster][0] != "" : #range specified on command line
try:
for node in clusters.node[cluster]: #for each node specified on command line
if not cluster_from_file[cluster]['nodes'][node].dead: # if cluster node not dead
node_name = cluster_from_file[cluster]['nodes'][node].name #add cluster to list
cluster_def_string = cluster_def_string + node_name + "\n"
except IndexError:
pass
else: # no range specified on command line, do all nodes
for node in cluster_from_file[cluster]['nodes']: # for each node in cluster
if not node.dead: #if node not dead, add to list
cluster_def_string = cluster_def_string + node.name + "\n"
cluster_def_string = cluster_def_string + "}" # close list
#this is an attempt to generate a unique file name. As there is no easy way
#for me to see the remote machine I mangle a group of relativly unique
#identifiers. since i prepend the machine ip address if the file does not reside localy
#it should not remotely, at the very least it should be safe to rewrite the file
filename = "/tmp/" + local_ip + "%d" % os.getuid() + "%d" % os.getpid()
string_to_execute = transport + " " + "-l " + username + " " + ext_ip + " " + c3_config.def_path + "/cget --head --non_interactive " + filename
if dryrun:
print string_to_execute
else:
sock = c3_sock.server_sock( string_to_execute )
answer = sock.recieve()
sock.close()
while answer == 'good': #make sure file name is unique on local machine
filename = filename + "1"
string_to_execute = transport + " " + "-l " + username + " " + ext_ip + " " + c3_config.def_path + "/cget --head --non_interactive " + filename
sock.__init__( string_to_execute )
answer = sock.recieve()
sock.close()
FILE = open( filename, 'w' )
FILE.write( cluster_def_string )
FILE.close()
# push file to remote machine
string_to_execute = "rsync --rsh=" + transport + " " + filename + " " + username + "@" + ext_ip + ":" + filename
if dryrun:
print string_to_execute
else:
if(os.system( string_to_execute ) != 0):
returncode = c3_config.C3_ERRREMOTE
# execute commend remotely
string_to_execute = transport + " " + "-l " + username + " " + ext_ip + " \' " + c3_config.def_path + "/cexec -f " + filename + " " + options_to_pass + " \"" + command + " \"\'"
if dryrun:
print string_to_execute
else:
##fd = os.popen(string_to_execute)
#proc = popen2.Popen4(string_to_execute)
#string_in = proc.fromchild.read()
#temp = proc.wait()
p = Popen(string_to_execute, shell=True,
stdout=PIPE, stderr=STDOUT, close_fds=True)
string_in = p.communicate()[0]
temp = p.wait()
if( temp != 0 ):
returncode = c3_config.C3_ERRREMOTE
os.write( pipe_list_outer[-1][1], string_in )
os.write( pipe_list_outer[-1][1], '\0' )
# remove temporary file
string_to_execute = transport + " " + "-l " + username + " " + ext_ip + " /bin/rm -f " + filename
if dryrun:
print string_to_execute
else:
if(os.system( string_to_execute ) != 0):
returncode = c3_config.C3_ERRREMOTE
os.unlink( filename ) # remove local temp file
else: # indirect clusters
if int_ip == local_ip: # can not have a indirect local cluster since if your default cluster
# is local you would generate a circular reference
print "error local cluster"
sys.stdout.flush()
os.write( pipe_list_outer[-1][1], '\0' )
else: # remote indirect cluster
if int_ip == "": # error condition
print "Can not resolve hostname ", cluster_from_file[cluster]['internal']
sys.stdout.flush()
else:
node_list = "" # generate new node list from the command line
if clusters.node[cluster][0] != "" : #range specified on command line
node_list = ":%d" % clusters.node[cluster].pop(0)
for node in clusters.node[cluster]:
node_list = node_list + ", %d" % node
# execute command on remote machine
string_to_execute = transport + " " + "-l " + username + " " + int_ip + " \' " + c3_config.def_path + "/cexec " + options_to_pass + " " + node_list + " \"" + command + " \"\'"
if dryrun:
print string_to_execute
os._exit(c3_config.C3_OKEXIT)
else:
##fd = os.popen(string_to_execute)
#proc = popen2.Popen4(string_to_execute)
#string_in = proc.fromchild.read()
#temp = proc.wait()
p = Popen(string_to_execute, shell=True,
stdout=PIPE, stderr=STDOUT, close_fds=True)
string_in = p.communicate()[0]
temp = p.wait()
if( temp != 0 ):
returncode = c3_config.C3_ERRREMOTE
os.write( pipe_list_outer[-1][1], string_in )
os.write( pipe_list_outer[-1][1], '\0' )
for pid in pidlist:
pid, code = os.waitpid(pid,0)
code = code>>8
if( code != 0 ): returncode = code
os._exit(returncode)
pid_list_outer.append( pid )
if not dryrun:
output = ""
for pipe in pipe_list_outer:
line = os.read( pipe[0], 1024 )
while line[-1] != '\0':
line = line + os.read( pipe[0], 1024 )
output = output + line[:-1]
try:
n = len(output)
iw = 0
while n != iw:
try:
slice = output[iw:n]
nw = os.write(sys.stdout.fileno(), slice)
iw = iw + nw
except OSError, err:
import errno
if err.errno != errno.EAGAIN: raise
print >>sys.stderr, "OSERROR: ", err, "... retrying"
continue
except IndexError:
print "No computer returned any output."
for pid in pid_list_outer: # wait for all processes spawned to finish
pid, code = os.waitpid(pid,0)
code = code>>8
if( code != 0 ): returncode = code
except KeyboardInterrupt:
print "Keyboard interrupt\n"
sys.exit(returncode)
# vim:tabstop=4:shiftwidth=4:noexpandtab:textwidth=76
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment