Skip to content

Instantly share code, notes, and snippets.

@i-v-s
Created April 23, 2020 14:23
Show Gist options
  • Save i-v-s/a73d5eb380457620379fe25a00fcd865 to your computer and use it in GitHub Desktop.
Save i-v-s/a73d5eb380457620379fe25a00fcd865 to your computer and use it in GitHub Desktop.
Jetson hardware compression
def gst_src(size=-1, rate=30, number=0):
    """Build the GStreamer capture pipeline string for a Jetson CSI camera.

    The pipeline starts at ``nvarguscamerasrc``, requests NV12 frames in NVMM
    memory at the given resolution and frame rate, converts them to BGRx with
    ``nvvidconv`` and finishes with ``videoconvert ! appsink`` (presumably so
    that OpenCV's ``VideoCapture`` can pull frames from it).

    Args:
        size: Either a ``(width, height)`` pair or an integer index into the
            sensor-mode table below. Negative indices count from the end, so
            the default ``-1`` selects 1280x720.
        rate: Requested frame rate in frames per second.
        number: Camera sensor index, forwarded as ``sensor-id``.

    Returns:
        The pipeline description as a single string.
    """
    if isinstance(size, int):
        # Same table, in the same order, as the one load_conf() indexes into;
        # without this the default size=-1 could not be unpacked below.
        modes = ((3264, 2464), (3264, 1848), (1920, 1080), (1280, 720))
        size = modes[size]
    w, h = size
    appsink = ' ! '.join([
        f'video/x-raw(memory:NVMM), width={w}, height={h}, format=(string)NV12, framerate=(fraction){rate}/1',
        'nvvidconv',
        f'video/x-raw, width=(int){w}, height=(int){h}, format=(string)BGRx',
        'videoconvert ! appsink',
    ])
    return ' ! '.join([
        f'nvarguscamerasrc sensor-id={number}',
        # Experimental hardware-recording branch, kept for reference:
        # 'tee name=t',  # Source and tee
        # 'nvtee ! queue ! nvvidconv',  # Branch #1
        # 'video/x-raw(memory:NVMM), width=(int)1280, height=(int)720, format=(string)NV12, framerate=(fraction)30/1',
        # 'nvv4l2h265enc control-rate=1 bitrate=8000000 ! video/x-h265, stream-format=(string)byte-stream',
        # or 'h265parse ! splitmuxsink max-size-time=60000000000 location=nvm%03d.mp4 t.',
        # 'filesink location=rec%s.mp4 t.',
        # 'queue ! videoconvert',  # Branch #2
        appsink,
    ])
def send_h265_udp(sink_host=None, sink_port=None):
    """Build a pipeline tail that H.265-encodes frames and sends them as RTP over UDP.

    Args:
        sink_host: Destination host for ``udpsink``. When ``None`` the
            ``host`` property is left out instead of being rendered as the
            literal text ``None``, so the element's own default applies.
        sink_port: Destination port for ``udpsink``; omitted in the same way
            when ``None``.

    Returns:
        The pipeline fragment as a string (it has no source element; the
        caller is expected to prepend one).
    """
    udpsink = 'udpsink'
    if sink_host is not None:
        udpsink += f' host={sink_host}'
    if sink_port is not None:
        udpsink += f' port={sink_port}'
    return ' ! '.join([
        'autovideoconvert ! omxh265enc bitrate=2000000 ! video/x-h265, stream-format=(string)byte-stream',
        f'h265parse ! rtph265pay ! {udpsink}',
    ])
def send_h264_udp(sink_host=None, sink_port=None):
    """Build a pipeline tail that H.264-encodes frames and sends them as RTP over UDP.

    Args:
        sink_host: Destination host for ``udpsink``. When ``None`` the
            ``host`` property is left out instead of being rendered as the
            literal text ``None``, so the element's own default applies.
        sink_port: Destination port for ``udpsink``; omitted in the same way
            when ``None``.

    Returns:
        The pipeline fragment as a string (it has no source element; the
        caller is expected to prepend one).
    """
    udpsink = 'udpsink'
    if sink_host is not None:
        udpsink += f' host={sink_host}'
    if sink_port is not None:
        udpsink += f' port={sink_port}'
    return ' ! '.join([
        'autovideoconvert ! omxh264enc bitrate=2000000 ! video/x-h264, stream-format=(string)byte-stream',
        f'h264parse ! rtph264pay ! {udpsink}',
    ])
