ClipOne多平台剪切板同步工具

go实现 基于消息队列/syncthing公共中继服务器

为了在对等设备之间传输数据,实现2个方案

  • 消息队列
  • 实现syncthing的传输协议,使用公共的中继服务器

消息传递

使用消息队列

使用了RabbitMQ,官网在此

何为消息队列(Message Queue)?

队列(FIFO),是一个先进先出的数据结构,go的channel、linux的管道都是类似的思想。将其运用到中间件中,让消息在一个队列中传输和分发,就是消息队列。

作用:

  • 组件之间的解耦

    • 以剪切板同步为例,我们当然可以直接将消息通过tcp/http之类的协议直接发送给对等的同步设备,假设有5个同步设备,我们只需注册这5个设备,并通过一个循环发送。但是这样2个对等点间是强耦合的,如果一个设备掉线改如何处理内,重发吗?,如果同时需要保存一个剪切板日志到自己的nas设备呢?
    • 使用消息队列可以解耦组件,组件间可以通过消息队列插拔。设备连接、断开、数据重发都是与业务逻辑无关的需求,完全由中间件代劳来简化设计
  • 流量削峰

    • 瞬间涌入很多请求,会在队列进行排队,而不会一次全部执行,均摊了压力(不可避免地增加了时延)
  • 异步处理

    • 不需要马上处理并返回结果,而可以将请求提交到消息队列,前端接口可以直接返回结果

关键词:

  • VirtualHost(虚拟消息服务器):隔离

  • Connections:实际的网络连接

  • channel信道,同一应用中复用tcp连接隔离

  • exchanges:交换机,根据预定的规则将消息发送到消息队列

  • Queues:消息队列

  • Binding:将消息队列绑定到交换机,有多种绑定规则

  • AMQP(Advanced Message Queuing Protocol)高级消息队列协议

  • RoutingKey(路由键):用于把生成者的数据分配到交换器上;

    BindingKey(绑定键):用于把交换器的消息绑定到队列上;

exchanges

RabbitMQ消息队列模型如上,生产者(P)将消息发送到交换机(X),交换机根据一些预定义的规则,将消息转发到queue(红色)

和队列绑定的应用可以获取队列中的消息

根据不同的交换机规则,可以实现很多效果,如:

  • 工作队列,有数个消费者绑定了一个队列,先到先得处理数据(消费者间是竞争关系)
  • 发布/订阅,一次向多个消费者发送消息(我们使用的)
  • 路由,根据路由键和绑定键的匹配来发送数据
  • 主题/订阅,根据主题的匹配来发送数据,比路由更灵活

详细代码和解释可以看官方教程

func (m *MsgManager) Receive(ctx context.Context) error {
    // 定义队列
	q, err := m.ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	// 绑定交换机和队列
	err = m.ch.QueueBind(
		q.Name,         // queue name
		"",             // routing key
		m.exchangeName, // exchange
		false,
		nil,
	)
	// 获得一个消费channel,数据会直接传入channel中
	msgCh, err := m.ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	// 启动一个协程,消费消息
	go func() {
		for {
			select {
			case d := <-msgCh:
				m.rwLock.RLock()
                // 扇出模式会导致自己也收到消息,所以缓存了上一个消息,来去重
				if m.backData != nil && bytes.Equal(m.backData, d.Body) {
					m.rwLock.RUnlock()
					continue
				}
				m.rwLock.RUnlock()
				m.ReceiveCh <- d.Body
			case <-ctx.Done():
				log.Println(" [receive]: ctx done!")
				return
			}
		}
	}()
	return nil
}

程序主体的逻辑

  • 从剪切板的channel中获取数据并发送
  • 从mq的channel中获取数据,解析并写入到剪切板

另外单独处理了重连和错误的情况

	go func() {
		for {
			select {
			case <-reconnectCh:
				msgManager = mq.NewMsgManager(util.User, util.Url)
				cancel = nil
				err := msgManager.Init()
				if err != nil {
					log.Println("init msg fail")
					msgManager = nil
					errCh <- err
					continue
				}

				ctx, cancel = context.WithCancel(context.Background())
				err = msgManager.Receive(ctx)
				if err != nil {
					log.Println("init msg receive err")
					errCh <- err
					continue
				}

				log.Println("rabbitMQZ init success")
			Deal:
				for {
					select {
					case cell := <-clipboardM.CellChan:
						cell = filter_.Execute(cell)
						if cell == nil {
							log.Println("Filter cell")
							continue
						}
						data, err := clipboard.Encode(cell)
						if err != nil {
							log.Println("encode fail: ", err)
							continue
						}
						err = msgManager.Send(data)
						if err != nil {
							log.Println("send fail: ", err)
							continue
						}

						log.Printf("send length: %d", len(data))
					case data := <-msgManager.ReceiveCh:
						if len(data) == 0 {
							log.Println("empty payload, reconnect")
							errCh <- fmt.Errorf("empty payload")
							break Deal
						}
						c, err := clipboard.Decode(data)
						if err != nil {
							log.Println("decode fail: ", err)
							continue
						}
						log.Printf(" receive length: %d", len(data))
						err = clipboardM.Write(c)
						if err != nil {
							log.Println("write fail: ", err)
							continue
						}
					}
				}
			case err := <-errCh:
				log.Println("err: ", err)
				if cancel != nil {
					cancel()
				}
				if msgManager != nil {
					msgManager.Close()
				}
				clipboardM.Clean()
				reconnectCh <- struct{}{}
				<-time.After(time.Second * 3)
			}
		}
	}()

syncthing中继服务器

Syncthing 是一个持续的文件同步程序。它在两台或多台计算机之间实时同步文件,安全地防止窥探。您的数据只是您的数据,您应该选择存储位置、是否与第三方共享以及如何通过 Internet 传输。

Syncthing 的所有源代码都是开源的,包括传输和中继协议都是完全开源的。它很流行所以有很多公开的免费中继节点可供使用。

image-20220408184925009

可以在此查看公开中继节点的状态

因此,我们可以通过syncthing的协议,连接中继节点,并以它的逻辑来识别和同步设备。又因为它使用了端到端的TLS加密,使得剪切板数据在互联网访问传输的安全性得到保证。

设备id

设备ID文档

设备id是syncthing设备认证的基础,也是传输安全的最大保证。

在首次启动syncthing时,应用会自动生成一对ECDSA的密钥对。id,由证书的sha256摘要后的256bit base32编码去除尾部=,并添加校验位后获得

设备认证

在使用中继的情况下,中继服务器和设备会交换tls公钥,双方都从公钥中计算出id,如果id与计算出来的id是不一致的,那表明这是个非法用户(实际上无需交换id,服务器可以从证书中推导id)

而要找到有相同id但却不同的两个证书,可能性比小行星撞击地球的概率还低得多。这也就保证了设备id无法被伪造,基于此2个设备可以通过用户来交换id(由用户配置需要连接的设备id),并连接到同一个中继交换数据

设备发现

在使用中继时,每个设备都会将自身的id和端口向全局发现服务器通告。发起连接的设备可以向全局发现服务器查询对方的位置,处于哪个中继服务器,随后连接到对应的中继服务器,完成通信通道的链接

syncthing协议分析

syncthing一共有4个协议,协议的介绍在此

  • 块交互协议(Block Exchange Protocol v1)
  • 全局发现(Global Discovery)
  • 中继协议(Relay Protocol)
  • 局域网发现协议(Local Discovery Protocol)

我们只关注 全局发现、中继协议

中继协议(Relay Protocol)

一般来说,如果两个设备处于不同的局域网,并不能直接传输数据,需要借助公网主机中继,比如使用内网穿透(比如frp)。

实现这个协议的目的,就是使用公开的中继服务器,而不要求一定要有公网主机。

A relay was designed to relay BEP protocol, hence the reliance on device ID’s in the protocol spec, but at the same time it is general enough that could be reused by other protocols or applications, as the data transferred between two devices which use a relay is completely obscure and does not affect the relaying. —syncthing doc

中继协议设计地足够通用,可以被其他应用/协议复用

中继协议实现了2种操作模式

  • 协议模式。客户端与中继直接交换信息,中继由此验证和确定设备id。有2个子模式

    • 永久协议子模式。加入中继等待其他设备连接
    • 临时协议子模式。与已经是永久协议模式的设备进行连接

    2个连接模式类似于web应用中的服务器和客户端

  • 会话模式。直接在两个已经连接设备中间建立虚拟的数据通道,发送和接收数据

协议头部格式如下,使用了一个魔数(Magic 0x9E79BC40)来标识这个包是一个syncthing中继数据包

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                             Magic                             |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                         Message Type                          |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                        Message Length                         |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+


