Implement IPv6 multicast again (fixes #346)

This commit is contained in:
Jakob Borg 2014-08-17 15:01:48 +02:00
parent a1fd07b27c
commit d657bc4e3d
9 changed files with 151 additions and 52 deletions

File diff suppressed because one or more lines are too long

46
beacon/beacon.go Normal file
View File

@ -0,0 +1,46 @@
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package beacon
import "net"
type recv struct {
data []byte
src net.Addr
}
type dst struct {
intf string
conn *net.UDPConn
}
type Interface interface {
Send(data []byte)
Recv() ([]byte, net.Addr)
}
func genericReader(conn *net.UDPConn, outbox chan<- recv) {
bs := make([]byte, 65536)
for {
n, addr, err := conn.ReadFrom(bs)
if err != nil {
l.Warnln("multicast read:", err)
return
}
if debug {
l.Debugf("recv %d bytes from %s", n, addr)
}
c := make([]byte, n)
copy(c, bs)
select {
case outbox <- recv{c, addr}:
default:
if debug {
l.Debugln("dropping message")
}
}
}
}

View File

@ -6,16 +6,6 @@ package beacon
import "net"
type recv struct {
data []byte
src net.Addr
}
type dst struct {
intf string
conn *net.UDPConn
}
type Broadcast struct {
conn *net.UDPConn
port int
@ -36,7 +26,7 @@ func NewBroadcast(port int) (*Broadcast, error) {
outbox: make(chan recv, 16),
}
go b.reader()
go genericReader(b.conn, b.outbox)
go b.writer()
return b, nil
@ -51,30 +41,6 @@ func (b *Broadcast) Recv() ([]byte, net.Addr) {
return recv.data, recv.src
}
func (b *Broadcast) reader() {
bs := make([]byte, 65536)
for {
n, addr, err := b.conn.ReadFrom(bs)
if err != nil {
l.Warnln("Broadcast read:", err)
return
}
if debug {
l.Debugf("recv %d bytes from %s", n, addr)
}
c := make([]byte, n)
copy(c, bs)
select {
case b.outbox <- recv{c, addr}:
default:
if debug {
l.Debugln("dropping message")
}
}
}
}
func (b *Broadcast) writer() {
for bs := range b.inbox {

70
beacon/multicast.go Normal file
View File

@ -0,0 +1,70 @@
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package beacon
import "net"
type Multicast struct {
conn *net.UDPConn
addr *net.UDPAddr
conns []dst
inbox chan []byte
outbox chan recv
}
func NewMulticast(addr string) (*Multicast, error) {
gaddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
}
conn, err := net.ListenMulticastUDP("udp", nil, gaddr)
if err != nil {
return nil, err
}
b := &Multicast{
conn: conn,
addr: gaddr,
inbox: make(chan []byte),
outbox: make(chan recv, 16),
}
go genericReader(b.conn, b.outbox)
go b.writer()
return b, nil
}
func (b *Multicast) Send(data []byte) {
b.inbox <- data
}
func (b *Multicast) Recv() ([]byte, net.Addr) {
recv := <-b.outbox
return recv.data, recv.src
}
func (b *Multicast) writer() {
for bs := range b.inbox {
intfs, err := net.Interfaces()
if err != nil {
l.Warnln("multicast interfaces:", err)
continue
}
for _, intf := range intfs {
if intf.Flags&net.FlagUp != 0 && intf.Flags&net.FlagMulticast != 0 {
addr := *b.addr
addr.Zone = intf.Name
_, err = b.conn.WriteTo(bs, &addr)
if err != nil {
if debug {
l.Debugln(err, "on write to", addr)
}
} else if debug {
l.Debugf("sent %d bytes to %s", len(bs), addr.String())
}
}
}
}
}

View File

@ -985,7 +985,7 @@ func setTCPOptions(conn *net.TCPConn) {
}
func discovery(extPort int) *discover.Discoverer {
disc, err := discover.NewDiscoverer(myID, cfg.Options.ListenAddress, cfg.Options.LocalAnnPort)
disc, err := discover.NewDiscoverer(myID, cfg.Options.ListenAddress, cfg.Options.LocalAnnPort, cfg.Options.LocalAnnMCAddr)
if err != nil {
l.Warnf("No discovery possible (%v)", err)
return nil

View File

@ -113,6 +113,7 @@ type OptionsConfiguration struct {
GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" default:"true"`
LocalAnnEnabled bool `xml:"localAnnounceEnabled" default:"true"`
LocalAnnPort int `xml:"localAnnouncePort" default:"21025"`
LocalAnnMCAddr string `xml:"localAnnounceMCAddr" default:"[ff32::5222]:21026"`
ParallelRequests int `xml:"parallelRequests" default:"16"`
MaxSendKbps int `xml:"maxSendKbps"`
RescanIntervalS int `xml:"rescanIntervalS" default:"60"`

View File

@ -30,6 +30,7 @@ func TestDefaultValues(t *testing.T) {
GlobalAnnEnabled: true,
LocalAnnEnabled: true,
LocalAnnPort: 21025,
LocalAnnMCAddr: "[ff32::5222]:21026",
ParallelRequests: 16,
MaxSendKbps: 0,
RescanIntervalS: 60,
@ -200,6 +201,7 @@ func TestOverriddenValues(t *testing.T) {
<globalAnnounceEnabled>false</globalAnnounceEnabled>
<localAnnounceEnabled>false</localAnnounceEnabled>
<localAnnouncePort>42123</localAnnouncePort>
<localAnnounceMCAddr>quux:3232</localAnnounceMCAddr>
<parallelRequests>32</parallelRequests>
<maxSendKbps>1234</maxSendKbps>
<rescanIntervalS>600</rescanIntervalS>
@ -218,6 +220,7 @@ func TestOverriddenValues(t *testing.T) {
GlobalAnnEnabled: false,
LocalAnnEnabled: false,
LocalAnnPort: 42123,
LocalAnnMCAddr: "quux:3232",
ParallelRequests: 32,
MaxSendKbps: 1234,
RescanIntervalS: 600,

View File

@ -26,7 +26,8 @@ type Discoverer struct {
globalBcastIntv time.Duration
errorRetryIntv time.Duration
cacheLifetime time.Duration
broadcastBeacon *beacon.Broadcast
broadcastBeacon beacon.Interface
multicastBeacon beacon.Interface
registry map[protocol.NodeID][]cacheEntry
registryLock sync.RWMutex
extServer string
@ -54,11 +55,7 @@ var (
// When we hit this many errors in succession, we stop.
const maxErrors = 30
func NewDiscoverer(id protocol.NodeID, addresses []string, localPort int) (*Discoverer, error) {
b, err := beacon.NewBroadcast(localPort)
if err != nil {
return nil, err
}
func NewDiscoverer(id protocol.NodeID, addresses []string, localPort int, localMCAddr string) (*Discoverer, error) {
disc := &Discoverer{
myID: id,
listenAddrs: addresses,
@ -66,11 +63,26 @@ func NewDiscoverer(id protocol.NodeID, addresses []string, localPort int) (*Disc
globalBcastIntv: 1800 * time.Second,
errorRetryIntv: 60 * time.Second,
cacheLifetime: 5 * time.Minute,
broadcastBeacon: b,
registry: make(map[protocol.NodeID][]cacheEntry),
}
go disc.recvAnnouncements()
if localPort > 0 {
bb, err := beacon.NewBroadcast(localPort)
if err != nil {
return nil, err
}
disc.broadcastBeacon = bb
go disc.recvAnnouncements(bb)
}
if len(localMCAddr) > 0 {
mb, err := beacon.NewMulticast(localMCAddr)
if err != nil {
return nil, err
}
disc.multicastBeacon = mb
go disc.recvAnnouncements(mb)
}
return disc, nil
}
@ -187,7 +199,12 @@ func (d *Discoverer) sendLocalAnnouncements() {
msg := pkt.MarshalXDR()
for {
d.broadcastBeacon.Send(msg)
if d.multicastBeacon != nil {
d.multicastBeacon.Send(msg)
}
if d.broadcastBeacon != nil {
d.broadcastBeacon.Send(msg)
}
select {
case <-d.localBcastTick:
@ -284,9 +301,9 @@ loop:
}
}
func (d *Discoverer) recvAnnouncements() {
func (d *Discoverer) recvAnnouncements(b beacon.Interface) {
for {
buf, addr := d.broadcastBeacon.Recv()
buf, addr := b.Recv()
if debug {
l.Debugf("discover: read announcement from %s:\n%s", addr, hex.Dump(buf))

View File

@ -614,10 +614,6 @@
</label>
</div>
</div>
<div class="form-group">
<label translate for="LocalAnnPort">Local Discovery Port</label>
<input ng-disabled="!tmpOptions.LocalAnnEnabled" id="LocalAnnPort" class="form-control" type="number" ng-model="tmpOptions.LocalAnnPort">
</div>
<div class="form-group">
<div class="checkbox">
<label>