def write_h265(file_name, sink_host=None, sink_port=None):
    """Build a pipeline tail that H.265-encodes frames into an MP4 file.

    When both ``sink_host`` and ``sink_port`` are truthy, the encoded stream
    is additionally split with a ``tee`` and sent as RTP over UDP.

    Args:
        file_name: Path passed to ``filesink location=``.
        sink_host: Optional UDP destination host.
        sink_port: Optional UDP destination port.

    Returns:
        The pipeline fragment as a string (without a source element).
    """
    caps = 'video/x-h265, stream-format=(string)byte-stream'
    encoder = 'autovideoconvert ! omxh265enc control-rate=1 bitrate=2000000'
    branches = [f'{caps} ! h265parse ! qtmux ! filesink location={file_name}']
    if sink_host and sink_port:
        branches.append(
            f'{caps} ! h265parse ! rtph265pay ! udpsink host={sink_host} port={sink_port}'
        )
    if len(branches) == 1:
        return f'{encoder} ! {branches[0]}'
    # Every tee branch gets its own queue: a branch without one runs in the
    # tee's streaming thread and can block the other branch.
    return f'{encoder} ! tee name=t ! queue ! ' + ' t. ! queue ! '.join(branches)
def gst_dst(file_name, sink_host=None, sink_port=None):
    """Build the complete output pipeline: ``appsrc`` feeding the H.265 writer.

    Args:
        file_name: Path of the MP4 file to record to.
        sink_host: Optional UDP destination host, forwarded to ``write_h265``.
        sink_port: Optional UDP destination port, forwarded to ``write_h265``.

    Returns:
        The pipeline description as a single string.
    """
    # write_h265() already adds the optional UDP branch itself, so there is
    # only ever one sink here; the former ``len(sinks) > 1`` tee branch could
    # never execute and has been removed.
    return ' ! '.join(['appsrc', write_h265(file_name, sink_host, sink_port)])
def gst_dst1(file_name, sink_host=None, sink_port=None):
    """Alternative output pipeline: ``appsrc`` -> H.265 encoder -> MP4 file.

    When both ``sink_host`` and ``sink_port`` are truthy, a ``tee`` is placed
    after the encoder; its first branch payloads the stream as RTP and sends
    it over UDP, and its second branch carries on to the file writer.

    Args:
        file_name: Path passed to ``filesink location=``.
        sink_host: Optional UDP destination host.
        sink_port: Optional UDP destination port.

    Returns:
        The pipeline description as a single string.
    """
    stages = ['appsrc ! autovideoconvert ! omxh265enc control-rate=1 bitrate=2000000']
    if sink_host and sink_port:
        stages += [
            'tee name=t ! queue ! video/x-h265, stream-format=byte-stream ! rtph265pay mtu=1400',
            # An rtspsink (mapping="/stream", service=<port>) was tried here
            # as an alternative to udpsink.
            f'udpsink host={sink_host} port={sink_port} sync=false async=false t. ! queue',
        ]
    stages.append(
        f'video/x-h265, stream-format=(string)byte-stream ! h265parse ! qtmux ! filesink location={file_name} '
    )
    return ' ! '.join(stages)
import cv2
import json
from os import mkdir
from os.path import join, isdir
from datetime import datetime
from logging import getLogger, Formatter, StreamHandler, INFO
from logging.handlers import TimedRotatingFileHandler
from gst import gst_src, gst_dst
def init_logger(name, log_dir='log'):
    """Return the logger ``name`` set up with a rotating file and a console handler.

    The file handler writes to ``<log_dir>/server.log`` and rotates at
    midnight, using a ``%Y%m%d`` suffix for rotated files. Both handlers and
    the logger itself are set to level INFO.

    Calling this again for a logger that already has handlers returns it
    unchanged, so messages are not duplicated by stacked handlers.

    Args:
        name: Logger name passed to ``logging.getLogger``.
        log_dir: Directory for the log file; created (including missing
            parents) if it does not exist.

    Returns:
        The configured ``logging.Logger``.
    """
    from os import makedirs

    logger = getLogger(name)
    logger.setLevel(INFO)
    if logger.handlers:
        # Already configured; adding handlers again would duplicate output.
        return logger

    makedirs(log_dir, exist_ok=True)
    formatter = Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    file_handler = TimedRotatingFileHandler(
        join(log_dir, 'server.log'), when='midnight', interval=1)
    file_handler.suffix = "%Y%m%d"
    console_handler = StreamHandler()

    for handler in (file_handler, console_handler):
        handler.setLevel(INFO)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
