Skip to content

Commit

Permalink
wait until seccomp filters are created before starting to filter packets
Browse files Browse the repository at this point in the history
  • Loading branch information
capnspacehook committed Jul 7, 2022
1 parent cf277e0 commit 223c521
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 67 deletions.
142 changes: 113 additions & 29 deletions filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const (

type FilterManager struct {
ready chan struct{}
abort chan struct{}

started bool

logger *zap.Logger

Expand All @@ -48,9 +51,15 @@ type FilterManager struct {
}

type filter struct {
dnsReqNFReady chan struct{}
genericNFReady chan struct{}
wg sync.WaitGroup
dnsReqReady chan struct{}
dnsReqAbort chan struct{}
genericReady chan struct{}
genericAbort chan struct{}
cachingReady chan struct{}
cachingAbort chan struct{}

started bool
wg sync.WaitGroup

opts *FilterOptions

Expand Down Expand Up @@ -106,9 +115,12 @@ type resolver interface {

type enforcerCreator func(ctx context.Context, logger *zap.Logger, queueNum uint16, ipv6 bool, hook nfqueue.HookFunc) (enforcer, error)

func StartFilters(ctx context.Context, logger *zap.Logger, config *Config) (*FilterManager, error) {
// CreateFilters creates packet filters. The returned FilterManager can
// be used to start or stop packet filtering.
func CreateFilters(ctx context.Context, logger *zap.Logger, config *Config) (*FilterManager, error) {
f := FilterManager{
ready: make(chan struct{}),
abort: make(chan struct{}),
logger: logger,
queueNum4: config.InboundDNSQueue.IPv4,
queueNum6: config.InboundDNSQueue.IPv6,
Expand All @@ -118,14 +130,14 @@ func StartFilters(ctx context.Context, logger *zap.Logger, config *Config) (*Fil
// if mock enforcers and resolver is not set, use real ones
newEnforcer := config.enforcerCreator
if newEnforcer == nil {
newEnforcer = startNfQueue
newEnforcer = openNfQueue
}
res := config.resolver
if res == nil {
res = &net.Resolver{}
}

nf4, nf6, err := startNfQueues(ctx, logger, config.InboundDNSQueue, newEnforcer, func(ipv6 bool) nfqueue.HookFunc {
nf4, nf6, err := openNfQueues(ctx, logger, config.InboundDNSQueue, newEnforcer, func(ipv6 bool) nfqueue.HookFunc {
return newDNSResponseCallback(&f, ipv6)
})
if err != nil {
Expand All @@ -136,7 +148,7 @@ func StartFilters(ctx context.Context, logger *zap.Logger, config *Config) (*Fil

for i := range config.Filters {
isSelfFilter := config.SelfDNSQueue == config.Filters[i].DNSQueue
filter, err := startFilter(ctx, logger, &config.Filters[i], isSelfFilter, newEnforcer, res)
filter, err := createFilter(ctx, logger, &config.Filters[i], isSelfFilter, newEnforcer, res)
if err != nil {
// TODO: stop other filters here
return nil, err
Expand All @@ -145,16 +157,32 @@ func StartFilters(ctx context.Context, logger *zap.Logger, config *Config) (*Fil
f.filters[i] = filter
}

return &f, nil
}

// Start starts packet filtering.
func (f *FilterManager) Start() {
// Let the DNS response callback know everything is setup. The
// callback will be executing on another goroutine started by
// nfqueue.RegisterWithErrorFunc, but only after a packet is
// received on its nfqueue.
close(f.ready)

return &f, nil
for i := range f.filters {
f.filters[i].start()
}

f.started = true
}

// Stop stops packet filtering and cleans up owned resources.
func (f *FilterManager) Stop() {
// if the filters have not been started yet, tell running goroutines
// to abort and finish
if !f.started {
close(f.abort)
}

if f.dnsRespNF4 != nil {
f.dnsRespNF4.Close()
}
Expand All @@ -167,36 +195,38 @@ func (f *FilterManager) Stop() {
}
}

func startFilter(ctx context.Context, logger *zap.Logger, opts *FilterOptions, isSelfFilter bool, newEnforcer enforcerCreator, res resolver) (*filter, error) {
func createFilter(ctx context.Context, logger *zap.Logger, opts *FilterOptions, isSelfFilter bool, newEnforcer enforcerCreator, res resolver) (*filter, error) {
filterLogger := logger
if opts.Name != "" {
filterLogger = filterLogger.With(zap.String("filter.name", opts.Name))
}

f := filter{
dnsReqNFReady: make(chan struct{}),
genericNFReady: make(chan struct{}),
opts: opts,
logger: filterLogger,
res: res,
connections: NewTimedCache[connectionID](logger, true),
isSelfFilter: isSelfFilter,
dnsReqReady: make(chan struct{}),
dnsReqAbort: make(chan struct{}),
genericReady: make(chan struct{}),
genericAbort: make(chan struct{}),
cachingReady: make(chan struct{}),
cachingAbort: make(chan struct{}),
opts: opts,
logger: filterLogger,
res: res,
connections: NewTimedCache[connectionID](logger, true),
isSelfFilter: isSelfFilter,
}

if opts.TrafficQueue.eitherSet() {
f.allowedIPs = NewTimedCache[netip.Addr](f.logger, false)
f.additionalHostnames = NewTimedCache[string](filterLogger, false)

nf4, nf6, err := startNfQueues(ctx, filterLogger, opts.TrafficQueue, newEnforcer, func(ipv6 bool) nfqueue.HookFunc {
nf4, nf6, err := openNfQueues(ctx, filterLogger, opts.TrafficQueue, newEnforcer, func(ipv6 bool) nfqueue.HookFunc {
return newGenericCallback(&f, ipv6)
})
if err != nil {
return nil, fmt.Errorf("error starting traffic nfqueues: %v", err)
}
f.genericNF4 = nf4
f.genericNF6 = nf6
// let the generic packet callback know everything is setup
close(f.genericNFReady)

if len(f.opts.CachedHostnames) > 0 {
f.wg.Add(1)
Expand All @@ -209,22 +239,21 @@ func startFilter(ctx context.Context, logger *zap.Logger, opts *FilterOptions, i
}

if opts.DNSQueue.eitherSet() {
nf4, nf6, err := startNfQueues(ctx, filterLogger, opts.DNSQueue, newEnforcer, func(ipv6 bool) nfqueue.HookFunc {
nf4, nf6, err := openNfQueues(ctx, filterLogger, opts.DNSQueue, newEnforcer, func(ipv6 bool) nfqueue.HookFunc {
return newDNSRequestCallback(&f, ipv6)
})
if err != nil {
return nil, fmt.Errorf("error starting DNS nfqueues: %v", err)
}
f.dnsReqNF4 = nf4
f.dnsReqNF6 = nf6
// let the DNS request callback know everything is setup
close(f.dnsReqNFReady)

}

return &f, nil
}

func startNfQueues(ctx context.Context, logger *zap.Logger, queues queue, newEnforcer enforcerCreator, hookGen func(ipv6 bool) nfqueue.HookFunc) (nf4 enforcer, nf6 enforcer, err error) {
func openNfQueues(ctx context.Context, logger *zap.Logger, queues queue, newEnforcer enforcerCreator, hookGen func(ipv6 bool) nfqueue.HookFunc) (nf4 enforcer, nf6 enforcer, err error) {
if queues.IPv4 != 0 {
nf4, err = newEnforcer(ctx, logger, queues.IPv4, false, hookGen(false))
if err != nil {
Expand All @@ -241,7 +270,7 @@ func startNfQueues(ctx context.Context, logger *zap.Logger, queues queue, newEnf
return nf4, nf6, nil
}

func startNfQueue(ctx context.Context, logger *zap.Logger, queueNum uint16, ipv6 bool, hook nfqueue.HookFunc) (enforcer, error) {
func openNfQueue(ctx context.Context, logger *zap.Logger, queueNum uint16, ipv6 bool, hook nfqueue.HookFunc) (enforcer, error) {
afFamily := unix.AF_INET
if ipv6 {
afFamily = unix.AF_INET6
Expand Down Expand Up @@ -291,7 +320,30 @@ func startNfQueue(ctx context.Context, logger *zap.Logger, queueNum uint16, ipv6
return nf, nil
}

func (f *filter) start() {
if f.opts.DNSQueue.eitherSet() {
close(f.dnsReqReady)
}
if f.opts.TrafficQueue.eitherSet() {
close(f.genericReady)
}
if len(f.opts.CachedHostnames) > 0 {
close(f.cachingReady)
}

f.started = true
}

func (f *filter) cacheHostnames(ctx context.Context, logger *zap.Logger) {
// wait until the filter manager is setup to prevent race conditions
select {
case <-f.cachingReady:
case <-f.cachingAbort:
// the filter manager has been stopped before it was started,
// return so the parent filter can finish cleaning up
return
}

logger.Debug("starting cache loop")

var (
Expand Down Expand Up @@ -344,6 +396,20 @@ func (f *filter) cacheHostnames(ctx context.Context, logger *zap.Logger) {
}

func (f *filter) close() {
// if the filter has not been started yet, tell running goroutines
// to abort and finish
if !f.started {
if f.opts.DNSQueue.eitherSet() {
close(f.dnsReqAbort)
}
if f.opts.TrafficQueue.eitherSet() {
close(f.genericAbort)
}
if len(f.opts.CachedHostnames) > 0 {
close(f.cachingAbort)
}
}

f.wg.Wait()

if f.dnsReqNF4 != nil {
Expand Down Expand Up @@ -381,8 +447,14 @@ func newDNSRequestCallback(f *filter, ipv6 bool) nfqueue.HookFunc {
logger.Info("started nfqueue")

return func(attr nfqueue.Attribute) int {
// wait until the filter is setup to prevent race conditions
<-f.dnsReqNFReady
// wait until the filter manager is setup to prevent race conditions
select {
case <-f.dnsReqReady:
// the filter manager has been stopped before it was started,
// return so the parent filter can finish cleaning up
case <-f.dnsReqAbort:
return 0
}

var dnsReqNF enforcer
if !ipv6 {
Expand Down Expand Up @@ -577,7 +649,13 @@ func newDNSResponseCallback(f *FilterManager, ipv6 bool) nfqueue.HookFunc {

return func(attr nfqueue.Attribute) int {
// wait until the filter manager is setup to prevent race conditions
<-f.ready
select {
case <-f.ready:
case <-f.abort:
// the filter manager has been stopped before it was started,
// return so the parent filter can finish cleaning up
return 0
}

var dnsRespNF enforcer
if !ipv6 {
Expand Down Expand Up @@ -732,8 +810,14 @@ func newGenericCallback(f *filter, ipv6 bool) nfqueue.HookFunc {
logger.Info("started nfqueue")

return func(attr nfqueue.Attribute) int {
// wait until the filter is setup to prevent race conditions
<-f.genericNFReady
// wait until the filter manager is setup to prevent race conditions
select {
case <-f.genericReady:
case <-f.genericAbort:
// the filter manager has been stopped before it was started,
// return so the parent filter can finish cleaning up
return 0
}

var genericNF enforcer
if !ipv6 {
Expand Down
Loading

0 comments on commit 223c521

Please sign in to comment.