Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 176 additions & 83 deletions alpaca/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -1233,6 +1233,48 @@ def ImageArray(self) -> List[int]:
"""
return self._get_imagedata("imagearray")

@property
def ImageArrayRaw(self) -> array.array:
"""Return an array containing the exposure pixel values.

Raises:
InvalidOperationException: If no image data is available
NotConnectedException: If the device is not connected
DriverException: An error occurred that is not described by one of the more specific ASCOM exceptions. The device did not *successfully* complete the request.

Note:
* The returned array is in row-major format, and typically must be transposed
for use with *numpy* and *astropy* for creating FITS files. See the example
below.
* Automatically adapts to devices returning either JSON image data or the much
faster ImageBytes format. In either case the returned array
contains standard Python int or float pixel values. See the
|ImageBytes|.
See :attr:`ImageArrayInfo` for metadata covering the returned image data.

.. |ImageBytes| raw:: html

<a href="https://github.com/ASCOMInitiative/ASCOMRemote/raw/main/Documentation/ASCOM%20Alpaca%20API%20Reference.pdf" target="_blank">
Alpaca API Reference</a> (external)

.. admonition:: Master Interfaces Reference
:class: green

.. only:: html

|ImageArray|

.. |ImageArray| raw:: html

<a href="https://ascom-standards.org/newdocs/camera.html#Camera.ImageArray" target="_blank">
Camera.ImageArray</a> (external)

.. only:: rinoh

`Camera.ImageArray <https://ascom-standards.org/newdocs/camera.html#Camera.ImageArray>`_
"""
return self._get_imagedata_as_array("imagearray")

@property
def ImageArrayInfo(self) -> ImageMetadata:
"""Get image metadata sucn as dimensions, data type, rank.
Expand Down Expand Up @@ -2363,7 +2405,7 @@ def StopExposure(self) -> None:
# === LOW LEVEL ROUTINES TO GET IMAGE DATA WITH OPTIONAL IMAGEBYTES ===
# https://www.w3resource.com/python/python-bytes.php#byte-string

def _get_imagedata(self, attribute: str, **data) -> str:
def _fetch_imagedata_response(self, attribute: str, **data) -> requests.Response:
"""TBD

Args:
Expand All @@ -2388,100 +2430,151 @@ def _get_imagedata(self, attribute: str, **data) -> str:
finally:
Device._ctid_lock.release()

if response.status_code not in range(200, 204): # HTTP level errors
if response.status_code not in range(200, 204): # HTTP level errors
raise AlpacaRequestException(response.status_code,
f"{response.reason}: {response.text} (URL {response.url})")
f"{response.reason}: {response.text} (URL {response.url})")

return response

def _get_json_imagedata(self, response):
j = response.json()
n = j["ErrorNumber"]
m = j["ErrorMessage"]
raise_alpaca_if(n, m) # Raise Alpaca Exception if non-zero Alpaca error
l = j["Value"] # Nested lists
if type(l[0][0]) == list: # Test & pick up color plane
r = 3
d3 = len(l[0][0])
else:
r = 2
d3 = 0
self.img_desc = ImageMetadata(
1, # Meta version
ImageArrayElementTypes.Int32, # Image element type
ImageArrayElementTypes.Int32, # Xmsn element type
r, # Rank
len(l), # Dimension 1
len(l[0]), # Dimension 2
d3 # Dimension 3
)
return l

def _build_imagedata_array(self, response):
m = 'little'
b = response.content
n = int.from_bytes(b[4:8], m)
if n != 0:
m = response.text[44:].decode(encoding='UTF-8')
raise_alpaca_if(n, m) # Will raise here
self.img_desc = ImageMetadata(
int.from_bytes(b[0:4], m), # Meta version
int.from_bytes(b[20:24], m), # Image element type
int.from_bytes(b[24:28], m), # Xmsn element type
int.from_bytes(b[28:32], m), # Rank
int.from_bytes(b[32:36], m), # Dimension 1
int.from_bytes(b[36:40], m), # Dimension 2
int.from_bytes(b[40:44], m) # Dimension 3
)
#
# Bless you Kelly Bundy and Mark Ransom
# https://stackoverflow.com/questions/71774719/native-array-frombytes-not-numpy-mysterious-behavior/71776522#71776522
#
if self.img_desc.TransmissionElementType == ImageArrayElementTypes.Int16.value:
tcode = 'h'
elif self.img_desc.TransmissionElementType == ImageArrayElementTypes.UInt16.value:
tcode = 'H'
elif self.img_desc.TransmissionElementType == ImageArrayElementTypes.Int32.value:
tcode = 'i'
elif self.img_desc.TransmissionElementType == ImageArrayElementTypes.Double.value:
tcode = 'd'
# Extension types for future. 64-bit pixels are unlikely to be seen on the wire
elif self.img_desc.TransmissionElementType == ImageArrayElementTypes.Byte.value:
tcode = 'B' # Unsigned
elif self.img_desc.TransmissionElementType == ImageArrayElementTypes.UInt32.value:
tcode = 'I'
else:
raise InvalidValueException("Unknown or as-yet unsupported ImageBytes Transmission Array Element Type")
#
# Assemble byte stream back into indexable machine data types
#
a = array.array(tcode)
data_start = int.from_bytes(b[16:20], m)
a.frombytes(b[data_start:]) # 'h', 'H', 16-bit ints 2 bytes get turned into Python 32-bit ints

return a

ct = response.headers.get('content-type') # case insensitive
def _build_imagedata_nested_list_array(self, a: array.array):
#
# Convert to common Python nested list "array".
#
l = []
rows = self.img_desc.Dimension1
cols = self.img_desc.Dimension2
if self.img_desc.Rank == 3:
for i in range(rows):
# rowidx = i * cols * 3
r = []
for j in range(cols):
colidx = j * 3
r.append(a[colidx:colidx + 3])
l.append(r)
else:
for i in range(rows):
rowidx = i * cols
l.append(a[rowidx:rowidx + cols])

return l # Nested lists

def _get_imagedata(self, attribute: str, **data):
"""
Fetch image data from the Alpaca server.

- If the server responds with `application/imagebytes`, returns a
nested list (row-major order) reconstructed from the binary stream.
- If the server responds with JSON image data, returns the nested list
structure provided by the server.

This method is intended for general compatibility and will always
return nested Python lists regardless of the wire format.
"""
response = self._fetch_imagedata_response(attribute, **data)
ct = response.headers.get('content-type') # case insensitive
m = 'little'
#
# IMAGEBYTES
#
if ct == 'application/imagebytes':
b = response.content
n = int.from_bytes(b[4:8], m)
if n != 0:
m = response.text[44:].decode(encoding='UTF-8')
raise_alpaca_if(n, m) # Will raise here
self.img_desc = ImageMetadata(
int.from_bytes(b[0:4], m), # Meta version
int.from_bytes(b[20:24], m), # Image element type
int.from_bytes(b[24:28], m), # Xmsn element type
int.from_bytes(b[28:32], m), # Rank
int.from_bytes(b[32:36], m), # Dimension 1
int.from_bytes(b[36:40], m), # Dimension 2
int.from_bytes(b[40:44], m) # Dimension 3
)
#
# Bless you Kelly Bundy and Mark Ransom
# https://stackoverflow.com/questions/71774719/native-array-frombytes-not-numpy-mysterious-behavior/71776522#71776522
#
if self.img_desc.TransmissionElementType == ImageArrayElementTypes.Int16.value:
tcode = 'h'
elif self.img_desc.TransmissionElementType == ImageArrayElementTypes.UInt16.value:
tcode = 'H'
elif self.img_desc.TransmissionElementType == ImageArrayElementTypes.Int32.value:
tcode = 'l'
elif self.img_desc.TransmissionElementType == ImageArrayElementTypes.Double.value:
tcode = 'd'
# Extension types for future. 64-bit pixels are unlikely to be seen on the wire
elif self.img_desc.TransmissionElementType == ImageArrayElementTypes.Byte.value:
tcode = 'B' # Unsigned
elif self.img_desc.TransmissionElementType == ImageArrayElementTypes.UInt32.value:
tcode = 'L'
else:
raise InvalidValueException("Unknown or as-yet unsupported ImageBytes Transmission Array Element Type")
#
# Assemble byte stream back into indexable machine data types
#
a = array.array(tcode)
data_start = int.from_bytes(b[16:20],m)
a.frombytes(b[data_start:]) # 'h', 'H', 16-bit ints 2 bytes get turned into Python 32-bit ints
#
# Convert to common Python nested list "array".
#
l = []
rows = self.img_desc.Dimension1
cols = self.img_desc.Dimension2
if self.img_desc.Rank == 3:
for i in range(rows):
rowidx = i * cols * 3
r = []
for j in range(cols):
colidx = j * 3
r.append(a[colidx:colidx+3])
l.append(r)
else:
for i in range(rows):
rowidx = i * cols
l.append(a[rowidx:rowidx+cols])

return l # Nested lists
a = self._build_imagedata_array(response)
return self._build_imagedata_nested_list_array(a)
#
# JSON IMAGE DATA -> List of Lists (row major)
#
else:
j = response.json()
n = j["ErrorNumber"]
m = j["ErrorMessage"]
raise_alpaca_if(n, m) # Raise Alpaca Exception if non-zero Alpaca error
l = j["Value"] # Nested lists
if type(l[0][0]) == list: # Test & pick up color plane
r = 3
d3 = len(l[0][0])
else:
r = 2
d3 = 0
self.img_desc = ImageMetadata(
1, # Meta version
ImageArrayElementTypes.Int32, # Image element type
ImageArrayElementTypes.Int32, # Xmsn element type
r, # Rank
len(l), # Dimension 1
len(l[0]), # Dimension 2
d3 # Dimension 3
return self._get_json_imagedata(response)

def _get_imagedata_as_array(self, attribute: str, **data):
"""
Fetch image data from the Alpaca server and return it as a flat
`array.array` of raw pixel values.

- Only supported if the server responds with `application/imagebytes`.
- If the server responds with JSON image data instead, this method
raises `InvalidValueException`.

This method is intended for high-performance consumers who want direct
access to the raw pixel buffer. Use `_get_imagedata()` if you also
need to support JSON image responses.
"""
response = self._fetch_imagedata_response(attribute, **data)
ct = response.headers.get('content-type') # case insensitive
if ct == 'application/imagebytes':
return self._build_imagedata_array(response)
else:
raise InvalidValueException(
f"Expected 'application/imagebytes' response, got '{ct}'. "
"Use _get_imagedata() instead to handle JSON image data."
)
return l


def raise_alpaca_if(n, m):
"""If non-zero Alpaca error, raise the appropriate Alpaca exception
Expand Down
Loading