-
Notifications
You must be signed in to change notification settings - Fork 4
Beam attenuation #35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Beam attenuation #35
Changes from all commits
5b97d4f
9489e21
2dd5b96
a4a93a8
8454b50
5fd4afa
a046034
205f2c7
981e290
52e162f
6aab1d4
3450021
61477f6
a8ef9bf
0660377
8f605d1
5135f45
7d7c92f
08a33a4
f341268
2a216ac
eba6dd9
e2f16d3
2473895
dc71ab7
e5616dc
faaf823
dcdfb3f
50f999b
f52c573
5bbb671
d0fb3d2
9667175
beffc82
2b4348e
e61e305
d075612
d8d8baa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -164,3 +164,6 @@ Thumbs.db | |
| # IDEs | ||
| .vscode/ | ||
| .cursor/ | ||
|
|
||
| # Other | ||
| notes.* | ||
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,245 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| import math | ||
| from dataclasses import dataclass | ||
|
|
||
| import numpy as np | ||
| import xrayutilities as xu | ||
| from ophyd_async.core import ( | ||
| AsyncMovable, | ||
| AsyncStatus, | ||
| DeviceVector, | ||
| StandardReadable, | ||
| StrictEnum, | ||
| ) | ||
| from ophyd_async.epics.core import EpicsDevice, epics_signal_r, epics_signal_rw | ||
|
|
||
| from cditools.motors import Energy | ||
|
|
||
|
|
||
| @dataclass | ||
| class AttenuatorCombination: | ||
| transmission: float | ||
| attenuators: list[int] | ||
|
|
||
| @property | ||
| def attenuation(self): | ||
| return 1 - self.transmission | ||
|
|
||
|
|
||
| class AttenuatorStatusEnum(StrictEnum): | ||
| LOW = "Low" # off / not obstructing | ||
| HIGH = "High" # on / obstructing | ||
|
|
||
|
|
||
| class Attenuator(EpicsDevice, AsyncMovable[AttenuatorStatusEnum]): | ||
| def __init__(self, prefix: str, num: int, material: str, thickness: float): | ||
| """ | ||
| Parameters | ||
| ---------- | ||
| prefix : str | ||
| The common prefix for the attenuator bank | ||
| num : int | ||
| An integer denoting which attenuator within the bank this is | ||
| thickness : float | ||
| The thickness of the attenuator in microns | ||
|
|
||
| Attributes | ||
| ---------- | ||
| position : SignalRW[AttenuatorStatusEnum] | ||
| The read / write PV to open and close the attenuator and get | ||
| the current state of the attenuator | ||
| mode : SignalRW[bool] | ||
| in_status : SignalR[AttenuatorStatusEnum] | ||
| """ | ||
| self.prefix = prefix | ||
| self.num = num | ||
| self.filter_material = getattr(xu.materials, material) | ||
| self.thickness = thickness # microns | ||
|
|
||
| self.position = epics_signal_rw( | ||
| AttenuatorStatusEnum, | ||
| f"{self.prefix}:DO{self.num}-Sts", | ||
| write_pv=f"{self.prefix}:DO{self.num}-Cmd", | ||
| ) | ||
| self.mode = epics_signal_rw(bool, f"{self.prefix}:DIO{self.num}-Mode") | ||
| self.in_status = epics_signal_r( | ||
| AttenuatorStatusEnum, f"{self.prefix}:DI{self.num}-Sts" | ||
| ) | ||
|
|
||
| super().__init__(prefix=self.prefix) | ||
|
|
||
| def __repr__(self): | ||
| return f"{self.thickness!s} microns, {self.filter_material}" | ||
|
|
||
| @property | ||
| def name(self): | ||
| return f"attenuator_{self.num}" | ||
|
|
||
| @AsyncStatus.wrap | ||
| async def set(self, value: AttenuatorStatusEnum): | ||
| await self.position.set(value) | ||
|
|
||
| async def open(self): | ||
| """Open means open to allowing the beam to pass unobstructed""" | ||
| await self.position.set(AttenuatorStatusEnum.LOW) | ||
|
|
||
| async def close(self): | ||
| """Closed means obstructing the beam""" | ||
| await self.position.set(AttenuatorStatusEnum.HIGH) | ||
|
|
||
| def attenuation(self, photon_energy: float, egu: str = "KeV"): | ||
| """Attenuation is the fraction of the beam removed""" | ||
| return 1 - self.transmission(photon_energy, egu=egu) | ||
|
|
||
| def transmission(self, photon_energy: float, egu: str = "KeV"): | ||
| """Transmission is the fraction of beam remaining""" | ||
| abs_len = self._absorption_length(photon_energy, egu=egu) | ||
| return np.exp(-self.thickness / abs_len) | ||
|
|
||
| def _absorption_length(self, photon_energy: float, egu: str = "KeV") -> float: | ||
| """ | ||
| Calculates L, the characteristic absorption length of this material, | ||
| at this beam energy. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| photon energy : float | ||
| The beam energy | ||
| egu : {'KeV', 'eV'} | ||
| The engineering units of the beam energy | ||
|
|
||
| Returns | ||
| ------- | ||
| float | ||
| The characteristic absorption length of the filter material (microns) | ||
| """ | ||
| if egu == "KeV": | ||
| photon_energy = photon_energy * 1e3 | ||
| elif egu != "eV": | ||
| msg = "Photon energy units must be eV or KeV" | ||
| raise RuntimeError(msg) | ||
| return self.filter_material.absorption_length(photon_energy) # type: ignore[reportArgumentType] | ||
|
|
||
|
|
||
| class AttenuatorBank(StandardReadable, EpicsDevice, AsyncMovable[float]): | ||
| """ | ||
| The ioc for the iologik1 lives on xf09id1-inst-ioc1.nsls2.bnl.gov | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, prefix: str, atten_configs: list[tuple[str, float]], energy: Energy | ||
| ): | ||
| self.prefix = prefix | ||
| self.energy = energy | ||
|
|
||
| with self.add_children_as_readables(): | ||
| self.attenuators = DeviceVector( | ||
| { | ||
| i: Attenuator(self.prefix, i, material, thickness) | ||
| for i, (material, thickness) in enumerate(atten_configs, start=1) | ||
| } | ||
| ) | ||
| super().__init__(prefix=self.prefix) | ||
|
|
||
| def get_photon_energy(self): | ||
| return self.energy.energy.readback.get() | ||
|
|
||
| def get_egu(self): | ||
| return self.energy.egu | ||
|
|
||
| async def read(self): # type: ignore[reportUnknownParameterType] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs a matched |
||
| """ | ||
| Polls the bluesky energy object for the current beam energy, and | ||
| returns that energy, each filter position, each transmission, and | ||
| the total transmission. | ||
| """ | ||
| status = {} | ||
| active_attens = [] | ||
| energy = self.get_photon_energy() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use async flavor here for access to the hardware. |
||
| egu = self.get_egu() | ||
| positions = await asyncio.gather( | ||
| *(a.position.get_value() for _, a in self.attenuators.items()) | ||
| ) | ||
| for i, pos in zip(self.attenuators, positions): | ||
| atten = self.attenuators[i] | ||
| is_active = pos == AttenuatorStatusEnum.HIGH | ||
| if is_active: | ||
| active_attens.append(atten) | ||
| transmission = atten.transmission(energy, egu) if is_active else 0 | ||
| status[atten.name] = {"active": is_active, "transmission": transmission} | ||
| status["active_attenuators"] = [a.num for a in active_attens] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is going to be variable length which will give tiled fits (for now). Better to provide either fixed length list of bools or strings "in" and "out". |
||
| status["photon_energy"] = energy | ||
| status["egu"] = egu | ||
| status["total_transmission"] = self._calculate_total_transmission( | ||
| energy, *active_attens | ||
| ) | ||
| return status | ||
|
|
||
| @AsyncStatus.wrap | ||
| async def set(self, value: float): | ||
| """Set the transmission for the attenuator bank""" | ||
| attenuation_combination = self.find_closest_transmission( | ||
| self.get_photon_energy(), value | ||
| ) | ||
| coros = [] | ||
| for ( | ||
| num, | ||
| atten, | ||
| ) in self.attenuators.items(): | ||
| if num in attenuation_combination.attenuators: | ||
| coros.append(atten.close()) | ||
| else: | ||
| coros.append(atten.open()) | ||
| await asyncio.gather(*coros) | ||
|
|
||
| def find_closest_transmission( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check with Garth if we want "closest" or "closest with at least" or "closest with at most" semantics on this. My knee-jerk guess would be "closest with at least" on the logic of if you are cutting the flux down it is because you want to protect something (sample or detector) so it is better to over attenuate than under attenuate. |
||
| self, photon_energy: float, target_transmission: float | ||
| ) -> AttenuatorCombination: | ||
| available_attenuations = self._calculate_available_transmissions(photon_energy) | ||
| best_idx = np.argmin( | ||
| [abs(target_transmission - _.transmission) for _ in available_attenuations] | ||
| ) | ||
| return available_attenuations[best_idx] | ||
|
|
||
| def _calculate_available_transmissions( | ||
| self, photon_energy: float | ||
| ) -> list[AttenuatorCombination]: | ||
| """ | ||
| Calculates all possible transmissions for the attenuator bank, using | ||
| the powerset of the available attenuators. The result is not sorted, | ||
| as it is more efficient to scan linearly each time for the closest | ||
| match. | ||
| """ | ||
| available_transmissions = [] | ||
| for combination in self._powerset(): | ||
| attens = [self.attenuators[a] for a in self.attenuators if a in combination] | ||
| total_transmission = self._calculate_total_transmission( | ||
| photon_energy, *attens | ||
| ) | ||
| available_transmissions.append( | ||
| AttenuatorCombination(total_transmission, combination) | ||
| ) | ||
| return available_transmissions | ||
|
|
||
| def _calculate_total_transmission( | ||
| self, photon_energy: float, *attenuators: Attenuator | ||
| ) -> float: | ||
| transmissions = [ | ||
| a.transmission(photon_energy, self.get_egu()) for a in attenuators | ||
| ] | ||
| return round(float(math.prod(transmissions)), 3) | ||
|
|
||
| def _powerset(self) -> list[list[int]]: | ||
| """ | ||
| This is a famously O(n*2^n) problem. | ||
| """ | ||
| powerset = [] | ||
| for i in range(1 << len(self.attenuators)): | ||
| combination = [] | ||
| for j in range(len(self.attenuators)): | ||
| if i & (1 << j): | ||
| combination.append(j + 1) # +1 because attenuators are 1-indexed | ||
| powerset.append(combination) | ||
| return powerset | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.