rtc_plugins/test_time_r.py

66 lines
2.2 KiB
Python

import rtc_plugins
import time
import numpy as np
import mmap
import os
from ctypes import c_int16
srcUserId = "srcUser12"
destUserId = "destUser12"
srcDisplayName = "srcDisplayName12"
destDisplayName = "destDisplayName12"
srcRoomId = "srcRoom12"
#destRoomId = "destRoomId12"
destRoomId = srcRoomId
srcChannelIndex = 46
destChannelIndex = 47
def my_callback_r(shmName, dataSize, dataCount, sampleRate, numChannels, channelIndex):
print(f"my_callback_r, dataSize:{dataSize}, dataCount:{dataCount}, sampleRate:{sampleRate}, numChannels:{numChannels}, channelIndex:{channelIndex}")
print(f"data:{shmName}")
print("after my_callback_r")
#fd = os.open(shmName, os.O_RDONLY)
#if fd == -1:
# raise RuntimeError(f"无法打开共享内存 {shmName}")
## 2. 创建内存映射
#shm = mmap.mmap(fd, dataSize, mmap.MAP_SHARED, mmap.PROT_READ)
## 3. 转换为numpy数组 (零拷贝)
#audio_data = np.frombuffer(shm, dtype=c_int16, count=dataCount)
#print(f" 前5个采样点: {audio_data[:5]}")
#audioData = np.array([0, 1, -1, 0], dtype=np.int16)
#ret = rtc_plugins.sendCustomAudioData(srcChannelIndex, audioData, 48000, 1, len(audioData))
#if ret != 0:
# print(f"resend fail, ret:{ret}")
#else:
# print("resend succ")
ret = rtc_plugins.init(destUserId, destDisplayName, destRoomId, my_callback_r)
if ret != 0:
print(f"init fail, ret:{ret}")
exit(1)
ret = rtc_plugins.initRecv(destRoomId, srcUserId, destChannelIndex)
if ret != 0:
print(f"initRecv fail, ret:{ret}")
exit(1)
ret = rtc_plugins.initSend(destRoomId, srcRoomId, srcChannelIndex, 1)
if ret != 0:
print(f"initSend fail, ret:{ret}")
exit(1)
while True:
print("recv")
audioData = np.array([0, 1, -1, 0], dtype=np.int16)
ret = rtc_plugins.sendCustomAudioData(srcChannelIndex, audioData, 48000, 1, len(audioData))
if ret != 0:
print(f"resend fail, ret:{ret}")
else:
print("resend succ")
size = rtc_plugins.getSize()
print(f"data size:{size}")
#frame = rtc_plugins.getNumpyData()
#print(f"frame:{frame}")
dataCount = rtc_plugins.getDataCount()
print(f"data count:{dataCount}")
time.sleep(3)