Last active
May 13, 2023 15:28
-
-
Save mansourmoufid/0c26993bc82f7b62056cff1733ebfba8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ctypes | |
import struct | |
import typing | |
import AVFoundation | |
import CoreMediaIO | |
import objc | |
Boolean = ctypes.c_ubyte | |
Float64 = ctypes.c_double | |
CoreMediaIO.CMIOObjectID = ctypes.c_uint32 | |
CoreMediaIO.CMIOObjectPropertyElement = ctypes.c_uint32 | |
CoreMediaIO.CMIOObjectPropertyScope = ctypes.c_uint32 | |
CoreMediaIO.CMIOObjectPropertySelector = ctypes.c_uint32 | |
def avf_get_device_uuid(name: str) -> typing.Optional[str]: | |
discovery_session = getattr( | |
AVFoundation.AVCaptureDeviceDiscoverySession, | |
'discoverySessionWithDeviceTypes_mediaType_position_' | |
)( | |
[ | |
AVFoundation.AVCaptureDeviceTypeBuiltInWideAngleCamera, | |
AVFoundation.AVCaptureDeviceTypeExternalUnknown, | |
], | |
AVFoundation.AVMediaTypeVideo, | |
AVFoundation.AVCaptureDevicePositionUnspecified | |
) | |
devices = discovery_session.devices() | |
for device in devices: | |
if device.localizedName() == name: | |
return device.uniqueID() | |
return None | |
def cmio_objects(data) -> typing.Generator[int, None, None]: | |
if data is None: | |
return | |
n = ctypes.sizeof(CoreMediaIO.CMIOObjectID) | |
for i in range(int(len(data) / n)): | |
yield struct.unpack('I', data[(i * n):][:n])[0] | |
def CMIOObjectPropertyAddress( | |
selector: CoreMediaIO.CMIOObjectPropertySelector, | |
scope: typing.Optional[CoreMediaIO.CMIOObjectPropertyScope] = None, | |
element: typing.Optional[CoreMediaIO.CMIOObjectPropertyElement] = None, | |
) -> CoreMediaIO.CMIOObjectPropertyAddress: | |
if scope is None: | |
scope = CoreMediaIO.kCMIOObjectPropertyScopeGlobal | |
if element is None: | |
element = CoreMediaIO.kCMIOObjectPropertyElementMain | |
return CoreMediaIO.CMIOObjectPropertyAddress( | |
mSelector=selector, | |
mScope=scope, | |
mElement=element, | |
) | |
def cmio_get_property( | |
object_id: CoreMediaIO.CMIOObjectID, | |
address: CoreMediaIO.CMIOObjectPropertyAddress, | |
) -> typing.Optional[bytes]: | |
if not CoreMediaIO.CMIOObjectHasProperty(object_id, address): | |
return None | |
status, data_size = CoreMediaIO.CMIOObjectGetPropertyDataSize( | |
object_id, | |
address, | |
0, | |
None, | |
None | |
) | |
if not status == CoreMediaIO.kCMIOHardwareNoError: | |
return None | |
status, _, _, data = CoreMediaIO.CMIOObjectGetPropertyData( | |
object_id, | |
address, | |
0, | |
None, | |
data_size, | |
None, | |
None | |
) | |
if not status == 0: | |
return None | |
return data | |
def cmio_set_property( | |
object_id: CoreMediaIO.CMIOObjectID, | |
address: CoreMediaIO.CMIOObjectPropertyAddress, | |
data: ctypes._SimpleCData, | |
) -> bool: | |
if not CoreMediaIO.CMIOObjectHasProperty(object_id, address): | |
return False | |
status, settable = CoreMediaIO.CMIOObjectIsPropertySettable( | |
object_id, | |
address, | |
None | |
) | |
if status == 0: | |
if not settable: | |
return False | |
status = CoreMediaIO.CMIOObjectSetPropertyData( | |
object_id, | |
address, | |
0, | |
None, | |
ctypes.sizeof(data), | |
ctypes.addressof(data) | |
) | |
return status == 0 | |
def cmio_get_device_with_id(uuid: str) -> typing.Optional[int]: | |
devices = cmio_objects(cmio_get_property( | |
object_id=CoreMediaIO.kCMIOObjectSystemObject, | |
address=CMIOObjectPropertyAddress( | |
selector=CoreMediaIO.kCMIOHardwarePropertyDevices, | |
), | |
)) | |
for device in devices: | |
data = cmio_get_property( | |
object_id=device, | |
address=CMIOObjectPropertyAddress( | |
selector=CoreMediaIO.kCMIODevicePropertyDeviceUID, | |
), | |
) | |
if data is not None: | |
uid = objc.objc_object( | |
c_void_p=ctypes.c_void_p.from_buffer_copy(data) | |
) | |
if uid == uuid: | |
return device | |
return None | |
def cmio_stream_frame_rate( | |
stream: int, | |
rate: typing.Optional[int] = None, | |
) -> typing.Optional[float]: | |
address = CMIOObjectPropertyAddress( | |
selector=CoreMediaIO.kCMIOStreamPropertyFrameRate, | |
) | |
if rate is None: | |
data = cmio_get_property( | |
object_id=stream, | |
address=address, | |
) | |
if data is not None and len(data) > 0: | |
print('***', data.hex()) | |
# rate = Float64.from_buffer_copy(data) | |
# return float(rate.value) | |
rate = struct.unpack('d', data)[0] | |
return float(rate) | |
return None | |
else: | |
status = cmio_set_property( | |
stream, | |
address, | |
Float64(rate) | |
) | |
return float(rate) if status else 0.0 | |
def cmio_get_streams(device_id: int) -> typing.List[int]: | |
streams = cmio_objects(cmio_get_property( | |
object_id=device_id, | |
address=CMIOObjectPropertyAddress( | |
selector=CoreMediaIO.kCMIODevicePropertyStreams, | |
), | |
)) | |
return list(streams) | |
if __name__ == '__main__': | |
uuid = avf_get_device_uuid('FaceTime HD Camera') | |
assert uuid is not None, uuid | |
print('uuid={}'.format(uuid)) | |
device = cmio_get_device_with_id(uuid) | |
assert device is not None, device | |
print('device={}'.format(device)) | |
streams = cmio_get_streams(device) | |
for stream in streams: | |
print('stream={}'.format(stream)) | |
print('frame rate={}'.format(cmio_stream_frame_rate(stream))) | |
print(cmio_stream_frame_rate(stream, 30)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment