徐琦的博客

  • 首页

  • 归档

基于WebRTC的音视频对讲

发表于 2019-03-21 | 分类于 Demo

WebRTC是什么这里不做过多介绍, 可以参考WebRTC官网,以及它的Step by Step教程, 本文描述如何基于该项技术做一个简单的音视频对讲。

环境准备

  • 浏览器:较新版本的Chrome
  • 支持HTTPS的网页服务器,建议使用Nginx反向代理

通讯机制

浏览器使用一个简单的服务端key-value接口来交互数据

key-value实现

使用springboot实现一个简单的kv存储即可

@RestController
@RequestMapping(value = "/kv")
public class KeyValueController {
ConcurrentHashMap<String,String> map = new ConcurrentHashMap<>();

@GetMapping(value = "/{key}")
String get(@PathVariable String key){
return map.get(key);
}
@RequestMapping(value = "/{key}", method = RequestMethod.POST)
public String put(@PathVariable String key,
@RequestParam(value = "value") String value) throws UnsupportedEncodingException {
System.out.println(key + ":" + URLDecoder.decode(value,"utf-8"));
map.put(key,value);
return value;
}

}

浏览器调用key-value接口

浏览器使用js来访问kv存储, 除了读写数据外,还加入了一个wait函数,用于等待特定数据就位

//为key添加一个前缀, 可以支持每个房间都有独立的key-value空间
var prefix = config.room;

//设置数据
function set(key, value, result) {
var str = encodeURI(JSON.stringify(value));
$.post("/kv/" + prefix + "_" + key, { value: str }, result);
}

//读取数据
function get(key, result) {
$.get(
"/kv/" + prefix + "_" + key,
null,
r1 => {
result(JSON.parse(decodeURI(r1)));
},
"text"
);
};

//等待指定数据就位
function wait(key,value,callback){
var timer = setInterval(()=>{
get(key,(v)=>{
if(v == value){
clearInterval(timer);
callback();
}
})
},1000);
}

工作流程

为了避免不必要的复杂度, 这里将通讯双方区分为主机和客机, 主机访问host.html,客机访问guest.html

主机工作流程

  • 获取本地音视频,在本地显示
  • 创建RTCPeerConnection,并监听onicecandidate和onaddstream
  • 将本地流加入到RTCPeerConnection中,并createOffer
  • createOffer成功后,设置RTCPeerConnection的本地描述符,并将本地描述符提交给kv存储
  • onicecandidate 监听到candidates列表后,将该信息提交给kv存储,并报告kv存储主机网络已经就绪,并开始等待客户机网络就绪
  • 当确认客户机网络就绪后,从kv存储中去除客户机的描述符和andidates列表,并设置在RTCPeerConnection上
  • onaddstream 监听到事件后,表示客户机的流已经到位, 将客户机的流显示在本地

客机工作流程

  • 获取本地音视频,在本地显示
  • 等待主机的网络就位
  • 当主机网络就位后,创建RTCPeerConnection,并监听onicecandidate和onaddstream
  • 将本地流加入到RTCPeerConnection中
  • 从kv存储中获取主机的candidates和流描述符,设置到RTCPeerConnection中
  • 调用RTCPeerConnection的createAnswer
  • createAnswer成功后,设置RTCPeerConnection的本地描述符,并将本地描述符提交给kv存储
  • onicecandidate监听到candidates列表后,将该信息提交给kv存储,并报告kv存储客机网络已经就绪
  • onaddstream 监听到事件后,表示主机的流已经到位, 将主机的流显示在本地

Turn服务

如果主机和客机处于同一个局域网,那么不需要额外的服务即可通讯, 否则的话,就有可能需要服务器来中转

可以使用Coturn TURN server的docker镜像boldt/coturn来快速搭建turn服务

下面是一段启动脚本

#!/bin/bash
export MIN_PORT=50000
export MAX_PORT=51000
sudo docker run \
-d \
-p 3478:3478 \
-p 3478:3478/udp \
-p ${MIN_PORT}-${MAX_PORT}:${MIN_PORT}-${MAX_PORT}/udp \
-e USERNAME=username \
-e PASSWORD=password \
-e MIN_PORT=${MIN_PORT} \
-e MAX_PORT=${MAX_PORT} \
--restart=always \
--name coturn \
--volume /etc/pki/nginx/cert.pem:/etc/ssl/turn_server_cert.pem \
--volume /etc/pki/nginx/key.pem:/etc/ssl/turn_server_pkey.pem \
coturn

示例代码

https://github.com/xuqifzz/webrtcDemo

使用PhantomReference以及GC日志分析GC如何工作

发表于 2019-01-29 | 分类于 Java

基本思路

写一段程序, 每隔很短的间隔创建一个大对象, 并按照一定的规则hold住或者丢弃这个对象, 直到抛出OutOfMemoryError, 在此过程中观察GC的工作日志。

参数设定

  • 从1开始递增计数, 每隔30毫秒创建一个大小为1M的bigObject对象,
  • 如果当前计数4的倍数, 则hold住这个对象,将其放入到一个hold队列中, 不让GC将其释放,否则直接丢弃该对象
  • 如果当前计数为5的倍数, 则从hold队列中丢弃一个对象
  • 通过以上规则, hold队列中对象释放的速度没有增加的速度快, 因此OutOfMemoryError是必然结局

观察方法

Java生成的GC日志只会记录GC发生前后新生代和老年代的空间变化情况, 而我想进一步知道每次GC都释放了多少个bigObjec对象, 所以这里会采用PhantomReference来跟踪bigObjec的释放情况。

主要代码

完整代码在这里

long i = 1;
try {
for (; ; i++) {
byte[] bigObject = new byte[Unit];//本程序的GC基本上都是由本行代码触发
register(bigObject); //使用PhantomReference跟踪bigObject的释放情况,并调用checkGC()
if (i % 4 == 0) holder.add(bigObject);
if (i % 5 == 0) holder.poll();
Thread.sleep(30);
}

} catch (`OutOfMemoryError` error) {
System.out.println("`OutOfMemoryError`: 一共创建bigObject:" + (i-1) + "个,GC次数:" + gcCount);
System.out.println("总共成功回收bigObject:" + gcObjectCount + ",堆中还剩下:" + phantomSet.size());
}

检测一次GC回收了多少个对象代码

static void register(byte[] bigObject){
System.out.print("."); //打一个点表示成功创建一个bigObject
phantomSet.add(new PhantomReference<>(bigObject,phantomReferenceQueue));
checkGC();
System.out.print("."); //打一个点表示成功创建一个bigObject
}

static void checkGC(){
int count = 0;
Reference<?> reference = null;
while ((reference = phantomReferenceQueue.poll()) != null) {
count++;
phantomSet.remove(reference);
}

if(count > 0){
System.out.println("下次GC将回收bigObject:" + count );
gcCount++;
gcObjectCount += count;

}

}
阅读全文 »

从零开始实现Netty极简版 (三) 实现ChannelPipeline

发表于 2019-01-16 | 更新于 2019-01-25 | 分类于 Netty

在从零开始实现Netty极简版 (二)中, 我们已经实现了异步接口, 接下来的目标便是实现ChannelPipeline,也就是Netty中的责任链。

ChannelPipeline是Netty非常重要的特性, 因为要学习使用Netty, 那就必须要了解ChannelPipeline, 在Netty中, 网络通讯几乎所有的关键操作都跟ChannelPipeline有关系, 包括bind, accept, read, write等等。

由于之前没有实现ChannelPipeline,所以这些相关操作都不是在ChannelPipeline中完成的, 而在本文中, 这些操作都将被被移入它们应该在的位置。
源码地址

启动代码

ChannelInboundHandler handler = (ctx,msg) -> {
byte[] bytes = (byte[])msg;
String str = new String(bytes);
System.out.println("接收到消息:" + str);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
ctx.write(buffer).addListener((future) -> {
System.out.println("回复成功");
});
};
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(handler);
//这里改成异步调用
b.bind(6666).addListener((future)->{
System.out.println("绑定端口成功");
});

可以看到, 这次不仅支持了数据读取, 还支持了数据写入, 数据写入也是异步操作, 操作完成后将输出”回复成功”。

阅读全文 »

从零开始实现Netty极简版 (二) ChanelFuture和ChannelPromise

发表于 2019-01-15 | 更新于 2019-01-25 | 分类于 Netty

在从零开始实现Netty极简版 (一)中, 我们实现一个基本骨架, 接下来的目标便是实现异步操作。

源码地址

因此服务的启动代码需要做相应升级

//升级前, bind是同步操作
b.bind(6666);

//升级后,bind成为了异步操作,可以添加listener
b.bind(6666).addListener((future)->{
System.out.println("绑定端口成功");
});

阅读全文 »

从零开始实现Netty极简版 (一) 基本骨架

发表于 2019-01-14 | 更新于 2019-01-25 | 分类于 Netty

第一步的任务是实现一个基本骨架, 让程序能够跑起来。

程序能够监听一个指定的端口接收客户端端连接,并在控制台打印出客户端发送过来的字符串。

源码地址

启动代码

服务端启动代码如下:(是不是跟netty的启动代码很相似?)

ChannelHandler handler = (o) -> {
byte[] bytes = (byte[])o;
String msg = new String(bytes);
System.out.println("接收到消息:" + msg);
};
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(handler);
b.bind(6666);

以上的测试监听了6666端口,并将接收到的文本消息打印到控制台

阅读全文 »

Netty中的泛型

发表于 2019-01-13 | 更新于 2019-01-25 | 分类于 Netty

Netty中的泛型用的不是很多,少数几个泛型的目的是为了向用户提供更加友好的接口, 比如启动器类和异步操作类, 而Netty内部的泛型使用很少。

启动器类

Netty只提供了Bootstrap以及ServerBootstrap两个启动器类, 其基类是一个泛型类, 定义如下

public abstract class AbstractBootstrap<
B extends AbstractBootstrap<B, C>,
C extends Channel
> implements Cloneable

乍一看去去这个类的泛型参数定义挺复杂的, 实际上这个类只有一个泛型参数, 那就是C extends Channel, 这个参数表示接收一个Channel的子类作为类型参数.

所以AbstractBootstrap的定义本该是像下面这样简单的

class AbstractBootstrap<C extends Channel>

所以多出来的B extends AbstractBootstrap<B, C>是什么鬼?

实际上这是为了方便最终用户以链式调用的方式使用启动器类的一个实现技巧, B extends AbstractBootstrap<B, C>中的B就是指用户实际使用的启动器类, 这个启动器类是AbstractBootstrap的子类.

举个栗子, 如果不添加这个参数, 以class AbstractBootstrap<C extends Channel>的方式来定义AbstractBootstrap, 那么它的group方法的前面只能是:

public AbstractBootstrap<C> group(EventLoopGroup group)

而用户写下的如下代码便无法编译

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup).childHandler( ...)

无法编译的原因是AbstractBootstrap并未定义childHandler方法, 是这是子类ServerBootstrap自己定义的方法.而gropu方法返回的是AbstractBootstrap, 自然无法调用子类定义的方法.

而如果在AbstractBootstrap中加上的表示子类自身的泛型参数B extends AbstractBootstrap<B, C>, 那么group方法便可定义成

public B group(EventLoopGroup group)

因为返回值是B, 也就是子类本身, 在上面的例子中返回的是ServerBootstrap, 这样一来, 链式调用childHandler就没有问题了

异步操作类

在Promise的addListener的参数乍一看令人震惊, 下面是addListener的签名以及GenericFutureListener的定义:

阅读全文 »

整理Blog内容到GitHub Pages

发表于 2019-01-13 | 分类于 其他

因为GitHub Pages已经是程序员博客的大势所趋,再加上以前写的东西比较混乱,决定痛下决心进行整理,希望能有个好的开始。

这个博客基于Hexo创建,由于我还不是很熟,所以放一份快速操作指南在此处方便查询

  • documentation
  • troubleshooting
  • Hexo GitHub Issue

Create a new post

$ hexo new "一篇金光闪闪的文章"
$ hexo new draft "一份蓄势待发的草稿"
$ hexo publish "一份蓄势待发的草稿"

More info: Writing

Run server

$ hexo server

More info: Server

Generate static files

$ hexo generate

More info: Generating

Deploy to remote sites

$ hexo deploy

More info: Deployment

Windows下的端口转发

发表于 2018-12-12 | 更新于 2019-03-21 | 分类于 系统运维

使用powershell中的netsh命令

端口转发是网络运维中常用到的功能, Windows已经自带了端口转发功能, 可以通过在PowerShell中调用netsh interface portproxy实现, 一般格式如下:

netsh interface portproxy add v4tov4 listenaddress=$Lhost listenport=$Lport connectaddress=$Rhost connectport=$Rport

参数说明可以通过调用netsh interface portproxy add v4tov4 ?进行查看, 比如要监听本地的8554端口, 将其转发到192.168.31.39:554上(进行RTSP流转发).可以调用如下命令:

netsh interface portproxy add v4tov4 listenaddress=0.0.0.0 listenport=8554 connectaddress=192.168.31.39 connectport=554

另外, netsh interface portproxy还支持一些管理命令, 这些命令可以通过netsh interface portproxy ?进行查看, 下面是一份清单:

命令 备注
add 在一个表格中添加一个配置项。
delete 从一个表格中删除一个配置项。
dump 显示一个配置脚本。
help 显示命令列表。
reset 重置端口代理配置状态。
set 设置配置信息。
show 显示信息。

使用Win-PortFwd管理端口转发

尽管直接使用PowerShell中的命令不是很复杂, 但仍有人觉得不方便, 这里推荐github上的一个管理脚本Win-PortFwd, 它提供了比较友好的菜单和向导, 可以更加简单的对转发的端口进行管理

权限问题

在添加端口转发的时候有时会出现权限问题, 通常运行下面的命令就可以解决:

set-executionpolicy remotesigned

没有监听端口的问题

在添加端口后, 也没有报错, 但端口转发就是没有工作, 使用netstat查看监听中的端口, 发现端口根本没有在监听, 这很有可能是因为系统服务列的IP Helper并未启动, 启动该服务即可正常工作。

使用字面值初始化Map

发表于 2017-01-26 | 更新于 2019-01-29 | 分类于 Java

在Java中使用字面值初始化一个Map很困难, 正如Java的一贯作风, 代码比较恶心, 比如

myMap = new HashMap<String, Object>();
myMap.put("id", 5);
myMap.put("name", "xuqi");
myMap.put("age", 18);

这令强迫症实在难以忍受, 所以我决定研究一下看看有什么”优雅”的解决方案

其他语言的做法

  • Javascript

    var myMap={id:5, name: "xuqi", age: 18};
  • Python

    myMap = {'id': 5, name: 'xuqi', 'age': 18}
  • C#

    var myMap = new Dictionary<string, Object> {{"id", 5}, {"name", "xuqi"}, {"age", 18}};
  • C++ (就连C++ 都…)

    std::map<int, string> int_to_string = {{1, "java"}, {2, "is"}, {3, "pretty"}};

Java中的一些解决办法

双括号初始化法

//这个方法多了一些多余的put很令人不爽
Map<String , Object> map = new HashMap<String , Object>(){{
put("id", 5);
put("name", "xuqi");
put("age", 18);
}};

二维数组封装法

定义如下函数:

static Map<String,String> createStringMap(String[][] entries){
return Stream.of(entries).collect(Collectors.toMap(data -> data[0], data -> data[1]));
}

static Map<String,Integer> createIntegerMap(Object[][] entries){
return Stream.of(entries).collect(Collectors.toMap(data -> (String) data[0], data -> (Integer) data[1]));
}

则可以用如下方式创建Map

//创建值为String的Map
Map<String,String> stringMap =
createStringMap(new String[][] {{ "key1", "value1" },{ "key2", "value2" },{ "key3", "value3" }});

//创建值为Integer的Map
Map<String,Integer> intMap =
createIntegerMap(new Object[][] {{ "key1", 1 },{ "key2", 2 },{ "key3", 3 }});

遗憾的是, 因为Java强大的类型擦除特性, 通过这个思路暂时无法写出下面的方法, 除非再加个恶心的Class<T>参数

//该方法因为类型擦除无法实现
static <T> Map<String,T> createMap(Object[][] entries)

另外这个方式也没有编译时检查功能, 不太安全。

参数列表封装法

有很多类库都是这个办法,比如Java 9的

Map<String, String> unmodifiableMap = Map.of("key1", "value1", "key2", "value2");

自己实现的话,大概可以这样

public static <A> Map<String, A> asMap(Object... keysAndValues) {
return new LinkedHashMap<String, A>() {{
for (int i = 0; i < keysAndValues.length - 1; i++) {
put(keysAndValues[i].toString(), (A) keysAndValues[++i]);
}
}};
}

可以这样调用

Map<String, String> one = asMap("1stKey", "1stVal", "2ndKey", "2ndVal");
Map<String, Object> two = asMap("1stKey", Boolean.TRUE, "2ndKey", new Integer(2));

上面的asMap没有编译期类型安全检查, 而Java9的版本似乎是安全的,但好像参数数量有限制,而且限制的数量还很少(这点我没去确认)

另外这个方式语义也不太好, Map应该是一对对的,不是吗?

目前最优雅的办法-lambda

查了N多资料后,我终于找到了一个优雅的办法, 那就是使用lambda表达式, 客户代码很简单, 而且类型安全

//混合型Map
Map<String,Object> map = hashMap(id -> 5,name -> "xuqi",age -> 18);

//String
Map<String,String> stringMap = hashMap(id -> "5",name -> "xuqi",age -> "18");

//Integer
Map<String,Integer> intMap = hashMap(id -> 5,name -> 6666,age -> 18);

实现方法至少需要Java 8u60, 而且在编译时需要加上 -parameters 参数, 在gradle可以加上下面的代码

tasks.withType(JavaCompile) {
configure(options) {
options.compilerArgs << '-parameters' << '-Xlint:unchecked'
}
}

这个实现的关键是SerializedLambda

实现代码

下面是代码中最主要的部分, 获取lambda表达式的参数名

Method replaceMethod = getClass().getDeclaredMethod("writeReplace");
replaceMethod.setAccessible(true);
SerializedLambda lambda= (SerializedLambda) replaceMethod.invoke(this);
Class<?> containingClass = Class.forName(lambda.getImplClass().replaceAll("/","."));
Method method = asList(containingClass.getDeclaredMethods())
.stream()
.filter(method0 -> Objects.equals(method0.getName(), lambda.getImplMethodName()))
.findFirst()
.orElseThrow(UnableToGuessMethodException::new);
return method.getParameters()[0].getName();

参考资料

  • https://www.baeldung.com/java-initialize-hashmap
  • https://benjiweber.co.uk/blog/2015/08/17/lambda-parameter-names-with-reflection/
  • https://gist.github.com/galdosd/10823529

检查一个对象是否被GC的方法

发表于 2016-10-16 | 更新于 2019-01-29 | 分类于 Java

在Java中,可以使用PhantomReference来检查一个对象是否被GC,示例代码如下:

public void trackGC() throws InterruptedException {
Object o = new Object();
ReferenceQueue<Object> queue = new ReferenceQueue<>();
PhantomReference<Object> phantomReference = new PhantomReference<>(o, queue);
assertNull(queue.poll());
o = null;
System.gc();
Thread.sleep(1000);
assertNotNull(queue.poll());
}

这里有一点要注意, phantomReference不能在gc之前就赋值为null, 不然会收不到通知。
比如把

PhantomReference<Object> phantomReference = new PhantomReference<>(o, queue);

替换为:

new PhantomReference<>(o, queue);

或

PhantomReference<Object> phantomReference = new PhantomReference<>(o, queue);
phantomReference = null;

都会导致测试失败

12
徐琦

徐琦

专注于应用程序框架开发

16 日志
9 分类
11 标签
GitHub E-Mail
快速访问
  • 知乎
  • bilibili
  • 谷歌翻译
  • 网易公开课
  • stackoverflow
  • 中国大学MOOC
© 2008 – 2019 徐琦
由 Hexo 强力驱动 v3.8.0
|
主题 – NexT.Pisces v6.7.0
Hosted by GitHub Pages