'''
The main module for Pynac.
'''
import subprocess as subp
from contextlib import contextmanager
from concurrent.futures import ProcessPoolExecutor
import os
import shutil
import glob
from collections import namedtuple
import warnings
from IPython.display import display
import ipywidgets as widgets
from ipywidgets import HBox, VBox
from bokeh.models import ColumnDataSource
from bokeh.plotting import figure
from bokeh.layouts import gridplot
from bokeh.io import show, push_notebook
from Pynac.DataClasses import Param, SingleDimPS, CentreOfGravity
import Pynac.Elements as pyEle
[docs]def multiProcessPynac(filelist, pynacFunc, numIters = 100, max_workers = 8):
'''
Use a ProcessPool from the ``concurrent.futures`` module to execute ``numIters``
number of instances of ``pynacFunc``. This function takes advantage of ``doSingleDynacProcess``
and ``pynacInSubDirectory``.
'''
with ProcessPoolExecutor(max_workers = max_workers) as executor:
tasks = [executor.submit(doSingleDynacProcess, num, filelist, pynacFunc) for num in range(numIters)]
exc = [task.exception() for task in tasks if task.exception()]
if exc:
return exc
else:
return "No errors encountered"
[docs]def doSingleDynacProcess(num, filelist, pynacFunc):
'''
Execute ``pynacFunc`` in the ``pynacInSubDirectory`` context manager. See the
docstring for that context manager to understand the meaning of the ``num`` and
``filelist`` inputs.
The primary purpose of this function is to enable multiprocess use of Pynac via
the ``multiProcessPynac`` function.
'''
with pynacInSubDirectory(num, filelist):
pynacFunc()
@contextmanager
[docs]def pynacInSubDirectory(num, filelist):
'''
A context manager to create a new directory, move the files listed in ``filelist``
to that directory, and change to that directory before handing control back to
context. The closing action is to change back to the original directory.
The directory name is based on the ``num`` input, and if it already exists, it
will be deleted upon entering the context.
The primary purpose of this function is to enable multiprocess use of Pynac via
the ``multiProcessPynac`` function.
'''
print('Running %d' % num)
newDir = 'dynacProc_%04d' % num
if os.path.isdir(newDir):
shutil.rmtree(newDir)
os.mkdir(newDir)
for f in filelist:
shutil.copy(f, newDir)
os.chdir(newDir)
yield
os.chdir('..')
[docs]class Pynac(object):
'''
The primary entry point for performing simulations. Objects of this class
contain all the necessary information to perform a Dynac simulation, as well
as methods to manipulate the lattice, and to make the call to Dynac.
'''
_DEBUG = False
_fieldData = {
'INPUT': 2,
'RDBEAM': 5,
'BMAGNET': 4,
'FSOLE': 2,
'SECORD': 0,
'RASYN': 0,
'EDFLEC': 2,
'FIRORD': 0,
'CAVMC': 2,
'CAVNUM': 2,
'FIELD': 2,
'RWFIELD': 0,
'HARM': 3,
'EGUN': 2,
'RFQPTQ': 3,
'TILT': 2,
'RANDALI': 2,
'WRBEAM': 2,
'EMIT': 0,
'EMITGR': 3,
'ENVEL': 4,
'PROFGR': 3,
'STOP': 0,
'ZONES': 2,
'T3D': 0
}
def __init__(self, filename=None):
if filename:
self.filename = filename
with open(self.filename, 'r') as file:
self.rawData = [' '.join(line.split()) for line in file]
if self._DEBUG:
print("rawData:")
print(self.rawData)
self._parse()
@classmethod
[docs] def from_lattice(cls, name, lattice):
pyn = cls()
pyn.name = name
pyn.lattice = lattice
return pyn
[docs] def run(self):
'''
Run the simulation in the current directory.
'''
self._startDynacProc(stdin=subp.PIPE, stdout=subp.PIPE)
str2write = self.name + '\r\n'
if self._DEBUG:
with open('pynacrun.log', 'a') as f:
f.write(str2write)
self.dynacProc.stdin.write(str2write.encode()) # The name field
for pynEle in self.lattice:
try:
ele = pynEle.dynacRepresentation()
except AttributeError:
ele = pynEle
str2write = ele[0]
if self._DEBUG:
with open('pynacrun.log', 'a') as f:
f.write(str2write + '\r\n')
try:
self.dynacProc.stdin.write((str2write + '\r\n').encode())
except IOError:
break
for datum in ele[1]:
str2write = ' '.join([str(i) for i in datum])
if self._DEBUG:
with open('pynacrun.log', 'a') as f:
f.write(str2write+'\r\n')
try:
self.dynacProc.stdin.write((str2write+'\r\n').encode())
except IOError:
break
self.dynacProc.stdin.close()
if self.dynacProc.wait() != 0:
raise RuntimeError("Errors occured during execution of Dynac")
[docs] def getXinds(self, *X):
'''
Return the indices into the lattice list attribute of elements whose Dynac
type matches the input string. Multiple input strings can be given, either
as a comma-separated list or as a genuine Python list.
'''
return [i for i,x in enumerate(self.lattice) for y in X if DynacFromEle(x) == y]
[docs] def getPlotInds(self):
'''
Return the indices into the lattice list attribute of elements that result
in Dynac plot output.
'''
return self.getXinds('EMITGR','ENVEL','PROFGR')
[docs] def getNumPlots(self):
'''
Return the number of Dynac plots that will be output when Dynac is run.
'''
return len(self.getPlotInds()) + 2 * len(self.getXinds('ENVEL'))
[docs] def setNewRDBEAMfile(self, filename):
'''
Change the current ``RDBEAM`` command to point at another Dynac input file.
This will raise an IndexError if the lattice doesn't contain an RDBEAM
command.
'''
self.lattice[self.getXinds('RDBEAM')[0]][1][0][0] = filename
def _startDynacProc(self, stdin, stdout):
# self.dynacProc = subp.Popen(['dynacv6_0','--pipe'], stdin=stdin, stdout=stdout)
self.dynacProc = subp.Popen(
['dynacv6_0','--pipe'],
stdin=stdin,
stdout=stdout,
stderr=subp.PIPE
)
if b'Error' in self.dynacProc.stdout.readline():
raise RuntimeError('Installed version of Dynac should be upgraded to support the --pipe flag')
def _loop(self, item):
print(item)
self.dynacProc.stdin.write(item[0] + '\r\n')
for data in item[1]:
self.dynacProc.stdin.write(data)
def _parse(self):
self.lattice = []
self.name = self.rawData[0]
ind = 1
while ind < len(self.rawData):
if not (self.rawData[ind] == '' or self.rawData[ind][0] == ';'):
ind = self._parsedChunk(ind)
if self._DEBUG:
print(self.lattice[-1])
ind += 1
def _parsedChunk(self, currentInd):
dynacStr = self.rawData[currentInd]
try:
numFields = self._fieldData[dynacStr]
except KeyError:
if dynacStr == 'GEBEAM':
numFields = self._getNumFieldsFromiTwiss(currentInd)
elif dynacStr == 'SCDYNAC':
numFields = self._getNumFieldsFromISCSP(currentInd)
else:
numFields = 1
dataStr = [self.rawData[currentInd+i+1] for i in range(numFields)]
dat = []
for term in dataStr:
try:
if self._mightBeNumber(term):
dat.append([float(term)])
else:
dat.append([int(term)])
except ValueError:
try:
dat.append([float(i) if self._mightBeNumber(i) else int(i) for i in term.split(' ')])
except ValueError:
dat.append([term])
self.lattice.append(EleFromPynac([dynacStr, dat]))
return currentInd + numFields
def _getNumFieldsFromiTwiss(self, ind):
iTwiss = self.rawData[ind+1].split()[1]
return {'1': 6, '0': 4}[iTwiss]
def _getNumFieldsFromISCSP(self, ind):
iscsp = self.rawData[ind+1]
return {
'1': 3,
'-1': 6,
'2': 3,
'3': 3 if self.rawData[ind+3]=='0' else 4
}[iscsp]
def _mightBeNumber(self, thing):
return ('.' in thing) or ('e' in thing) or ('E' in thing)
[docs]class PhaseSpace:
'''
A representation of the phase space of the simulated bunch read from the
``dynac.short`` file. Each of the phase space parameters is represented as a
``elements.Parameter`` namedtuple.
This class is intended to be used in interactive explorations of the data
produced during Pynac simulations.
'''
def __init__(self, dataStrMatrix):
self.dataStrMatrix = dataStrMatrix
self.xPhaseSpace = self._getPSFromLine(5)
self.yPhaseSpace = self._getPSFromLine(6)
self.zPhaseSpace = self._getPSFromLine(4)
self.COG = CentreOfGravity(
x = Param(val = float(self.dataStrMatrix[1][0]), unit = 'mm'),
xp = Param(val = float(self.dataStrMatrix[1][1]), unit = 'mrad'),
y = Param(val = float(self.dataStrMatrix[1][2]), unit = 'mm'),
yp = Param(val = float(self.dataStrMatrix[1][3]), unit = 'mrad'),
KE = Param(val = float(self.dataStrMatrix[0][3]), unit = 'MeV'),
TOF = Param(val = float(self.dataStrMatrix[0][4]), unit = 'deg'),
)
self.particlesLeft = Param(val = float(self.dataStrMatrix[4][5]), unit = 'num')
def _getPSFromLine(self, num):
try:
test = float(self.dataStrMatrix[num][6])
return SingleDimPS(
pos = Param(val = float(self.dataStrMatrix[num][0]), unit = 'mm'),
mom = Param(val = float(self.dataStrMatrix[num][1]), unit = 'mrad'),
R12 = Param(val = float(self.dataStrMatrix[num][2]), unit = '?'),
normEmit = Param(val = float(self.dataStrMatrix[num][3]), unit = 'mm.mrad'),
nonNormEmit = Param(val = float(self.dataStrMatrix[num][6]), unit = 'mm.mrad'),
)
except:
return SingleDimPS(
pos = Param(val = float(self.dataStrMatrix[num][0]), unit = 'deg'),
mom = Param(val = float(self.dataStrMatrix[num][1]), unit = 'keV'),
R12 = Param(val = float(self.dataStrMatrix[num][2]), unit = '?'),
normEmit = Param(val = float(self.dataStrMatrix[num][3]), unit = 'keV.ns'),
nonNormEmit = Param(val = None, unit = None),
)
def __repr__(self):
reprStr = 'COG: ' + self.COG.__repr__()
reprStr += '\nparticles left: ' + self.particlesLeft.__repr__()
reprStr += '\nx: ' + self.xPhaseSpace.__repr__()
reprStr += '\ny: ' + self.yPhaseSpace.__repr__()
reprStr += '\nz: ' + self.zPhaseSpace.__repr__()
return reprStr
[docs]def makePhaseSpaceList():
'''
Extract all the phase space information (due to ``EMIT`` commands in the input
file), and create a list of PhaseSpace objects. The primary purpose of this
is for interactive explorations of the data produced during Pynac simulations.
'''
with open('dynac.short') as f:
dataStr = ''.join(line for line in f.readlines())
dataStrArray = dataStr.split('beam (emit card)')[1:]
dataStrMatrix = [[j.strip().split() for j in i] for i in[chunk.split('\n')[1:8] for chunk in dataStrArray]]
return [PhaseSpace(data) for data in dataStrMatrix]
[docs]def getNumberOfParticles():
'''
Queries the ``dynac.short`` file for the number of particles used in the
simulation.
'''
with open('dynac.short') as f:
dataStr = ''.join(line for line in f.readlines())
numOfParts = int(dataStr.split('Simulation with')[1].strip().split()[0])
return numOfParts
[docs]def DynacFromEle(ele):
try:
dynStr = ele.dynacRepresentation()[0]
except AttributeError:
dynStr = ele[0]
return dynStr
[docs]def EleFromPynac(pynacRepr):
try:
constructor = getattr(pyEle, pyEle._dynac2pynac[pynacRepr[0]])
obj = constructor.from_dynacRepr(pynacRepr)
except AttributeError:
obj = pynacRepr
return obj
[docs]def buildABeam():
betaX = widgets.FloatSlider(
value=7.5,
min=0.01,
max=20.0,
step=0.01,
description='betaX:',
disabled=False,
continuous_update=False,
)
alphaX = widgets.FloatSlider(
value=0.0,
min=-5.0,
max=5.0,
step=0.01,
description='alphaX:',
disabled=False,
continuous_update=False,
)
emitX = widgets.FloatSlider(
value=0.5,
min=0.01,
max=10.0,
step=0.01,
description='emitX:',
disabled=False,
continuous_update=False,
)
betaY = widgets.FloatSlider(
value=7.5,
min=0.01,
max=20.0,
step=0.01,
description='betaY:',
disabled=False,
continuous_update=False,
)
alphaY = widgets.FloatSlider(
value=0.0,
min=-5.0,
max=5.0,
step=0.01,
description='alphaY:',
disabled=False,
continuous_update=False,
)
emitY = widgets.FloatSlider(
value=0.5,
min=0.01,
max=10.0,
step=0.01,
description='emitY:',
disabled=False,
continuous_update=False,
)
betaZ = widgets.FloatSlider(
value=7.5,
min=0.01,
max=20.0,
step=0.01,
description='betaZ:',
disabled=False,
continuous_update=False,
)
alphaZ = widgets.FloatSlider(
value=0.0,
min=-5.0,
max=5.0,
step=0.01,
description='alphaZ:',
disabled=False,
continuous_update=False,
)
emitZ = widgets.FloatSlider(
value=500,
min=1.0,
max=1000.0,
step=1.0,
description='emitZ:',
disabled=False,
continuous_update=False,
)
def getPynacInput():
beam = ['GEBEAM', [
[4, 1],
[352.21e6, 1000],
[0, 0, 0, 0, 0, 0],
[alphaX.value, betaX.value, emitX.value],
[alphaY.value, betaY.value, emitY.value],
[alphaZ.value, betaZ.value, emitZ.value],
]]
return beam
def getDynacInput():
beam = 'GEBEAM\r\n'
beam += '4 1\r\n'
beam += '352.21e6 1000\r\n'
beam += '0 0 0 0 0 0\r\n'
beam += '%f %f %f\r\n' % (alphaX.value, betaX.value, emitX.value)
beam += '%f %f %f\r\n' % (alphaY.value, betaY.value, emitY.value)
beam += '%f %f %f' % (alphaZ.value, betaZ.value, emitZ.value)
return beam
cssHeightStr = '150px'
viewBtn = widgets.Button(description="View Pynac Input")
pynacViewArea = widgets.Textarea()
pynacViewArea.layout.height = cssHeightStr
dynacViewArea = widgets.Textarea()
dynacViewArea.layout.height = cssHeightStr
pynacViewArea.value = getPynacInput().__str__()
dynacViewArea.value = getDynacInput()
lattice = []
lattice.append(getPynacInput())
lattice.append(['INPUT', [[938.27231, 1.0, 1.0], [3.6223537, 0.0]]])
lattice.append(['REFCOG', [[0]]])
lattice.append(['EMITGR', [['Generated Beam'], [0, 9], [0.5, 80.0, 0.5, 80.0, 0.5, 0.5, 50.0, 1.0]]])
lattice.append(['STOP', []])
test = Pynac.from_lattice("Zero-length lattice for beam generation", lattice)
test.run()
with open('emit.plot') as f:
for i in range(204):
f.readline()
numParts = int(f.readline())
x, xp = [], []
for i in range(numParts):
dat = f.readline().split()
x.append(float(dat[0]))
xp.append(float(dat[1]))
f.readline()
for i in range(202):
f.readline()
y, yp = [], []
for i in range(numParts):
dat = f.readline().split()
y.append(float(dat[0]))
yp.append(float(dat[1]))
f.readline()
for i in range(203):
f.readline()
z, zp = [], []
for i in range(numParts):
dat = f.readline().split()
z.append(float(dat[0]))
zp.append(float(dat[1]))
dataSource = ColumnDataSource(data=dict(x=x, xp=xp, y=y, yp=yp, z=z, zp=zp))
p0 = figure(plot_height=250, plot_width=296, y_range=(-5, 5), x_range=(-1, 1))
p0.xaxis.axis_label = 'Horizontal position'
p0.yaxis.axis_label = 'Horizontal angle'
p0.circle('x', 'xp', color="#2222aa", alpha=0.5, line_width=2, source=dataSource)
p1 = figure(plot_height=250, plot_width=296, y_range=(-5, 5), x_range=(-1, 1))
p1.xaxis.axis_label = 'Vertical position'
p1.yaxis.axis_label = 'Vertical angle'
p1.circle('y', 'yp', color="#2222aa", alpha=0.5, line_width=2, source=dataSource, name="foo")
p2 = figure(plot_height=250, plot_width=296, y_range=(-0.1, 0.1), x_range=(-150, 150))
p2.xaxis.axis_label = 'Longitudinal phase'
p2.yaxis.axis_label = 'Longitudinal energy'
p2.circle('z', 'zp', color="#2222aa", alpha=0.5, line_width=2, source=dataSource, name="foo")
grid = gridplot([[p0, p1, p2]])
show(grid, notebook_handle=True)
push_notebook()
def on_button_clicked(b):
pynacViewArea.value = getPynacInput().__str__()
dynacViewArea.value = getDynacInput()
lattice[0] = getPynacInput()
test = Pynac.from_lattice("Zero-length lattice for beam generation", lattice)
test.run()
with open('emit.plot') as f:
for i in range(204):
f.readline()
numParts = int(f.readline())
x, xp = [], []
for i in range(numParts):
dat = f.readline().split()
x.append(float(dat[0]))
xp.append(float(dat[1]))
f.readline()
for i in range(202):
f.readline()
y, yp = [], []
for i in range(numParts):
dat = f.readline().split()
y.append(float(dat[0]))
yp.append(float(dat[1]))
f.readline()
for i in range(203):
f.readline()
z, zp = [], []
for i in range(numParts):
dat = f.readline().split()
z.append(float(dat[0]))
zp.append(float(dat[1]))
dataSource.data['x'] = x
dataSource.data['xp'] = xp
dataSource.data['y'] = y
dataSource.data['yp'] = yp
dataSource.data['z'] = z
dataSource.data['zp'] = zp
push_notebook()
label_layout = widgets.Layout(width='100%')
twissXLabel = widgets.Label(value = "Horizontal Twiss", layout = label_layout)
twissYLabel = widgets.Label(value = "Vertical Twiss", layout = label_layout)
twissZLabel = widgets.Label(value = "Longitudinal Twiss", layout = label_layout)
for slider in [betaX, alphaX, emitX, betaY, alphaY, emitY, betaZ, alphaZ, emitZ]:
slider.observe(on_button_clicked)
pynacBoxLabel = widgets.Label(value = "Pynac Input", layout = label_layout)
dynacBoxLabel = widgets.Label(value = "Dynac Input", layout = label_layout)
controls = VBox([
HBox([
VBox([twissXLabel, betaX, alphaX, emitX, pynacBoxLabel, pynacViewArea]),
VBox([twissYLabel, betaY, alphaY, emitY, dynacBoxLabel, dynacViewArea]),
VBox([twissZLabel, betaZ, alphaZ, emitZ]),
]),
])
display(controls)