Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 110 additions & 91 deletions pkg/network/node/egressip.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type nodeEgress struct {
}

type namespaceEgress struct {
vnid uint32
requestedIP string
vnid uint32
requestedIPs []string
}

type egressIPInfo struct {
Expand All @@ -37,8 +37,6 @@ type egressIPInfo struct {

assignedNodeIP string
assignedIPTablesMark string
assignedVNID uint32
blockedVNIDs map[uint32]bool
}

type egressIPWatcher struct {
Expand All @@ -55,6 +53,9 @@ type egressIPWatcher struct {
namespacesByVNID map[uint32]*namespaceEgress
egressIPs map[string]*egressIPInfo

changedEgressIPs []*egressIPInfo
changedNamespaces []*namespaceEgress

localEgressLink netlink.Link
localEgressNet *net.IPNet

Expand Down Expand Up @@ -112,32 +113,57 @@ func (eip *egressIPWatcher) ensureEgressIPInfo(egressIP string) *egressIPInfo {
return eg
}

func (eg *egressIPInfo) addNode(node *nodeEgress) {
func (eip *egressIPWatcher) egressIPChanged(eg *egressIPInfo) {
eip.changedEgressIPs = append(eip.changedEgressIPs, eg)
for _, ns := range eg.namespaces {
eip.changedNamespaces = append(eip.changedNamespaces, ns)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might the same namespace be added to the list multiple times? If so, do we care?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might the same namespace be added to the list multiple times? If so, do we care?

It gets converted to a map (and thus de-duped) later on. (Why isn't it a map to start with? Um... It probably made sense in some earlier version of the code.)

}
}

// addNode registers node as a claimant of egressIP and flags the egress IP
// (and every namespace currently attached to it) as changed so that the next
// sync pass picks it up.
func (eip *egressIPWatcher) addNode(egressIP string, node *nodeEgress) {
	info := eip.ensureEgressIPInfo(egressIP)

	// A second node claiming the same egress IP is reported, but the claim
	// is still recorded below.
	if claimants := info.nodes; len(claimants) > 0 {
		utilruntime.HandleError(fmt.Errorf("Multiple nodes claiming EgressIP %q (nodes %q, %q)", info.ip, node.nodeIP, claimants[0].nodeIP))
	}

	info.nodes = append(info.nodes, node)
	eip.egressIPChanged(info)
}

func (eg *egressIPInfo) deleteNode(node *nodeEgress) {
// deleteNode removes node from the list of nodes claiming egressIP. It is a
// no-op when the egress IP has no entry or when node is not one of its
// claimants.
func (eip *egressIPWatcher) deleteNode(egressIP string, node *nodeEgress) {
	info := eip.egressIPs[egressIP]
	if info == nil {
		return
	}

	for idx, candidate := range info.nodes {
		if candidate != node {
			continue
		}
		// Flag the change first, then splice the node out of the slice.
		eip.egressIPChanged(info)
		info.nodes = append(info.nodes[:idx], info.nodes[idx+1:]...)
		return
	}
}

func (eg *egressIPInfo) addNamespace(ns *namespaceEgress) {
// addNamespace registers ns as a claimant of egressIP and flags the egress IP
// (and every namespace attached to it, including ns) as changed so that the
// next sync pass picks it up.
func (eip *egressIPWatcher) addNamespace(egressIP string, ns *namespaceEgress) {
	info := eip.ensureEgressIPInfo(egressIP)

	// A second namespace claiming the same egress IP is reported, but the
	// claim is still recorded below.
	if claimants := info.namespaces; len(claimants) > 0 {
		utilruntime.HandleError(fmt.Errorf("Multiple namespaces claiming EgressIP %q (NetIDs %d, %d)", info.ip, ns.vnid, claimants[0].vnid))
	}

	info.namespaces = append(info.namespaces, ns)
	eip.egressIPChanged(info)
}

func (eg *egressIPInfo) deleteNamespace(ns *namespaceEgress) {
func (eip *egressIPWatcher) deleteNamespace(egressIP string, ns *namespaceEgress) {
eg := eip.egressIPs[egressIP]
if eg == nil {
return
}

for i := range eg.namespaces {
if eg.namespaces[i] == ns {
eip.egressIPChanged(eg)
eg.namespaces = append(eg.namespaces[:i], eg.namespaces[i+1:]...)
return
}
Expand Down Expand Up @@ -183,22 +209,15 @@ func (eip *egressIPWatcher) updateNodeEgress(nodeIP string, nodeEgressIPs []stri
oldRequestedIPs := node.requestedIPs
node.requestedIPs = sets.NewString(nodeEgressIPs...)

// Process new EgressIPs
// Process new and removed EgressIPs
for _, ip := range node.requestedIPs.Difference(oldRequestedIPs).UnsortedList() {
eg := eip.ensureEgressIPInfo(ip)
eg.addNode(node)
eip.syncEgressIP(eg)
eip.addNode(ip, node)
}

// Process removed EgressIPs
for _, ip := range oldRequestedIPs.Difference(node.requestedIPs).UnsortedList() {
eg := eip.egressIPs[ip]
if eg == nil {
continue
}
eg.deleteNode(node)
eip.syncEgressIP(eg)
eip.deleteNode(ip, node)
}

eip.syncEgressIPs()
}

func (eip *egressIPWatcher) watchNetNamespaces() {
Expand All @@ -214,10 +233,8 @@ func (eip *egressIPWatcher) handleAddOrUpdateNetNamespace(obj, _ interface{}, ev
if len(netns.EgressIPs) > 1 {
glog.Warningf("Ignoring extra EgressIPs (%v) in NetNamespace %q", netns.EgressIPs[1:], netns.Name)
}
eip.updateNamespaceEgress(netns.NetID, netns.EgressIPs[0])
} else {
eip.deleteNamespaceEgress(netns.NetID)
}
eip.updateNamespaceEgress(netns.NetID, netns.EgressIPs)
}

func (eip *egressIPWatcher) handleDeleteNetNamespace(obj interface{}) {
Expand All @@ -227,126 +244,128 @@ func (eip *egressIPWatcher) handleDeleteNetNamespace(obj interface{}) {
eip.deleteNamespaceEgress(netns.NetID)
}

func (eip *egressIPWatcher) updateNamespaceEgress(vnid uint32, egressIP string) {
func (eip *egressIPWatcher) updateNamespaceEgress(vnid uint32, egressIPs []string) {
eip.Lock()
defer eip.Unlock()

ns := eip.namespacesByVNID[vnid]
if ns == nil {
if egressIP == "" {
if len(egressIPs) == 0 {
return
}
ns = &namespaceEgress{vnid: vnid}
eip.namespacesByVNID[vnid] = ns
} else if egressIP == "" {
} else if len(egressIPs) == 0 {
delete(eip.namespacesByVNID, vnid)
}

if ns.requestedIP == egressIP {
return
}
oldRequestedIPs := sets.NewString(ns.requestedIPs...)
newRequestedIPs := sets.NewString(egressIPs...)
ns.requestedIPs = egressIPs

if ns.requestedIP != "" {
eg := eip.egressIPs[ns.requestedIP]
if eg != nil {
eg.deleteNamespace(ns)
eip.syncEgressIP(eg)
}
// Process new and removed EgressIPs
for _, ip := range newRequestedIPs.Difference(oldRequestedIPs).UnsortedList() {
eip.addNamespace(ip, ns)
}

ns.requestedIP = egressIP
if egressIP == "" {
return
for _, ip := range oldRequestedIPs.Difference(newRequestedIPs).UnsortedList() {
eip.deleteNamespace(ip, ns)
}

eg := eip.ensureEgressIPInfo(egressIP)
eg.addNamespace(ns)
eip.syncEgressIP(eg)
// Make sure we update OVS even if nothing was added or removed; the order might
// have changed
eip.changedNamespaces = append(eip.changedNamespaces, ns)

eip.syncEgressIPs()
}

func (eip *egressIPWatcher) deleteNamespaceEgress(vnid uint32) {
eip.updateNamespaceEgress(vnid, "")
eip.updateNamespaceEgress(vnid, nil)
}

func (eip *egressIPWatcher) syncEgressIP(eg *egressIPInfo) {
assignedNodeIPChanged := eip.syncEgressIPTablesState(eg)
eip.syncEgressOVSState(eg, assignedNodeIPChanged)
// syncEgressIPs drains the queued egress IP and namespace changes and applies
// them: node state is synced for every changed egress IP first, then namespace
// state for every changed namespace. Errors from namespace syncing are
// reported via utilruntime.HandleError and do not stop the remaining work.
func (eip *egressIPWatcher) syncEgressIPs() {
	// The queues may hold the same pointer several times; collapse each one
	// into a set so every entry is processed exactly once.
	pendingIPs := make(map[*egressIPInfo]struct{})
	for _, info := range eip.changedEgressIPs {
		pendingIPs[info] = struct{}{}
	}
	eip.changedEgressIPs = eip.changedEgressIPs[:0]

	pendingNamespaces := make(map[*namespaceEgress]struct{})
	for _, ns := range eip.changedNamespaces {
		pendingNamespaces[ns] = struct{}{}
	}
	eip.changedNamespaces = eip.changedNamespaces[:0]

	for info := range pendingIPs {
		eip.syncEgressNodeState(info)
	}

	for ns := range pendingNamespaces {
		if err := eip.syncEgressNamespaceState(ns); err != nil {
			utilruntime.HandleError(fmt.Errorf("Error updating Namespace egress rules for VNID %d: %v", ns.vnid, err))
		}
	}
}

func (eip *egressIPWatcher) syncEgressIPTablesState(eg *egressIPInfo) bool {
func (eip *egressIPWatcher) syncEgressNodeState(eg *egressIPInfo) {
// The egressIPInfo should have an assigned node IP if and only if the
// egress IP is active (ie, it is assigned to exactly 1 node and exactly
// 1 namespace).
egressIPActive := (len(eg.nodes) == 1 && len(eg.namespaces) == 1)
assignedNodeIPChanged := false
if egressIPActive && eg.assignedNodeIP != eg.nodes[0].nodeIP {
glog.V(4).Infof("Assigning egress IP %s to node %s", eg.ip, eg.nodes[0].nodeIP)
eg.assignedNodeIP = eg.nodes[0].nodeIP
eg.assignedIPTablesMark = getMarkForVNID(eg.namespaces[0].vnid, eip.masqueradeBit)
assignedNodeIPChanged = true
if eg.assignedNodeIP == eip.localIP {
if err := eip.assignEgressIP(eg.ip, eg.assignedIPTablesMark); err != nil {
utilruntime.HandleError(fmt.Errorf("Error assigning Egress IP %q: %v", eg.ip, err))
eg.assignedNodeIP = ""
}
}
} else if !egressIPActive && eg.assignedNodeIP != "" {
glog.V(4).Infof("Removing egress IP %s from node %s", eg.ip, eg.assignedNodeIP)
if eg.assignedNodeIP == eip.localIP {
if err := eip.releaseEgressIP(eg.ip, eg.assignedIPTablesMark); err != nil {
utilruntime.HandleError(fmt.Errorf("Error releasing Egress IP %q: %v", eg.ip, err))
}
}
eg.assignedNodeIP = ""
eg.assignedIPTablesMark = ""
assignedNodeIPChanged = true
} else if !egressIPActive {
glog.V(4).Infof("Egress IP %s is not assignable (%d namespaces, %d nodes)", eg.ip, len(eg.namespaces), len(eg.nodes))
}
return assignedNodeIPChanged
}

func (eip *egressIPWatcher) syncEgressOVSState(eg *egressIPInfo, assignedNodeIPChanged bool) {
var blockedVNIDs map[uint32]bool

// If multiple namespaces are assigned to the same EgressIP, we need to block
// outgoing traffic from all of them.
if len(eg.namespaces) > 1 {
eg.assignedVNID = 0
blockedVNIDs = make(map[uint32]bool)
for _, ns := range eg.namespaces {
blockedVNIDs[ns.vnid] = true
if !eg.blockedVNIDs[ns.vnid] {
err := eip.oc.SetNamespaceEgressDropped(ns.vnid)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Error updating Namespace egress rules: %v", err))
}
}
}
func (eip *egressIPWatcher) syncEgressNamespaceState(ns *namespaceEgress) error {
if len(ns.requestedIPs) == 0 {
return eip.oc.SetNamespaceEgressNormal(ns.vnid)
}

// If we have, or had, a single egress namespace, then update the OVS flows if
// something has changed
var err error
if len(eg.namespaces) == 1 && (eg.assignedVNID != eg.namespaces[0].vnid || assignedNodeIPChanged) {
eg.assignedVNID = eg.namespaces[0].vnid
delete(eg.blockedVNIDs, eg.assignedVNID)
err = eip.oc.SetNamespaceEgressViaEgressIP(eg.assignedVNID, eg.assignedNodeIP, getMarkForVNID(eg.assignedVNID, eip.masqueradeBit))
} else if len(eg.namespaces) == 0 && eg.assignedVNID != 0 {
err = eip.oc.SetNamespaceEgressNormal(eg.assignedVNID)
eg.assignedVNID = 0
}
if err != nil {
utilruntime.HandleError(fmt.Errorf("Error updating Namespace egress rules: %v", err))
}

// If we previously had blocked VNIDs, we need to unblock any that have been removed
// from the duplicates list
for vnid := range eg.blockedVNIDs {
if !blockedVNIDs[vnid] {
err := eip.oc.SetNamespaceEgressNormal(vnid)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Error updating Namespace egress rules: %v", err))
var active *egressIPInfo
for i, ip := range ns.requestedIPs {
eg := eip.egressIPs[ip]
if eg == nil {
continue
}
if len(eg.namespaces) > 1 {
active = nil
glog.V(4).Infof("VNID %d gets no egress due to multiply-assigned egress IP %s", ns.vnid, eg.ip)
break
}
if active == nil && i == 0 {
if eg.assignedNodeIP == "" {
glog.V(4).Infof("VNID %d cannot use unassigned egress IP %s", ns.vnid, eg.ip)
} else {
active = eg
}
}
}
eg.blockedVNIDs = blockedVNIDs

if active != nil {
return eip.oc.SetNamespaceEgressViaEgressIP(ns.vnid, active.assignedNodeIP, active.assignedIPTablesMark)
} else {
return eip.oc.SetNamespaceEgressDropped(ns.vnid)
}
}

func (eip *egressIPWatcher) assignEgressIP(egressIP, mark string) error {
Expand Down
Loading