struct Header {
        unsigned int Magic;
        int MessageType;
        int MessageLength;
}

其他协议包详见文档

全局发现(Global Discovery)
  • 公告(Announcements)

    设备在启动时,会向全局发现服务器公告自身的id和端口等信息。全局发现服务器是预配置的。

    { addresses: ["tcp://192.0.2.45:22000", "tcp://:22202", "relay://192.0.2.99:22028"]}
    

    公告自己所连接的中继服务器,以及局域网socket

  • 查询(Queries)

    将需要的设备id放入get请求的param中

    查询结果同公共的请求参数

通信流程

假设有3个设备

  1. 首先在启动是,每个设备都有不同的证书,并由此生成不同的证书。用户需要将3个id分别配置到设备中(当然也可以有一个中心服务器动态注册这些信息,但这和去中心化的思想相违背)。
  2. 三个设备都需要向相同的全局发现服务器公告,并且都连接到中继服务器(可以不同),每个设备都获得对等设备的中继地址
  3. 当某个设备需要发送数据时(比如剪切板变化)。遍历3个设备
    1. 对某个设备,获取它的中继地址(或者已经缓存,或不存在,简单取消这一次操作)
    2. 向中继设备发起临时协议子模式连接,创建2设备间的通信通道,发送数据

设备可以在任何时候下线,其他设备通过中继连接失败时可以简单地取消这次操作,也可以实现一个日志机制,当设备再次上线时,一次性同步所以未收到的数据。

日志/消息同步

在每个设备上单独实现日志功能是比较简单的,复杂之处在于消息同步。和IM(即时通讯)系统中,用户设备上线同步历史消息过程类似。

在im中,一般通过消息的id(递增)来确定哪一段时间内消息需要同步。但去中心化的设计导致没有一个中心设备可以提供稳定递增的id号。

可以通过时间段来确定缺失消息,但是需要传输大量的数据。

实现(TODO)

剪切板获取&保存

protobuf

具体的protobuff介绍可以看这个,或者更推荐google官方文档

syntax = "proto3";

option go_package="./;cell";

import "google/protobuf/timestamp.proto";

message Cell{
  google.protobuf.Timestamp Time = 1;
  string Types = 2;
  bytes Data = 3;
}

定义了

  • 时间戳
  • 剪切板类型
  • 数据

因为protobuf自带压缩,在传输图片的时候,效率有很大提升。

clipboard

使用了这个库

封装了watch函数,将图片和文本内容整合到一个channel

func (c *Clipboard) Watching(ctx context.Context) {
	ch1 := clipboard.Watch(ctx, clipboard.FmtText)
	ch2 := clipboard.Watch(ctx, clipboard.FmtImage)

	for {
		select {
		case data := <-ch1:
			c.CellChan <- &cell.Cell{
				Time:  timestamppb.Now(),
				Types: TypeText,
				Data:  data,
			}
		case data := <-ch2:
			c.CellChan <- &cell.Cell{
				Time:  timestamppb.Now(),
				Types: TypeImg,
				Data:  data,
			}
		}
	}
}

写入

func (c *Clipboard) Write(cell *cell.Cell) error {
	switch cell.Types {
	case TypeText:
		clipboard.Write(clipboard.FmtText, cell.Data)
		return nil
	case TypeImg:
		clipboard.Write(clipboard.FmtImage, cell.Data)
		return nil
	default:
		return fmt.Errorf("[ClipboardManager]:error type")
	}
}

设计模式

装饰器

从剪切板中获取的数据可能包含文本和图片,如果加入压缩功能,可以大大提升网络传输效率。

为了数据的安全性,有必要提供数据的端到端加密(E2EE),特别是使用消息队列时,恶意用户完全可以通过接入交换机来获取数据,不加密很有可能导致数据泄露。

但有时我们可能只需要加密而不需要压缩,或者组合其他功能,那如何自由组合这些类别,而同时保持代码的简洁和复用性呢?

装饰是一种结构型设计模式,允许你通过将对象放入包含行为的特殊封装对象中来为原对象绑定新的行为。

image-20220408163356781

组合多件衣服来实现组合效果。

使用装饰器,我们可以随意组合压缩和加密(或更多)功能,来实现组合功能。

代码实现

首先定义一个接口,将加密和解压缩的行为抽象成EncodeDecode。然后定义一个空的BaseConverter作为嵌套基础

type Converter interface {
	Encode([]byte) ([]byte, error)
	Decode([]byte) ([]byte, error)
}

type BaseConverter struct {
}

func (c *BaseConverter) Encode(data []byte) ([]byte, error) {
	return data, nil
}

func (c *BaseConverter) Decode(data []byte) ([]byte, error) {
	return data, nil
}

以加密算法为例

type Encryptor struct {
	Converter Converter
	key       []byte
}

// in -> data -> out
func (e *Encryptor) Encode(in []byte) ([]byte, error) {
	data, err := e.Converter.Encode(in)
	block, err := aes.NewCipher(e.key)
	aesGCM, err := cipher.NewGCM(block)
	nonce := make([]byte, aesGCM.NonceSize())
	if _, err = io.ReadFull(rand.Reader, nonce); err != nil {
		return nil, err
	}
	return aesGCM.Seal(nonce, nonce, data, nil), nil
}

func (e *Encryptor) Decode(in []byte) ([]byte, error) {
	block, err := aes.NewCipher(e.key)
	aesGCM, err := cipher.NewGCM(block)
	nonceSize := aesGCM.NonceSize()
	nonce, cipherData := in[:nonceSize], in[nonceSize:]
	data, err := aesGCM.Open(nil, nonce, cipherData, nil)
	out, err := e.Converter.Decode(data)
	return out, nil
}

func (e *Encryptor) SetKey(key []byte) {
	e.key = key
}

使用,嵌套convert即可

var convert converter.Converter
convert = &converter.BaseConverter{}

func UseCompress() {
	convert = &converter.Compressor{Converter: convert}
}

func UseEncryptor(key []byte) {
	e := &converter.Encryptor{
		Converter: convert,
	}
	e.SetKey(key)
	convert = e
}

clipboard.UseCompress()
clipboard.UseEncryptor(util.MD5([]byte(util.Password)))

责任链

使用剪切板的时候可能会发现有其他应用肆意篡改剪切板数据,写入广告/推广链接。或者我们不希望自己的验证码数据被同步到其他设备。

为此我们需要一个过滤机制,通过配置一些"漏斗"来筛选一些我们不想要的数据

责任链是一种行为设计模式, 允许你将请求沿着处理者链进行发送。收到请求后,每个处理者均可对请求进行处理,或将其传递给链上的下个处理者。

image-20220408155257930

责任链会将特定行为转换为 被称作处理者的独立对象。链上的每个处理者都 有一个成员变量来保存对于下一处理者的引用。除了处理请 求外,处理者还负责沿着链传递请求。请求会在链上移动, 直至所有处理者都有机会对其进行处理。

使用责任链,让剪切板数据在被同步之前流过这些链条。每一个节点都负责一次过滤,每个节点可以截断数据,终止这次同步。实现原理和web框架的拦截器类似。

定义过滤器

Execute执行过滤,SetNext设置下一个节点

type Filter interface {
	Execute(cell *cell.Cell) *cell.Cell
	SetNext(next Filter)
}

验证码过滤器例子

结构体中存有下一个过滤器。在Execute中判断是否匹配成一个验证码,如果是一个验证码则直接返回nil(中断处理)

否则递归调用next.Execute

type VerificationCode struct {
	next   Filter
	regexp *regexp.Regexp
}

func NewVerificationCode() *VerificationCode {
	r, err := regexp.Compile("^[0-9]{4,6}$")
	if err != nil {
		panic(err)
	}
	return &VerificationCode{
		regexp: r,
	}
}

func (c *VerificationCode) Execute(cell *cell.Cell) *cell.Cell {
	if cell.Types == clipboard.TypeText && c.regexp.Match(cell.Data) {
		log.Println("match code")
		return nil
	}
    // 没有下一个节点了,直接返回
	if c.next == nil {
		return cell
	}
	// next
	return c.next.Execute(cell)
}

func (c *VerificationCode) SetNext(next Filter) {
	c.next = next
}

使用

// 定义2个过滤器
taobaoFilter := &filter.TaobaoLink{}
codeFilter := filter.NewVerificationCode()

// 串联
taobaoFilter.SetNext(codeFilter)
var filter_ filter.Filter = taobaoFilter

// 对cell进行过滤
cell = filter_.Execute(cell)
// 为nil则说明被拦截,放弃这次操作
if cell == nil {
    log.Println("Filter cell")
    continue
}
Licensed under CC BY-NC-SA 4.0