def load_conf(file_name='conf.json'):
    """Load capture settings from a JSON configuration file.

    Recognised keys (all optional):
        ``source``: passed through unchanged; defaults to ``0``.
        ``size``: an integer index into the built-in size table (negative
            indices count from the end; default ``-1`` selects 1280x720) or
            an explicit two-element ``[width, height]`` sequence.
        ``rate``: defaults to ``30``.
        ``destination``: passed through unchanged; defaults to ``{}``.

    Args:
        file_name: Path of the JSON file to read.

    Returns:
        A ``(source, size, rate, destination)`` tuple, where ``size`` is a
        ``(width, height)`` tuple.

    Raises:
        ValueError: If an explicit ``size`` does not have exactly two elements.
        IndexError: If an integer ``size`` is outside the size table.
    """
    sizes = [(3264, 2464), (3264, 1848), (1920, 1080), (1280, 720)]
    with open(file_name, encoding='utf-8') as file:
        conf = json.load(file)
    size = conf.get('size', -1)
    if isinstance(size, int):
        size = sizes[size]
    else:
        size = tuple(size)
        if len(size) != 2:
            # Fail here with a clear message instead of later, when the
            # size is unpacked into width and height.
            raise ValueError(f'size must be an index or [width, height], got {list(size)!r}')
    return conf.get('source', 0), size, conf.get('rate', 30), conf.get('destination', {})
def main():
    """Capture frames from the configured source and record them to files.

    Settings come from ``load_conf()``. A new output file named
    ``rec<yy-mm-dd_HH-MM>.mp4`` is started in the destination directory
    whenever the minute-resolution timestamp changes. If the destination
    configuration has a ``gst`` entry, frames are written through the
    pipeline built by ``gst_dst``; otherwise directly to the file name.

    The loop ends when a frame cannot be read. The capture and the current
    writer are released on every exit path (including Ctrl-C), so the last
    file gets closed properly.
    """
    from logging import DEBUG
    from os import makedirs

    logger = init_logger('stream')
    source, size, rate, destination = load_conf()
    dst_dir = destination.get('dir', 'records')
    makedirs(dst_dir, exist_ok=True)
    if isinstance(source, dict) and 'gst' in source:
        source = gst_src(size, rate, **source['gst'])
    logger.info(f'Size: ({size[0]}, {size[1]}), rate: {rate}')
    logger.info('Source is %s', source)

    cap = cv2.VideoCapture(source)
    writer = None
    try:
        if not cap.isOpened():
            # Stop before creating an empty output file.
            logger.error('Cannot open source %s', source)
            return
        last_time = None
        while True:
            stamp = datetime.now().strftime('%y-%m-%d_%H-%M')
            if last_time != stamp:
                # The minute changed: close the current file, start the next.
                if writer is not None:
                    writer.release()
                fn = join(dst_dir, f'rec{stamp}.mp4')
                logger.info('Write to %s', fn)
                last_time = stamp
                dst = gst_dst(fn, **destination['gst']) if 'gst' in destination else fn
                writer = cv2.VideoWriter(dst, 0, float(rate), size, True)
            ok, frame = cap.read()
            if not ok:
                break
            writer.write(frame)
            # frame.min()/max() scan the whole image; skip that work unless
            # debug output is actually enabled.
            if logger.isEnabledFor(DEBUG):
                logger.debug(''.join(map(str, [frame.shape, frame.min(), frame.max()])))
            # cv2.imwrite('data.png', frame)
    finally:
        if writer is not None:
            writer.release()
        cap.release()
# Script entry point: start the capture/record loop only when this file is
# executed directly, not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment