import random import re import json import urllib3 from streamlink.plugin import Plugin, pluginmatcher from streamlink.plugin.api import validate from streamlink.stream import HLSStream urllib3.disable_warnings() _url_re = re.compile(r"http(s)?://(www\.)?camsoda\.com/(?P<username>[^\"\']+)") _api_video_schema = validate.Schema( { "token": str, "edge_servers": [str], "stream_name": str } ) headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0", "Referer": "https://www.camsoda.com", "Host": "www.camsoda.com" } @pluginmatcher(_url_re) class Camsoda(Plugin): API_URL_VIDEO = "https://ww2.camsoda.com/api/v1/video/vtoken/{0}?username=guest_{1}" HLS_URL_VIDEO_EDGE = "https://{server}/{stream_name}_v1/index.m3u8?token={token}" HLS_URL_VIDEO = "https://{server}/mp4:{stream_name}_aac/playlist.m3u8?token={token}" def _get_api_video(self, username): url = self.API_URL_VIDEO.format(username, str(random.randint(1000, 99999))) response = self.session.http.get(url, headers=headers, verify=False, timeout=10) if response.status_code != 200: self.logger.error(f"Failed to get video API data, status code: {response.status_code}") return None try: data_video = response.json() if "status" in data_video and data_video["status"] == 0: self.logger.info(f"Nickname '{username}' is invalid") return None if "edge_servers" in data_video and not data_video["edge_servers"]: if "stream_name" in data_video: if not data_video["stream_name"]: self.logger.info(f"OFFLINE") else: self.logger.info(f"*****PRIVATE*****") return None validate.validate(_api_video_schema, data_video) except Exception as e: self.logger.error(f"Error parsing API response: {e}") return None return data_video def _get_streams(self): match = _url_re.match(self.url) username = match.group("username").replace("/", "") data_video = self._get_api_video(username) if not data_video: return hls_url = self.HLS_URL_VIDEO.format( server=data_video["edge_servers"][0], stream_name=data_video["stream_name"], token=data_video["token"] ) if "edge" in data_video["edge_servers"][0]: self.session.http.verify = False hls_url = self.HLS_URL_VIDEO_EDGE.format( server=data_video["edge_servers"][0], stream_name=data_video["stream_name"], token=data_video["token"] ) for s in HLSStream.parse_variant_playlist(self.session, hls_url).items(): yield s __plugin__ = Camsoda
import random import re import json import urllib3 from streamlink.plugin import Plugin, pluginmatcher from streamlink.plugin.api import validate from streamlink.stream import HLSStream urllib3.disable_warnings() _url_re = re.compile(r"http(s)?://(www\.)?camsoda\.com/(?P<username>[^\"\']+)") _api_video_schema = validate.Schema( { "token": str, "edge_servers": [str], "stream_name": str } ) headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0", "Referer": "https://www.camsoda.com", "Host": "www.camsoda.com" } proxies = { "http": "http://127.0.0.1:2080", "https": "http://127.0.0.1:2080" } @pluginmatcher(_url_re) class Camsoda(Plugin): API_URL_VIDEO = "https://ww2.camsoda.com/api/v1/video/vtoken/{0}?username=guest_{1}" HLS_URL_VIDEO_EDGE = "https://{server}/{stream_name}_v1/index.m3u8?token={token}" HLS_URL_VIDEO = "https://{server}/mp4:{stream_name}_aac/playlist.m3u8?token={token}" def _get_api_video(self, username): url = self.API_URL_VIDEO.format(username, str(random.randint(1000, 99999))) response = self.session.http.get(url, headers=headers, proxies=proxies, verify=False, timeout=10) if response.status_code != 200: self.logger.error(f"Failed to get video API data, status code: {response.status_code}") return None try: data_video = response.json() if "status" in data_video and data_video["status"] == 0: self.logger.info(f"Nickname '{username}' is invalid") return None if "edge_servers" in data_video and not data_video["edge_servers"]: if "stream_name" in data_video: if not data_video["stream_name"]: self.logger.info(f"OFFLINE") else: self.logger.info(f"*****PRIVATE*****") return None validate.validate(_api_video_schema, data_video) except Exception as e: self.logger.error(f"Error parsing API response: {e}") return None return data_video def _get_streams(self): match = _url_re.match(self.url) username = match.group("username").replace("/", "") data_video = self._get_api_video(username) if not data_video: return hls_url = self.HLS_URL_VIDEO.format( server=data_video["edge_servers"][0], stream_name=data_video["stream_name"], token=data_video["token"] ) if "edge" in data_video["edge_servers"][0]: self.session.http.verify = False hls_url = self.HLS_URL_VIDEO_EDGE.format( server=data_video["edge_servers"][0], stream_name=data_video["stream_name"], token=data_video["token"] ) for s in HLSStream.parse_variant_playlist(self.session, hls_url).items(): yield s __plugin__ = Camsoda
#!/bin/bash if [[ -z "$1" ]]; then echo "No options found!" exit 1 fi CAPDIR=$(pwd)/ DATE=$(date '+%Y.%m.%d_%H%M') DATEDIR=$(date '+%Y%m%d') WORKDATEDIR=cap_$DATEDIR NICKNAME=$1 echo "root cap dir: $CAPDIR" cd $CAPDIR && echo "current dir: "$(pwd) if [[ ! -d "$WORKDATEDIR" ]]; then echo "$WORKDATEDIR does not exist. creating..." mkdir $WORKDATEDIR #exit 0 fi cd $WORKDATEDIR echo "current dir: "$(pwd) # Функция для запуска и проверки streamlink run_streamlink() { #echo test-$NICKNAME DATE=$(date '+%Y.%m.%d_%H%M') DATEDIR=$(date '+%Y%m%d') streamlink https://rt.bongacams44.com/$NICKNAME --stream-segment-threads 3 \ --default-stream best --retry-streams 3 -O | \ ffmpeg -hide_banner -err_detect ignore_err -re -i pipe:0 -c:v copy -c:a copy -strict -2 -f mp4 ./$NICKNAME\_$DATE.mp4 # Если процесс streamlink завершился непредвиденно (например, видеопоток прервался), # то будет выполнена перезапуск streamlink через 5 секунд sleep 5 run_streamlink } # Запуск функции run_streamlink в фоновом режиме run_streamlink & pid=$! # Ожидание сигнала завершения работы скрипта trap "kill $pid" INT TERM wait $pid exit 0
#!/bin/bash find -type f -name "*.mp4" -print0 | while read -d $'\0' a; do resol=$(ffprobe -v error -select_streams v:0 -show_entries stream=width,height,bit_rate -of csv=s=x:p=0 ${a}) # echo $resol width=$(echo $resol | awk -F "x" '{print $1}') # echo width=$width height=$(echo $resol | awk -F "x" '{print $2}') # echo height=$height bitrate=$(echo $resol | awk -F "x" '{print $3}') echo "resolution="$width"x"$height" bitrate="$bitrate" file="$a if [[ $width == "960" ]] && [[ $height == "1280" ]]; then echo worked echo " " if (( $bitrate >= "1000000" )); then echo " run ffmpeg" < /dev/null ffmpeg -err_detect ignore_err -hwaccel cuda -i "$a" -c:v h264_nvenc -b:v 1M -c:a copy -hide_banner "${a[@]/%.mp4/_avc.mp4}" fi done exit 0