From 144a7476ba2bb0d466008e5d0ea43e625bb78720 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Sun, 28 Mar 2021 19:36:05 -0700 Subject: [PATCH 1/9] Refactoring engine to use internal Vertex/Edge structures (rather then protobuf ones). This will allow graph database interfaces and pipeline engines to pass additional meta data with graph elements (such as data load state) --- cmd/kvload/main.go | 8 +- cmd/mongoload/main.go | 5 +- conformance/tests/ot_job.py | 2 +- elastic/convert.go | 24 ++---- elastic/graph.go | 56 +++++++------- engine/core/processors.go | 99 ++++++++++++++----------- existing-sql/graph.go | 44 +++++------ gdbi/interface.go | 29 +++++--- gdbi/traveler.go | 49 ++++++++++++ grids/graph.go | 121 +++++++++++++++--------------- grids/index.go | 9 ++- grids/schema.go | 4 +- gripper/graph.go | 143 ++++++++++++++++++++---------------- kvgraph/graph.go | 139 +++++++++++++++++++++++------------ kvgraph/schema.go | 4 +- mongo/convert.go | 39 +++++----- mongo/graph.go | 27 ++++--- psql/graph.go | 48 ++++++------ server/api.go | 17 +++-- util/insert.go | 20 ++--- util/protoutil/protobuf.go | 18 +++++ 21 files changed, 526 insertions(+), 379 deletions(-) diff --git a/cmd/kvload/main.go b/cmd/kvload/main.go index 2050aa89..89631bc6 100644 --- a/cmd/kvload/main.go +++ b/cmd/kvload/main.go @@ -6,7 +6,7 @@ import ( "sync" "time" - "github.com/bmeg/grip/gripql" + "github.com/bmeg/grip/gdbi" "github.com/bmeg/grip/kvgraph" "github.com/bmeg/grip/kvi" "github.com/bmeg/grip/log" @@ -95,7 +95,7 @@ var Cmd = &cobra.Command{ edgeFileArray = append(edgeFileArray, edgeFile) } - graphChan := make(chan *gripql.GraphElement, 10) + graphChan := make(chan *gdbi.GraphElement, 10) wg := &sync.WaitGroup{} go func() { wg.Add(1) @@ -115,7 +115,7 @@ var Cmd = &cobra.Command{ continue } for v := range vertChan { - graphChan <- &gripql.GraphElement{Graph: graph, Vertex: v} + graphChan <- &gdbi.GraphElement{Graph: graph, Vertex: gdbi.NewElementFromVertex(v)} count++ vertexCounter.Incr(1) if count%10000 == 0 { @@ -135,7 +135,7 @@ var Cmd = &cobra.Command{ continue } for e := range edgeChan { - graphChan <- &gripql.GraphElement{Graph: graph, Edge: e} + graphChan <- &gdbi.GraphElement{Graph: graph, Edge: gdbi.NewElementFromEdge(e)} count++ edgeCounter.Incr(1) if count%10000 == 0 { diff --git a/cmd/mongoload/main.go b/cmd/mongoload/main.go index 77f6d6c9..6c2ce706 100644 --- a/cmd/mongoload/main.go +++ b/cmd/mongoload/main.go @@ -8,6 +8,7 @@ import ( //"io" //"strings" + "github.com/bmeg/grip/gdbi" "github.com/bmeg/grip/gripql" "github.com/bmeg/grip/log" "github.com/bmeg/grip/mongo" @@ -40,7 +41,7 @@ func vertexSerialize(vertChan chan *gripql.Vertex, workers int) chan []byte { wg.Add(1) go func() { for v := range vertChan { - doc := mongo.PackVertex(v) + doc := mongo.PackVertex(gdbi.NewElementFromVertex(v)) rawBytes, err := bson.Marshal(doc) if err == nil { dataChan <- rawBytes @@ -63,7 +64,7 @@ func edgeSerialize(edgeChan chan *gripql.Edge, workers int) chan []byte { wg.Add(1) go func() { for e := range edgeChan { - doc := mongo.PackEdge(e) + doc := mongo.PackEdge(gdbi.NewElementFromEdge(e)) rawBytes, err := bson.Marshal(doc) if err == nil { dataChan <- rawBytes diff --git a/conformance/tests/ot_job.py b/conformance/tests/ot_job.py index 65332bd0..27500e1e 100644 --- a/conformance/tests/ot_job.py +++ b/conformance/tests/ot_job.py @@ -64,7 +64,7 @@ def test_job(man): for a, b in zip(fullResults, resumedResults): if a != b: errors.append("%s != %s" % (a, b)) - + return errors G.deleteJob(job["id"]) count = 0 for j in G.listJobs(): diff --git a/elastic/convert.go b/elastic/convert.go index 0d78e1d8..d44afa61 100644 --- a/elastic/convert.go +++ b/elastic/convert.go @@ -1,36 +1,28 @@ package elastic import ( + "github.com/bmeg/grip/gdbi" "github.com/bmeg/grip/gripql" "google.golang.org/protobuf/types/known/structpb" ) -// PackVertex take a AQL vertex and convert it to a mongo doc -func PackVertex(v *gripql.Vertex) map[string]interface{} { - p := map[string]interface{}{} - if v.Data != nil { - p = v.Data.AsMap() - } - //fmt.Printf("proto:%s\nmap:%s\n", v.Data, p) +// PackVertex take a gdbi vertex and convert it to a mongo doc +func PackVertex(v *gdbi.Vertex) map[string]interface{} { return map[string]interface{}{ - "gid": v.Gid, + "gid": v.ID, "label": v.Label, - "data": p, + "data": v.Data, } } // PackEdge takes a AQL edge and converts it to a mongo doc -func PackEdge(e *gripql.Edge) map[string]interface{} { - p := map[string]interface{}{} - if e.Data != nil { - p = e.Data.AsMap() - } +func PackEdge(e *gdbi.Edge) map[string]interface{} { return map[string]interface{}{ - "gid": e.Gid, + "gid": e.ID, "from": e.From, "to": e.To, "label": e.Label, - "data": p, + "data": e.Data, } } diff --git a/elastic/graph.go b/elastic/graph.go index 4a5d5fb1..06c95495 100644 --- a/elastic/graph.go +++ b/elastic/graph.go @@ -47,7 +47,7 @@ func (es *Graph) GetTimestamp() string { // AddEdge adds an edge to the graph, if the id is not "" and in already exists // in the graph, it is replaced -func (es *Graph) AddEdge(edges []*gripql.Edge) error { +func (es *Graph) AddEdge(edges []*gdbi.Edge) error { ctx := context.Background() bulkRequest := es.client.Bulk() @@ -55,7 +55,7 @@ func (es *Graph) AddEdge(edges []*gripql.Edge) error { bulkRequest = bulkRequest.Refresh("true") } for _, e := range edges { - if e.Gid == "" { + if e.ID == "" { return fmt.Errorf("Edge Gid cannot be an empty string") } pe := PackEdge(e) @@ -67,7 +67,7 @@ func (es *Graph) AddEdge(edges []*gripql.Edge) error { req := elastic.NewBulkUpdateRequest(). Index(es.edgeIndex). Type("edge"). - Id(e.Gid). + Id(e.ID). Script(script). Upsert(pe) bulkRequest = bulkRequest.Add(req) @@ -82,7 +82,7 @@ func (es *Graph) AddEdge(edges []*gripql.Edge) error { // AddVertex adds an edge to the graph, if the id is not "" and in already exists // in the graph, it is replaced -func (es *Graph) AddVertex(vertices []*gripql.Vertex) error { +func (es *Graph) AddVertex(vertices []*gdbi.Vertex) error { ctx := context.Background() bulkRequest := es.client.Bulk() @@ -90,7 +90,7 @@ func (es *Graph) AddVertex(vertices []*gripql.Vertex) error { bulkRequest = bulkRequest.Refresh("true") } for _, v := range vertices { - if v.Gid == "" { + if v.ID == "" { return fmt.Errorf("Vertex Gid cannot be an empty string") } pv := PackVertex(v) @@ -100,7 +100,7 @@ func (es *Graph) AddVertex(vertices []*gripql.Vertex) error { req := elastic.NewBulkUpdateRequest(). Index(es.vertexIndex). Type("vertex"). - Id(v.Gid). + Id(v.ID). Script(script). Upsert(pv) bulkRequest = bulkRequest.Add(req) @@ -113,7 +113,7 @@ func (es *Graph) AddVertex(vertices []*gripql.Vertex) error { return nil } -func (es *Graph) BulkAdd(stream <-chan *gripql.GraphElement) error { +func (es *Graph) BulkAdd(stream <-chan *gdbi.GraphElement) error { return util.StreamBatch(stream, 50, es.graph, es.AddVertex, es.AddEdge) } @@ -171,7 +171,7 @@ func (es *Graph) DelVertex(vid string) error { } // GetEdge gets a specific edge -func (es *Graph) GetEdge(id string, load bool) *gripql.Edge { +func (es *Graph) GetEdge(id string, load bool) *gdbi.Edge { ctx := context.Background() g := es.client.Get().Index(es.edgeIndex).Id(id) @@ -192,11 +192,11 @@ func (es *Graph) GetEdge(id string, load bool) *gripql.Edge { return nil } - return edge + return gdbi.NewElementFromEdge(edge) } // GetVertex gets vertex `id` -func (es *Graph) GetVertex(id string, load bool) *gripql.Vertex { +func (es *Graph) GetVertex(id string, load bool) *gdbi.Vertex { ctx := context.Background() g := es.client.Get().Index(es.vertexIndex).Id(id) @@ -217,12 +217,12 @@ func (es *Graph) GetVertex(id string, load bool) *gripql.Vertex { return nil } - return vertex + return gdbi.NewElementFromVertex(vertex) } // GetEdgeList produces a channel of all edges in the graph -func (es *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gripql.Edge { - o := make(chan *gripql.Edge, 100) +func (es *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gdbi.Edge { + o := make(chan *gdbi.Edge, 100) // 1st goroutine sends individual hits to channel. hits := make(chan json.RawMessage, es.pageSize) @@ -257,7 +257,7 @@ func (es *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gripql.Edge if err != nil { return err } - o <- edge + o <- gdbi.NewElementFromEdge(edge) } return nil }) @@ -274,8 +274,8 @@ func (es *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gripql.Edge } // GetVertexList produces a channel of all vertices in the graph -func (es *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gripql.Vertex { - o := make(chan *gripql.Vertex, es.pageSize) +func (es *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gdbi.Vertex { + o := make(chan *gdbi.Vertex, es.pageSize) // 1st goroutine sends individual hits to channel. hits := make(chan json.RawMessage, es.pageSize) @@ -310,7 +310,7 @@ func (es *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gripql.Ve if err != nil { return fmt.Errorf("Failed to unmarshal vertex: %v", err) } - o <- vertex + o <- gdbi.NewElementFromVertex(vertex) } return nil }) @@ -373,7 +373,7 @@ func (es *Graph) GetVertexChannel(ctx context.Context, req chan gdbi.ElementLook } r := batchMap[vertex.Gid] for _, ri := range r { - ri.Vertex = vertex + ri.Vertex = gdbi.NewElementFromVertex(vertex) o <- ri } } @@ -448,7 +448,7 @@ func (es *Graph) GetOutChannel(ctx context.Context, req chan gdbi.ElementLookup, } r := batchMap[edge.From] for _, ri := range r { - ri.Vertex = &gripql.Vertex{Gid: edge.To} + ri.Vertex = &gdbi.Vertex{ID: edge.To} b = append(b, ri) } } @@ -465,8 +465,8 @@ func (es *Graph) GetOutChannel(ctx context.Context, req chan gdbi.ElementLookup, idBatch := make([]string, len(batch)) batchMap := make(map[string][]gdbi.ElementLookup, len(batch)) for i := range batch { - idBatch[i] = batch[i].Vertex.Gid - batchMap[batch[i].Vertex.Gid] = append(batchMap[batch[i].Vertex.Gid], batch[i]) + idBatch[i] = batch[i].Vertex.ID + batchMap[batch[i].Vertex.ID] = append(batchMap[batch[i].Vertex.ID], batch[i]) } q := es.client.Search().Index(es.vertexIndex) @@ -485,7 +485,7 @@ func (es *Graph) GetOutChannel(ctx context.Context, req chan gdbi.ElementLookup, } r := batchMap[vertex.Gid] for _, ri := range r { - ri.Vertex = vertex + ri.Vertex = gdbi.NewElementFromVertex(vertex) o <- ri } } @@ -560,7 +560,7 @@ func (es *Graph) GetInChannel(ctx context.Context, req chan gdbi.ElementLookup, } r := batchMap[edge.To] for _, ri := range r { - ri.Vertex = &gripql.Vertex{Gid: edge.From} + ri.Vertex = &gdbi.Vertex{ID: edge.From} b = append(b, ri) } } @@ -577,8 +577,8 @@ func (es *Graph) GetInChannel(ctx context.Context, req chan gdbi.ElementLookup, idBatch := make([]string, len(batch)) batchMap := make(map[string][]gdbi.ElementLookup, len(batch)) for i := range batch { - idBatch[i] = batch[i].Vertex.Gid - batchMap[batch[i].Vertex.Gid] = append(batchMap[batch[i].Vertex.Gid], batch[i]) + idBatch[i] = batch[i].Vertex.ID + batchMap[batch[i].Vertex.ID] = append(batchMap[batch[i].Vertex.ID], batch[i]) } q := es.client.Search().Index(es.vertexIndex) q = q.Query(elastic.NewBoolQuery().Must(elastic.NewIdsQuery().Ids(idBatch...))) @@ -596,7 +596,7 @@ func (es *Graph) GetInChannel(ctx context.Context, req chan gdbi.ElementLookup, } r := batchMap[vertex.Gid] for _, ri := range r { - ri.Vertex = vertex + ri.Vertex = gdbi.NewElementFromVertex(vertex) o <- ri } } @@ -671,7 +671,7 @@ func (es *Graph) GetOutEdgeChannel(ctx context.Context, req chan gdbi.ElementLoo } r := batchMap[edge.From] for _, ri := range r { - ri.Edge = edge + ri.Edge = gdbi.NewElementFromEdge(edge) o <- ri } } @@ -746,7 +746,7 @@ func (es *Graph) GetInEdgeChannel(ctx context.Context, req chan gdbi.ElementLook } r := batchMap[edge.To] for _, ri := range r { - ri.Edge = edge + ri.Edge = gdbi.NewElementFromEdge(edge) o <- ri } } diff --git a/engine/core/processors.go b/engine/core/processors.go index 6257d0c7..27b7d648 100644 --- a/engine/core/processors.go +++ b/engine/core/processors.go @@ -35,9 +35,10 @@ func (l *LookupVerts) Process(ctx context.Context, man gdbi.Manager, in gdbi.InP if len(l.ids) == 0 { for v := range l.db.GetVertexList(ctx, l.loadData) { out <- t.AddCurrent(&gdbi.DataElement{ - ID: v.Gid, - Label: v.Label, - Data: v.Data.AsMap(), + ID: v.ID, + Label: v.Label, + Data: v.Data, + Loaded: l.loadData, }) } } else { @@ -45,9 +46,10 @@ func (l *LookupVerts) Process(ctx context.Context, man gdbi.Manager, in gdbi.InP v := l.db.GetVertex(i, l.loadData) if v != nil { out <- t.AddCurrent(&gdbi.DataElement{ - ID: v.Gid, - Label: v.Label, - Data: v.Data.AsMap(), + ID: v.ID, + Label: v.Label, + Data: v.Data, + Loaded: l.loadData, }) } } @@ -88,9 +90,10 @@ func (l *LookupVertsIndex) Process(ctx context.Context, man gdbi.Manager, in gdb for v := range l.db.GetVertexChannel(ctx, queryChan, l.loadData) { i := v.Ref out <- i.AddCurrent(&gdbi.DataElement{ - ID: v.Vertex.Gid, - Label: v.Vertex.Label, - Data: v.Vertex.Data.AsMap(), + ID: v.Vertex.ID, + Label: v.Vertex.Label, + Data: v.Vertex.Data, + Loaded: v.Vertex.Loaded, }) } }() @@ -114,11 +117,12 @@ func (l *LookupEdges) Process(ctx context.Context, man gdbi.Manager, in gdbi.InP if len(l.ids) == 0 { for v := range l.db.GetEdgeList(ctx, l.loadData) { out <- t.AddCurrent(&gdbi.DataElement{ - ID: v.Gid, - Label: v.Label, - From: v.From, - To: v.To, - Data: v.Data.AsMap(), + ID: v.ID, + Label: v.Label, + From: v.From, + To: v.To, + Data: v.Data, + Loaded: v.Loaded, }) } } else { @@ -126,11 +130,12 @@ func (l *LookupEdges) Process(ctx context.Context, man gdbi.Manager, in gdbi.InP v := l.db.GetEdge(i, l.loadData) if v != nil { out <- t.AddCurrent(&gdbi.DataElement{ - ID: v.Gid, - Label: v.Label, - From: v.From, - To: v.To, - Data: v.Data.AsMap(), + ID: v.ID, + Label: v.Label, + From: v.From, + To: v.To, + Data: v.Data, + Loaded: v.Loaded, }) } } @@ -166,9 +171,10 @@ func (l *LookupVertexAdjOut) Process(ctx context.Context, man gdbi.Manager, in g for ov := range l.db.GetOutChannel(ctx, queryChan, l.loadData, l.labels) { i := ov.Ref out <- i.AddCurrent(&gdbi.DataElement{ - ID: ov.Vertex.Gid, - Label: ov.Vertex.Label, - Data: ov.Vertex.Data.AsMap(), + ID: ov.Vertex.ID, + Label: ov.Vertex.Label, + Data: ov.Vertex.Data, + Loaded: ov.Vertex.Loaded, }) } }() @@ -201,9 +207,10 @@ func (l *LookupEdgeAdjOut) Process(ctx context.Context, man gdbi.Manager, in gdb for v := range l.db.GetVertexChannel(ctx, queryChan, l.loadData) { i := v.Ref out <- i.AddCurrent(&gdbi.DataElement{ - ID: v.Vertex.Gid, - Label: v.Vertex.Label, - Data: v.Vertex.Data.AsMap(), + ID: v.Vertex.ID, + Label: v.Vertex.Label, + Data: v.Vertex.Data, + Loaded: v.Vertex.Loaded, }) } }() @@ -236,9 +243,10 @@ func (l *LookupVertexAdjIn) Process(ctx context.Context, man gdbi.Manager, in gd for v := range l.db.GetInChannel(ctx, queryChan, l.loadData, l.labels) { i := v.Ref out <- i.AddCurrent(&gdbi.DataElement{ - ID: v.Vertex.Gid, - Label: v.Vertex.Label, - Data: v.Vertex.Data.AsMap(), + ID: v.Vertex.ID, + Label: v.Vertex.Label, + Data: v.Vertex.Data, + Loaded: v.Vertex.Loaded, }) } }() @@ -271,9 +279,10 @@ func (l *LookupEdgeAdjIn) Process(ctx context.Context, man gdbi.Manager, in gdbi for v := range l.db.GetVertexChannel(ctx, queryChan, l.loadData) { i := v.Ref out <- i.AddCurrent(&gdbi.DataElement{ - ID: v.Vertex.Gid, - Label: v.Vertex.Label, - Data: v.Vertex.Data.AsMap(), + ID: v.Vertex.ID, + Label: v.Vertex.Label, + Data: v.Vertex.Data, + Loaded: v.Vertex.Loaded, }) } }() @@ -306,11 +315,12 @@ func (l *InE) Process(ctx context.Context, man gdbi.Manager, in gdbi.InPipe, out for v := range l.db.GetInEdgeChannel(ctx, queryChan, l.loadData, l.labels) { i := v.Ref out <- i.AddCurrent(&gdbi.DataElement{ - ID: v.Edge.Gid, - To: v.Edge.To, - From: v.Edge.From, - Label: v.Edge.Label, - Data: v.Edge.Data.AsMap(), + ID: v.Edge.ID, + To: v.Edge.To, + From: v.Edge.From, + Label: v.Edge.Label, + Data: v.Edge.Data, + Loaded: v.Edge.Loaded, }) } }() @@ -343,11 +353,12 @@ func (l *OutE) Process(ctx context.Context, man gdbi.Manager, in gdbi.InPipe, ou for v := range l.db.GetOutEdgeChannel(ctx, queryChan, l.loadData, l.labels) { i := v.Ref out <- i.AddCurrent(&gdbi.DataElement{ - ID: v.Edge.Gid, - To: v.Edge.To, - From: v.Edge.From, - Label: v.Edge.Label, - Data: v.Edge.Data.AsMap(), + ID: v.Edge.ID, + To: v.Edge.To, + From: v.Edge.From, + Label: v.Edge.Label, + Data: v.Edge.Data, + Loaded: v.Edge.Loaded, }) } }() @@ -409,20 +420,20 @@ func (r *Unwind) Process(ctx context.Context, man gdbi.Manager, in gdbi.InPipe, cur := t.GetCurrent() if len(a) > 0 { for _, i := range a { - o := gdbi.DataElement{ID: cur.ID, Label: cur.Label, From: cur.From, To: cur.To, Data: util.DeepCopy(cur.Data).(map[string]interface{})} + o := gdbi.DataElement{ID: cur.ID, Label: cur.Label, From: cur.From, To: cur.To, Data: util.DeepCopy(cur.Data).(map[string]interface{}), Loaded: true} n := t.AddCurrent(&o) jsonpath.TravelerSetValue(n, r.Field, i) out <- n } } else { - o := gdbi.DataElement{ID: cur.ID, Label: cur.Label, From: cur.From, To: cur.To, Data: util.DeepCopy(cur.Data).(map[string]interface{})} + o := gdbi.DataElement{ID: cur.ID, Label: cur.Label, From: cur.From, To: cur.To, Data: util.DeepCopy(cur.Data).(map[string]interface{}), Loaded: true} n := t.AddCurrent(&o) jsonpath.TravelerSetValue(n, r.Field, nil) out <- n } } else { cur := t.GetCurrent() - o := gdbi.DataElement{ID: cur.ID, Label: cur.Label, From: cur.From, To: cur.To, Data: util.DeepCopy(cur.Data).(map[string]interface{})} + o := gdbi.DataElement{ID: cur.ID, Label: cur.Label, From: cur.From, To: cur.To, Data: util.DeepCopy(cur.Data).(map[string]interface{}), Loaded: true} n := t.AddCurrent(&o) jsonpath.TravelerSetValue(n, r.Field, nil) out <- n diff --git a/existing-sql/graph.go b/existing-sql/graph.go index a6fbaf89..4a87900d 100644 --- a/existing-sql/graph.go +++ b/existing-sql/graph.go @@ -32,16 +32,16 @@ func (g *Graph) Compiler() gdbi.Compiler { //////////////////////////////////////////////////////////////////////////////// // AddVertex is not implemented in the SQL driver -func (g *Graph) AddVertex(vertices []*gripql.Vertex) error { +func (g *Graph) AddVertex(vertices []*gdbi.Vertex) error { return errors.New("not implemented") } // AddEdge is not implemented in the SQL driver -func (g *Graph) AddEdge(edges []*gripql.Edge) error { +func (g *Graph) AddEdge(edges []*gdbi.Edge) error { return errors.New("not implemented") } -func (g *Graph) BulkAdd(stream <-chan *gripql.GraphElement) error { +func (g *Graph) BulkAdd(stream <-chan *gdbi.GraphElement) error { return errors.New("not implemented") } @@ -66,7 +66,7 @@ func (g *Graph) GetTimestamp() string { // GetVertex loads a vertex given an id. It returns a nil if not found. // Keys are expected to be of the form: : -func (g *Graph) GetVertex(key string, load bool) *gripql.Vertex { +func (g *Graph) GetVertex(key string, load bool) *gdbi.Vertex { parts := strings.SplitN(key, ":", 2) if len(parts) != 2 { return nil @@ -88,7 +88,7 @@ func (g *Graph) GetVertex(key string, load bool) *gripql.Vertex { return nil } res := rowDataToVertex(g.schema.GetVertex(table), data, types, load) - return res + return gdbi.NewElementFromVertex(res) } func (g *Graph) getGeneratedEdge(key string, load bool) *gripql.Edge { @@ -126,21 +126,21 @@ func (g *Graph) getTableBackedEdge(key string, load bool) *gripql.Edge { // GetEdge loads an edge given an id. It returns nil if not found // Keys are expected to be of the form:
: -func (g *Graph) GetEdge(key string, load bool) *gripql.Edge { +func (g *Graph) GetEdge(key string, load bool) *gdbi.Edge { parts := strings.SplitN(key, ":", 2) if len(parts) != 2 { return nil } table := parts[0] if table == "generated" { - return g.getGeneratedEdge(key, load) + return gdbi.NewElementFromEdge(g.getGeneratedEdge(key, load)) } - return g.getTableBackedEdge(key, load) + return gdbi.NewElementFromEdge(g.getTableBackedEdge(key, load)) } // GetVertexList produces a channel of all vertices in the graph -func (g *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gripql.Vertex { - o := make(chan *gripql.Vertex, 100) +func (g *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gdbi.Vertex { + o := make(chan *gdbi.Vertex, 100) go func() { defer close(o) for _, v := range g.schema.Vertices { @@ -161,7 +161,7 @@ func (g *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gripql.Ver log.WithFields(log.Fields{"error": err}).Error("GetVertexList: MapScan") return } - o <- rowDataToVertex(v, data, types, load) + o <- gdbi.NewElementFromVertex(rowDataToVertex(v, data, types, load)) } if err := rows.Err(); err != nil { log.WithFields(log.Fields{"error": err}).Error("GetVertexList: iterating") @@ -211,8 +211,8 @@ func (g *Graph) VertexLabelScan(ctx context.Context, label string) chan string { } // GetEdgeList produces a channel of all edges in the graph -func (g *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gripql.Edge { - o := make(chan *gripql.Edge, 100) +func (g *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gdbi.Edge { + o := make(chan *gdbi.Edge, 100) go func() { defer close(o) for _, edgeSchema := range g.schema.Edges { @@ -245,7 +245,7 @@ func (g *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gripql.Edge } geid := &generatedEdgeID{edgeSchema.Label, edgeSchema.From.DestTable, fromGid, edgeSchema.To.DestTable, toGid} edge := geid.Edge() - o <- edge + o <- gdbi.NewElementFromEdge(edge) } if err := rows.Err(); err != nil { log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: iterating") @@ -272,7 +272,7 @@ func (g *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gripql.Edge log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: MapScan") return } - o <- rowDataToEdge(edgeSchema, data, types, load) + o <- gdbi.NewElementFromEdge(rowDataToEdge(edgeSchema, data, types, load)) } if err := rows.Err(); err != nil { log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: iterating") @@ -335,7 +335,7 @@ func (g *Graph) GetVertexChannel(ctx context.Context, reqChan chan gdbi.ElementL v := rowDataToVertex(g.schema.GetVertex(table), data, types, load) r := batchMap[v.Gid] for _, ri := range r { - ri.Vertex = v + ri.Vertex = gdbi.NewElementFromVertex(v) o <- ri } } @@ -446,7 +446,7 @@ func (g *Graph) GetOutChannel(ctx context.Context, reqChan chan gdbi.ElementLook } v := rowDataToVertex(g.schema.GetVertex(edgeSchema.To.DestTable), data, types, load) for _, ri := range r { - ri.Vertex = v + ri.Vertex = gdbi.NewElementFromVertex(v) o <- ri } } @@ -558,7 +558,7 @@ func (g *Graph) GetInChannel(ctx context.Context, reqChan chan gdbi.ElementLooku } v := rowDataToVertex(g.schema.GetVertex(edgeSchema.From.DestTable), data, types, load) for _, ri := range r { - ri.Vertex = v + ri.Vertex = gdbi.NewElementFromVertex(v) o <- ri } } @@ -638,7 +638,7 @@ func (g *Graph) GetOutEdgeChannel(ctx context.Context, reqChan chan gdbi.Element edge := geid.Edge() r := batchMap[edge.From] for _, ri := range r { - ri.Edge = edge + ri.Edge = gdbi.NewElementFromEdge(edge) o <- ri } } @@ -668,7 +668,7 @@ func (g *Graph) GetOutEdgeChannel(ctx context.Context, reqChan chan gdbi.Element edge := rowDataToEdge(edgeSchema, data, types, load) r := batchMap[fmt.Sprintf("%v:%v", edgeSchema.From.DestTable, data[edgeSchema.From.SourceField])] for _, ri := range r { - ri.Edge = edge + ri.Edge = gdbi.NewElementFromEdge(edge) o <- ri } } @@ -748,7 +748,7 @@ func (g *Graph) GetInEdgeChannel(ctx context.Context, reqChan chan gdbi.ElementL edge := geid.Edge() r := batchMap[edge.To] for _, ri := range r { - ri.Edge = edge + ri.Edge = gdbi.NewElementFromEdge(edge) o <- ri } } @@ -778,7 +778,7 @@ func (g *Graph) GetInEdgeChannel(ctx context.Context, reqChan chan gdbi.ElementL edge := rowDataToEdge(edgeSchema, data, types, load) r := batchMap[fmt.Sprintf("%v:%v", edgeSchema.To.DestTable, data[edgeSchema.To.SourceField])] for _, ri := range r { - ri.Edge = edge + ri.Edge = gdbi.NewElementFromEdge(edge) o <- ri } } diff --git a/gdbi/interface.go b/gdbi/interface.go index 18711fac..d4f48404 100644 --- a/gdbi/interface.go +++ b/gdbi/interface.go @@ -23,6 +23,17 @@ type DataElement struct { Label string From, To string Data map[string]interface{} + Loaded bool +} + +type Vertex = DataElement + +type Edge = DataElement + +type GraphElement struct { + Vertex *Vertex + Edge *Edge + Graph string } type Aggregate struct { @@ -59,8 +70,8 @@ const ( type ElementLookup struct { ID string Ref *Traveler - Vertex *gripql.Vertex - Edge *gripql.Edge + Vertex *Vertex + Edge *Edge } // GraphDB is the base interface for graph databases @@ -80,13 +91,13 @@ type GraphInterface interface { GetTimestamp() string - GetVertex(key string, load bool) *gripql.Vertex - GetEdge(key string, load bool) *gripql.Edge + GetVertex(key string, load bool) *Vertex + GetEdge(key string, load bool) *Edge - AddVertex(vertex []*gripql.Vertex) error - AddEdge(edge []*gripql.Edge) error + AddVertex(vertex []*Vertex) error + AddEdge(edge []*Edge) error - BulkAdd(<-chan *gripql.GraphElement) error + BulkAdd(<-chan *GraphElement) error DelVertex(key string) error DelEdge(key string) error @@ -100,8 +111,8 @@ type GraphInterface interface { DeleteVertexIndex(label string, field string) error GetVertexIndexList() <-chan *gripql.IndexID - GetVertexList(ctx context.Context, load bool) <-chan *gripql.Vertex - GetEdgeList(ctx context.Context, load bool) <-chan *gripql.Edge + GetVertexList(ctx context.Context, load bool) <-chan *Vertex + GetEdgeList(ctx context.Context, load bool) <-chan *Edge GetVertexChannel(ctx context.Context, req chan ElementLookup, load bool) chan ElementLookup GetOutChannel(ctx context.Context, req chan ElementLookup, load bool, edgeLabels []string) chan ElementLookup diff --git a/gdbi/traveler.go b/gdbi/traveler.go index 6e9277b0..6e445033 100644 --- a/gdbi/traveler.go +++ b/gdbi/traveler.go @@ -1,6 +1,7 @@ package gdbi import ( + "errors" "fmt" "github.com/bmeg/grip/gripql" @@ -69,6 +70,26 @@ func (t *Traveler) GetCurrent() *DataElement { return t.Current } +func NewElementFromVertex(v *gripql.Vertex) *Vertex { + return &Vertex{ + ID: v.Gid, + Label: v.Label, + Data: v.Data.AsMap(), + Loaded: true, + } +} + +func NewElementFromEdge(e *gripql.Edge) *Edge { + return &Edge{ + ID: e.Gid, + Label: e.Label, + To: e.To, + From: e.From, + Data: e.Data.AsMap(), + Loaded: true, + } +} + // ToVertex converts data element to vertex func (elem *DataElement) ToVertex() *gripql.Vertex { sValue, err := structpb.NewStruct(elem.Data) @@ -121,3 +142,31 @@ func (elem *DataElement) ToDict() map[string]interface{} { out["data"] = elem.Data return out } + +// Validate returns an error if the vertex is invalid +func (vertex *Vertex) Validate() error { + if vertex.ID == "" { + return errors.New("'gid' cannot be blank") + } + if vertex.Label == "" { + return errors.New("'label' cannot be blank") + } + for k := range vertex.Data { + err := gripql.ValidateFieldName(k) + if err != nil { + return err + } + } + return nil +} + +func NewGraphElement(g *gripql.GraphElement) *GraphElement { + o := GraphElement{Graph: g.Graph} + if g.Vertex != nil { + o.Vertex = NewElementFromVertex(g.Vertex) + } + if g.Edge != nil { + o.Edge = NewElementFromEdge(g.Edge) + } + return &o +} diff --git a/grids/graph.go b/grids/graph.go index 1e46a30e..24d560a3 100644 --- a/grids/graph.go +++ b/grids/graph.go @@ -11,10 +11,9 @@ import ( "github.com/bmeg/grip/kvi" "github.com/bmeg/grip/kvindex" "github.com/bmeg/grip/log" + "github.com/bmeg/grip/util/protoutil" "github.com/bmeg/grip/util/setcmp" multierror "github.com/hashicorp/go-multierror" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/structpb" ) // GetTimestamp returns the update timestamp @@ -29,13 +28,13 @@ type kvAddData struct { doc map[string]interface{} } -func insertVertex(tx kvi.KVBulkWrite, keyMap *KeyMap, graphKey uint64, vertex *gripql.Vertex) error { - if vertex.Gid == "" { +func insertVertex(tx kvi.KVBulkWrite, keyMap *KeyMap, graphKey uint64, vertex *gdbi.Vertex) error { + if vertex.ID == "" { return fmt.Errorf("Inserting null key vertex") } - vertexKey, _ := keyMap.GetsertVertexKey(graphKey, vertex.Gid, vertex.Label) + vertexKey, _ := keyMap.GetsertVertexKey(graphKey, vertex.ID, vertex.Label) key := VertexKey(graphKey, vertexKey) - value, err := proto.Marshal(vertex.Data) + value, err := protoutil.StructMarshal(vertex.Data) if err != nil { return err } @@ -45,23 +44,23 @@ func insertVertex(tx kvi.KVBulkWrite, keyMap *KeyMap, graphKey uint64, vertex *g return nil } -func indexVertex(tx kvi.KVBulkWrite, idx *kvindex.KVIndex, graph string, vertex *gripql.Vertex) error { +func indexVertex(tx kvi.KVBulkWrite, idx *kvindex.KVIndex, graph string, vertex *gdbi.Vertex) error { doc := map[string]interface{}{graph: vertexIdxStruct(vertex)} - if err := idx.AddDocTx(tx, vertex.Gid, doc); err != nil { + if err := idx.AddDocTx(tx, vertex.ID, doc); err != nil { return fmt.Errorf("AddVertex Error %s", err) } return nil } -func insertEdge(tx kvi.KVBulkWrite, keyMap *KeyMap, graphKey uint64, edge *gripql.Edge) error { +func insertEdge(tx kvi.KVBulkWrite, keyMap *KeyMap, graphKey uint64, edge *gdbi.Edge) error { var err error var data []byte - if edge.Gid == "" { + if edge.ID == "" { return fmt.Errorf("Inserting null key edge") } - eid, lid := keyMap.GetsertEdgeKey(graphKey, edge.Gid, edge.Label) + eid, lid := keyMap.GetsertEdgeKey(graphKey, edge.ID, edge.Label) src, ok := keyMap.GetVertexKey(graphKey, edge.From) if !ok { return fmt.Errorf("Vertex %s not found", edge.From) @@ -75,7 +74,7 @@ func insertEdge(tx kvi.KVBulkWrite, keyMap *KeyMap, graphKey uint64, edge *gripq skey := SrcEdgeKey(graphKey, eid, src, dst, lid) dkey := DstEdgeKey(graphKey, eid, src, dst, lid) - data, err = proto.Marshal(edge.Data) + data, err = protoutil.StructMarshal(edge.Data) if err != nil { return err } @@ -95,14 +94,14 @@ func insertEdge(tx kvi.KVBulkWrite, keyMap *KeyMap, graphKey uint64, edge *gripq return nil } -func indexEdge(tx kvi.KVBulkWrite, idx *kvindex.KVIndex, graph string, edge *gripql.Edge) error { - err := idx.AddDocTx(tx, edge.Gid, map[string]interface{}{graph: edgeIdxStruct(edge)}) +func indexEdge(tx kvi.KVBulkWrite, idx *kvindex.KVIndex, graph string, edge *gdbi.Edge) error { + err := idx.AddDocTx(tx, edge.ID, map[string]interface{}{graph: edgeIdxStruct(edge)}) return err } // AddVertex adds an edge to the graph, if it already exists // in the graph, it is replaced -func (ggraph *Graph) AddVertex(vertices []*gripql.Vertex) error { +func (ggraph *Graph) AddVertex(vertices []*gdbi.Vertex) error { err := ggraph.kdb.graphkv.BulkWrite(func(tx kvi.KVBulkWrite) error { var bulkErr *multierror.Error for _, vert := range vertices { @@ -130,7 +129,7 @@ func (ggraph *Graph) AddVertex(vertices []*gripql.Vertex) error { // AddEdge adds an edge to the graph, if the id is not "" and in already exists // in the graph, it is replaced -func (ggraph *Graph) AddEdge(edges []*gripql.Edge) error { +func (ggraph *Graph) AddEdge(edges []*gdbi.Edge) error { err := ggraph.kdb.graphkv.BulkWrite(func(tx kvi.KVBulkWrite) error { for _, edge := range edges { err := insertEdge(tx, ggraph.kdb.keyMap, ggraph.graphKey, edge) @@ -158,10 +157,10 @@ func (ggraph *Graph) AddEdge(edges []*gripql.Edge) error { } -func (ggraph *Graph) BulkAdd(stream <-chan *gripql.GraphElement) error { +func (ggraph *Graph) BulkAdd(stream <-chan *gdbi.GraphElement) error { var anyErr error - insertStream := make(chan *gripql.GraphElement, 100) - indexStream := make(chan *gripql.GraphElement, 100) + insertStream := make(chan *gdbi.GraphElement, 100) + indexStream := make(chan *gdbi.GraphElement, 100) s := &sync.WaitGroup{} s.Add(2) go func() { @@ -326,8 +325,8 @@ func (ggraph *Graph) DelVertex(id string) error { } // GetEdgeList produces a channel of all edges in the graph -func (ggraph *Graph) GetEdgeList(ctx context.Context, loadProp bool) <-chan *gripql.Edge { - o := make(chan *gripql.Edge, 100) +func (ggraph *Graph) GetEdgeList(ctx context.Context, loadProp bool) <-chan *gdbi.Edge { + o := make(chan *gdbi.Edge, 100) go func() { defer close(o) ggraph.kdb.graphkv.View(func(it kvi.KVIterator) error { @@ -344,11 +343,12 @@ func (ggraph *Graph) GetEdgeList(ctx context.Context, loadProp bool) <-chan *gri sid, _ := ggraph.kdb.keyMap.GetVertexID(ggraph.graphKey, skey) did, _ := ggraph.kdb.keyMap.GetVertexID(ggraph.graphKey, dkey) eid, _ := ggraph.kdb.keyMap.GetEdgeID(ggraph.graphKey, ekey) - e := &gripql.Edge{Gid: eid, Label: labelID, From: sid, To: did} + e := &gdbi.Edge{ID: eid, Label: labelID, From: sid, To: did} if loadProp { + var err error edgeData, _ := it.Value() - e.Data, _ = structpb.NewStruct(map[string]interface{}{}) - err := proto.Unmarshal(edgeData, e.Data) + e.Data, err = protoutil.StructUnMarshal(edgeData) + e.Loaded = true if err != nil { log.Errorf("GetEdgeList: unmarshal error: %v", err) continue @@ -363,25 +363,25 @@ func (ggraph *Graph) GetEdgeList(ctx context.Context, loadProp bool) <-chan *gri } // GetVertex loads a vertex given an id. It returns a nil if not found -func (ggraph *Graph) GetVertex(id string, loadProp bool) *gripql.Vertex { +func (ggraph *Graph) GetVertex(id string, loadProp bool) *gdbi.Vertex { key, ok := ggraph.kdb.keyMap.GetVertexKey(ggraph.graphKey, id) if !ok { return nil } vkey := VertexKey(ggraph.graphKey, key) - var v *gripql.Vertex + var v *gdbi.Vertex err := ggraph.kdb.graphkv.View(func(it kvi.KVIterator) error { lKey := ggraph.kdb.keyMap.GetVertexLabel(ggraph.graphKey, key) lID, _ := ggraph.kdb.keyMap.GetLabelID(ggraph.graphKey, lKey) - v = &gripql.Vertex{ - Gid: id, + v = &gdbi.Vertex{ + ID: id, Label: lID, } if loadProp { dataValue, err := it.Get(vkey) - v.Data, _ = structpb.NewStruct(map[string]interface{}{}) - err = proto.Unmarshal(dataValue, v.Data) + v.Data, err = protoutil.StructUnMarshal(dataValue) + v.Loaded = true if err != nil { return fmt.Errorf("unmarshal error: %v", err) } @@ -429,14 +429,15 @@ func (ggraph *Graph) GetVertexChannel(ctx context.Context, ids chan gdbi.Element for d := range data { lKey := ggraph.kdb.keyMap.GetVertexLabel(ggraph.graphKey, d.key) lID, _ := ggraph.kdb.keyMap.GetLabelID(ggraph.graphKey, lKey) - v := gripql.Vertex{Gid: d.req.ID, Label: lID} + v := gdbi.Vertex{ID: d.req.ID, Label: lID} if load { - v.Data, _ = structpb.NewStruct(map[string]interface{}{}) - err := proto.Unmarshal(d.data, v.Data) + var err error + v.Data, err = protoutil.StructUnMarshal(d.data) if err != nil { log.Errorf("GetVertexChannel: unmarshal error: %v", err) continue } + v.Loaded = true } d.req.Vertex = &v out <- d.req @@ -489,16 +490,16 @@ func (ggraph *Graph) GetOutChannel(ctx context.Context, reqChan chan gdbi.Elemen gid, _ := ggraph.kdb.keyMap.GetVertexID(ggraph.graphKey, vkey) lkey := ggraph.kdb.keyMap.GetVertexLabel(ggraph.graphKey, vkey) lid, _ := ggraph.kdb.keyMap.GetLabelID(ggraph.graphKey, lkey) - v := &gripql.Vertex{Gid: gid, Label: lid} + v := &gdbi.Vertex{ID: gid, Label: lid} if load { dataValue, err := it.Get(req.data) if err == nil { - v.Data, _ = structpb.NewStruct(map[string]interface{}{}) - err = proto.Unmarshal(dataValue, v.Data) + v.Data, err = protoutil.StructUnMarshal(dataValue) if err != nil { log.Errorf("GetOutChannel: unmarshal error: %v", err) continue } + v.Loaded = true } } req.req.Vertex = v @@ -535,16 +536,16 @@ func (ggraph *Graph) GetInChannel(ctx context.Context, reqChan chan gdbi.Element srcID, _ := ggraph.kdb.keyMap.GetVertexID(ggraph.graphKey, src) lID := ggraph.kdb.keyMap.GetVertexLabel(ggraph.graphKey, src) lKey, _ := ggraph.kdb.keyMap.GetLabelID(ggraph.graphKey, lID) - v := &gripql.Vertex{Gid: srcID, Label: lKey} + v := &gdbi.Vertex{ID: srcID, Label: lKey} if load { dataValue, err := it.Get(vkey) if err == nil { - v.Data, _ = structpb.NewStruct(map[string]interface{}{}) - err = proto.Unmarshal(dataValue, v.Data) + v.Data, err = protoutil.StructUnMarshal(dataValue) if err != nil { log.Errorf("GetInChannel: unmarshal error: %v", err) continue } + v.Loaded = true } } req.Vertex = v @@ -580,8 +581,8 @@ func (ggraph *Graph) GetOutEdgeChannel(ctx context.Context, reqChan chan gdbi.El keyValue := it.Key() _, eid, src, dst, label := SrcEdgeKeyParse(keyValue) if len(edgeLabelKeys) == 0 || setcmp.ContainsUint(edgeLabelKeys, label) { - e := gripql.Edge{} - e.Gid, _ = ggraph.kdb.keyMap.GetEdgeID(ggraph.graphKey, eid) + e := gdbi.Edge{} + e.ID, _ = ggraph.kdb.keyMap.GetEdgeID(ggraph.graphKey, eid) e.From, _ = ggraph.kdb.keyMap.GetVertexID(ggraph.graphKey, src) e.To, _ = ggraph.kdb.keyMap.GetVertexID(ggraph.graphKey, dst) e.Label, _ = ggraph.kdb.keyMap.GetLabelID(ggraph.graphKey, label) @@ -589,12 +590,12 @@ func (ggraph *Graph) GetOutEdgeChannel(ctx context.Context, reqChan chan gdbi.El ekey := EdgeKey(ggraph.graphKey, eid, src, dst, label) dataValue, err := it.Get(ekey) if err == nil { - e.Data, _ = structpb.NewStruct(map[string]interface{}{}) - err := proto.Unmarshal(dataValue, e.Data) + e.Data, err = protoutil.StructUnMarshal(dataValue) if err != nil { log.Errorf("GetOutEdgeChannel: unmarshal error: %v", err) continue } + e.Loaded = true } } req.Edge = &e @@ -631,8 +632,8 @@ func (ggraph *Graph) GetInEdgeChannel(ctx context.Context, reqChan chan gdbi.Ele keyValue := it.Key() _, eid, src, dst, label := DstEdgeKeyParse(keyValue) if len(edgeLabelKeys) == 0 || setcmp.ContainsUint(edgeLabelKeys, label) { - e := gripql.Edge{} - e.Gid, _ = ggraph.kdb.keyMap.GetEdgeID(ggraph.graphKey, eid) + e := gdbi.Edge{} + e.ID, _ = ggraph.kdb.keyMap.GetEdgeID(ggraph.graphKey, eid) e.From, _ = ggraph.kdb.keyMap.GetVertexID(ggraph.graphKey, src) e.To, _ = ggraph.kdb.keyMap.GetVertexID(ggraph.graphKey, dst) e.Label, _ = ggraph.kdb.keyMap.GetLabelID(ggraph.graphKey, label) @@ -640,12 +641,12 @@ func (ggraph *Graph) GetInEdgeChannel(ctx context.Context, reqChan chan gdbi.Ele ekey := EdgeKey(ggraph.graphKey, eid, src, dst, label) dataValue, err := it.Get(ekey) if err == nil { - e.Data = &structpb.Struct{} - err := proto.Unmarshal(dataValue, e.Data) + e.Data, err = protoutil.StructUnMarshal(dataValue) if err != nil { log.Errorf("GetInEdgeChannel: unmarshal error: %v", err) continue } + e.Loaded = true } } req.Edge = &e @@ -662,14 +663,14 @@ func (ggraph *Graph) GetInEdgeChannel(ctx context.Context, reqChan chan gdbi.Ele } // GetEdge loads an edge given an id. It returns nil if not found -func (ggraph *Graph) GetEdge(id string, loadProp bool) *gripql.Edge { +func (ggraph *Graph) GetEdge(id string, loadProp bool) *gdbi.Edge { ekey, ok := ggraph.kdb.keyMap.GetEdgeKey(ggraph.graphKey, id) if !ok { return nil } ekeyPrefix := EdgeKeyPrefix(ggraph.graphKey, ekey) - var e *gripql.Edge + var e *gdbi.Edge err := ggraph.kdb.graphkv.View(func(it kvi.KVIterator) error { for it.Seek(ekeyPrefix); it.Valid() && bytes.HasPrefix(it.Key(), ekeyPrefix); it.Next() { _, eid, src, dst, labelKey := EdgeKeyParse(it.Key()) @@ -677,19 +678,20 @@ func (ggraph *Graph) GetEdge(id string, loadProp bool) *gripql.Edge { from, _ := ggraph.kdb.keyMap.GetVertexID(ggraph.graphKey, src) to, _ := ggraph.kdb.keyMap.GetVertexID(ggraph.graphKey, dst) label, _ := ggraph.kdb.keyMap.GetLabelID(ggraph.graphKey, labelKey) - e = &gripql.Edge{ - Gid: gid, + e = &gdbi.Edge{ + ID: gid, From: from, To: to, Label: label, } if loadProp { + var err error d, _ := it.Value() - e.Data = &structpb.Struct{} - err := proto.Unmarshal(d, e.Data) + e.Data, err = protoutil.StructUnMarshal(d) if err != nil { return fmt.Errorf("unmarshal error: %v", err) } + e.Loaded = true } } return nil @@ -701,8 +703,8 @@ func (ggraph *Graph) GetEdge(id string, loadProp bool) *gripql.Edge { } // GetVertexList produces a channel of all edges in the graph -func (ggraph *Graph) GetVertexList(ctx context.Context, loadProp bool) <-chan *gripql.Vertex { - o := make(chan *gripql.Vertex, 100) +func (ggraph *Graph) GetVertexList(ctx context.Context, loadProp bool) <-chan *gdbi.Vertex { + o := make(chan *gdbi.Vertex, 100) go func() { defer close(o) ggraph.kdb.graphkv.View(func(it kvi.KVIterator) error { @@ -714,20 +716,21 @@ func (ggraph *Graph) GetVertexList(ctx context.Context, loadProp bool) <-chan *g return nil default: } - v := &gripql.Vertex{} + v := &gdbi.Vertex{} keyValue := it.Key() _, vKey := VertexKeyParse(keyValue) lKey := ggraph.kdb.keyMap.GetVertexLabel(ggraph.graphKey, vKey) - v.Gid, _ = ggraph.kdb.keyMap.GetVertexID(ggraph.graphKey, vKey) + v.ID, _ = ggraph.kdb.keyMap.GetVertexID(ggraph.graphKey, vKey) v.Label, _ = ggraph.kdb.keyMap.GetLabelID(ggraph.graphKey, lKey) if loadProp { + var err error dataValue, _ := it.Value() - v.Data = &structpb.Struct{} - err := proto.Unmarshal(dataValue, v.Data) + v.Data, err = protoutil.StructUnMarshal(dataValue) if err != nil { log.Errorf("GetVertexList: unmarshal error: %v", err) continue } + v.Loaded = true } o <- v } diff --git a/grids/index.go b/grids/index.go index b8952b2a..983fc8c9 100644 --- a/grids/index.go +++ b/grids/index.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + "github.com/bmeg/grip/gdbi" "github.com/bmeg/grip/gripql" "github.com/bmeg/grip/jsonpath" "github.com/bmeg/grip/log" @@ -43,21 +44,21 @@ func normalizePath(path string) string { return path } -func vertexIdxStruct(v *gripql.Vertex) map[string]interface{} { +func vertexIdxStruct(v *gdbi.Vertex) map[string]interface{} { k := map[string]interface{}{ "v": map[string]interface{}{ "label": v.Label, - v.Label: v.Data.AsMap(), + v.Label: v.Data, }, } return k } -func edgeIdxStruct(e *gripql.Edge) map[string]interface{} { +func edgeIdxStruct(e *gdbi.Edge) map[string]interface{} { k := map[string]interface{}{ "e": map[string]interface{}{ "label": e.Label, - e.Label: e.Data.AsMap(), + e.Label: e.Data, }, } return k diff --git a/grids/schema.go b/grids/schema.go index 1895dd6c..10fa9d52 100644 --- a/grids/schema.go +++ b/grids/schema.go @@ -46,7 +46,7 @@ func (ma *GDB) sampleSchema(ctx context.Context, graph string, n uint32, random schema := map[string]interface{}{} for i := range ma.idx.GetTermMatch(context.Background(), labelField, label, int(n)) { v := gi.GetVertex(i, true) - data := v.Data.AsMap() + data := v.Data ds := gripql.GetDataFieldTypes(data) util.MergeMaps(schema, ds) @@ -56,7 +56,7 @@ func (ma *GDB) sampleSchema(ctx context.Context, graph string, n uint32, random for e := range gi.GetOutEdgeChannel(ctx, reqChan, true, []string{}) { o := gi.GetVertex(e.Edge.To, false) k := fromtokey{from: v.Label, to: o.Label, label: e.Edge.Label} - ds := gripql.GetDataFieldTypes(e.Edge.Data.AsMap()) + ds := gripql.GetDataFieldTypes(e.Edge.Data) if p, ok := fromToPairs[k]; ok { fromToPairs[k] = util.MergeMaps(p, ds) } else { diff --git a/gripper/graph.go b/gripper/graph.go index 9edf7742..1c6531e2 100644 --- a/gripper/graph.go +++ b/gripper/graph.go @@ -204,15 +204,15 @@ func (t *TabularGraph) Close() error { return nil } -func (t *TabularGraph) AddVertex(vertex []*gripql.Vertex) error { +func (t *TabularGraph) AddVertex(vertex []*gdbi.Vertex) error { return fmt.Errorf("GRIPPER Graph is ReadOnly") } -func (t *TabularGraph) AddEdge(edge []*gripql.Edge) error { +func (t *TabularGraph) AddEdge(edge []*gdbi.Edge) error { return fmt.Errorf("GRIPPER is ReadOnly") } -func (t *TabularGraph) BulkAdd(stream <-chan *gripql.GraphElement) error { +func (t *TabularGraph) BulkAdd(stream <-chan *gdbi.GraphElement) error { return fmt.Errorf("GRIPPER is ReadOnly") } @@ -240,7 +240,7 @@ func (t *TabularGraph) getRow(source, collection, id string) *Row { } -func (t *TabularGraph) GetVertex(key string, load bool) *gripql.Vertex { +func (t *TabularGraph) GetVertex(key string, load bool) *gdbi.Vertex { for _, source := range t.vertexSourceOrder { v := t.vertices[source] if strings.HasPrefix(key, v.prefix) { @@ -254,7 +254,7 @@ func (t *TabularGraph) GetVertex(key string, load bool) *gripql.Vertex { row = i } if row != nil { - o := gripql.Vertex{Gid: v.prefix + row.Id, Label: v.config.Label, Data: row.Data} + o := gdbi.Vertex{ID: v.prefix + row.Id, Label: v.config.Label, Data: row.Data.AsMap(), Loaded: true} return &o } } else { @@ -265,7 +265,7 @@ func (t *TabularGraph) GetVertex(key string, load bool) *gripql.Vertex { return nil } -func (t *TabularGraph) GetEdge(key string, load bool) *gripql.Edge { +func (t *TabularGraph) GetEdge(key string, load bool) *gdbi.Edge { src, dst, label, err := t.ParseEdge(key) if err != nil { return nil @@ -285,18 +285,19 @@ func (t *TabularGraph) GetEdge(key string, load bool) *gripql.Edge { edge.config.EdgeTable.FromField, srcID) if err == nil { - var out *gripql.Edge + var out *gdbi.Edge for row := range res { data := row.Data.AsMap() if rowDst, err := jsonpath.JsonPathLookup(data, edge.config.EdgeTable.ToField); err == nil { if rowdDstStr, ok := rowDst.(string); ok { if dstID == rowdDstStr { - o := gripql.Edge{ - Gid: edge.GenID(srcID, dstID), //edge.prefix + row.Id, - To: edge.config.ToVertex + dstID, - From: edge.config.FromVertex + srcID, - Label: edge.config.Label, - Data: row.Data, + o := gdbi.Edge{ + ID: edge.GenID(srcID, dstID), //edge.prefix + row.Id, + To: edge.config.ToVertex + dstID, + From: edge.config.FromVertex + srcID, + Label: edge.config.Label, + Data: row.Data.AsMap(), + Loaded: true, } out = &o } @@ -321,11 +322,12 @@ func (t *TabularGraph) GetEdge(key string, load bool) *gripql.Edge { if srcField, err := jsonpath.JsonPathLookup(srcData, edge.config.FieldToField.FromField); err == nil { if dstField, err := jsonpath.JsonPathLookup(dstData, edge.config.FieldToField.ToField); err == nil { if srcField == dstField { - o := gripql.Edge{ - Gid: edge.GenID(srcID, dstID), //edge.prefix + row.Id, - To: edge.config.ToVertex + dstID, - From: edge.config.FromVertex + srcID, - Label: edge.config.Label, + o := gdbi.Edge{ + ID: edge.GenID(srcID, dstID), //edge.prefix + row.Id, + To: edge.config.ToVertex + dstID, + From: edge.config.FromVertex + srcID, + Label: edge.config.Label, + Loaded: true, } return &o } @@ -408,14 +410,19 @@ func (t *TabularGraph) GetVertexIndexList() <-chan *gripql.IndexID { return out } -func (t *TabularGraph) GetVertexList(ctx context.Context, load bool) <-chan *gripql.Vertex { - out := make(chan *gripql.Vertex, 100) +func (t *TabularGraph) GetVertexList(ctx context.Context, load bool) <-chan *gdbi.Vertex { + out := make(chan *gdbi.Vertex, 100) go func() { for _, source := range t.vertexSourceOrder { c := t.vertices[source] //log.Infof("Getting vertices from table: %s", c.config.Label) for row := range t.client.GetRows(ctx, c.config.Source, c.config.Collection) { - v := gripql.Vertex{Gid: c.prefix + row.Id, Label: c.config.Label, Data: row.Data} + v := gdbi.Vertex{ + ID: c.prefix + row.Id, + Label: c.config.Label, + Data: row.Data.AsMap(), + Loaded: true, + } out <- &v } } @@ -424,8 +431,8 @@ func (t *TabularGraph) GetVertexList(ctx context.Context, load bool) <-chan *gri return out } -func (t *TabularGraph) GetEdgeList(ctx context.Context, load bool) <-chan *gripql.Edge { - out := make(chan *gripql.Edge, 100) +func (t *TabularGraph) GetEdgeList(ctx context.Context, load bool) <-chan *gdbi.Edge { + out := make(chan *gdbi.Edge, 100) go func() { defer close(out) for _, source := range t.edgeSourceOrder { @@ -444,12 +451,13 @@ func (t *TabularGraph) GetEdgeList(ctx context.Context, load bool) <-chan *gripq if dstStr, ok := dst.(string); ok { if src, err := jsonpath.JsonPathLookup(data, edge.config.EdgeTable.FromField); err == nil { if srcStr, ok := src.(string); ok { - e := gripql.Edge{ - Gid: edge.GenID(srcStr, dstStr), - To: edge.toVertex.prefix + dstStr, - From: edge.fromVertex.prefix + srcStr, - Label: edge.config.Label, - Data: row.Data, + e := gdbi.Edge{ + ID: edge.GenID(srcStr, dstStr), + To: edge.toVertex.prefix + dstStr, + From: edge.fromVertex.prefix + srcStr, + Label: edge.config.Label, + Data: row.Data.AsMap(), + Loaded: true, } out <- &e } @@ -474,11 +482,12 @@ func (t *TabularGraph) GetEdgeList(ctx context.Context, load bool) <-chan *gripq edge.config.FieldToField.ToField, fValue) if err == nil { for dstRow := range dstRes { - o := gripql.Edge{ - Gid: edge.GenID(srcRow.Id, dstRow.Id), - From: edge.fromVertex.prefix + srcRow.Id, - To: edge.toVertex.prefix + dstRow.Id, - Label: edge.config.Label, + o := gdbi.Edge{ + ID: edge.GenID(srcRow.Id, dstRow.Id), + From: edge.fromVertex.prefix + srcRow.Id, + To: edge.toVertex.prefix + dstRow.Id, + Label: edge.config.Label, + Loaded: true, } out <- &o } @@ -524,8 +533,7 @@ func rowRequestVertexPipeline(ctx context.Context, prefix string, go func() { defer close(out) for r := range rowChan { - o := gripql.Vertex{Gid: prefix + r.Id, Label: label} - o.Data = r.Data + o := gdbi.Vertex{ID: prefix + r.Id, Label: label, Data: r.Data.AsMap(), Loaded: true} reqSync.Lock() outReq := reqMap[r.RequestID] delete(reqMap, r.RequestID) @@ -646,7 +654,7 @@ func (t *TabularGraph) GetOutChannel(ctx context.Context, req chan gdbi.ElementL //log.Infof("Searching %s : %s == %s", edge.toVertex.config.Collection, edge.config.FieldToField.ToField, fValue ) for row := range res { //log.Infof("Found %#v", row) - o := gripql.Vertex{Gid: edge.toVertex.prefix + row.Id, Label: edge.toVertex.config.Label, Data: row.Data} + o := gdbi.Vertex{ID: edge.toVertex.prefix + row.Id, Label: edge.toVertex.config.Label, Data: row.Data.AsMap(), Loaded: true} el := gdbi.ElementLookup{ID: r.ID, Ref: r.Ref, Vertex: &o} out <- el } @@ -728,10 +736,11 @@ func (t *TabularGraph) GetInChannel(ctx context.Context, req chan gdbi.ElementLo edge.config.FieldToField.ToField, fValue) if err == nil { for row := range res { - o := gripql.Vertex{ - Gid: edge.toVertex.prefix + row.Id, - Label: edge.toVertex.config.Label, - Data: row.Data, + o := gdbi.Vertex{ + ID: edge.toVertex.prefix + row.Id, + Label: edge.toVertex.config.Label, + Data: row.Data.AsMap(), + Loaded: true, } el := gdbi.ElementLookup{ID: r.ID, Ref: r.Ref, Vertex: &o} out <- el @@ -782,12 +791,13 @@ func (t *TabularGraph) GetOutEdgeChannel(ctx context.Context, req chan gdbi.Elem data := row.Data.AsMap() if dst, err := jsonpath.JsonPathLookup(data, edge.config.EdgeTable.ToField); err == nil { if dstStr, ok := dst.(string); ok { - o := gripql.Edge{ - Gid: edge.GenID(id, dstStr), - From: edge.config.FromVertex + id, - To: edge.config.ToVertex + dstStr, - Label: edge.config.Label, - Data: row.Data, + o := gdbi.Edge{ + ID: edge.GenID(id, dstStr), + From: edge.config.FromVertex + id, + To: edge.config.ToVertex + dstStr, + Label: edge.config.Label, + Data: row.Data.AsMap(), + Loaded: true, } out <- gdbi.ElementLookup{Ref: r.Ref, Edge: &o} } @@ -821,12 +831,13 @@ func (t *TabularGraph) GetOutEdgeChannel(ctx context.Context, req chan gdbi.Elem edge.config.FieldToField.ToField, fValue) if err == nil { for row := range res { - o := gripql.Edge{ - Gid: edge.GenID(id, row.Id), - From: edge.fromVertex.prefix + id, - To: edge.toVertex.prefix + row.Id, - Label: edge.config.Label, - Data: row.Data, + o := gdbi.Edge{ + ID: edge.GenID(id, row.Id), + From: edge.fromVertex.prefix + id, + To: edge.toVertex.prefix + row.Id, + Label: edge.config.Label, + Data: row.Data.AsMap(), + Loaded: true, } el := gdbi.ElementLookup{ID: r.ID, Ref: r.Ref, Edge: &o} out <- el @@ -878,12 +889,13 @@ func (t *TabularGraph) GetInEdgeChannel(ctx context.Context, req chan gdbi.Eleme data := row.Data.AsMap() if dst, err := jsonpath.JsonPathLookup(data, edge.config.EdgeTable.ToField); err == nil { if dstStr, ok := dst.(string); ok { - o := gripql.Edge{ - Gid: edge.GenID(dstStr, id), - From: edge.toVertex.prefix + dstStr, - To: edge.fromVertex.prefix + id, - Label: edge.config.Label, - Data: row.Data, + o := gdbi.Edge{ + ID: edge.GenID(dstStr, id), + From: edge.toVertex.prefix + dstStr, + To: edge.fromVertex.prefix + id, + Label: edge.config.Label, + Data: row.Data.AsMap(), + Loaded: true, } out <- gdbi.ElementLookup{Ref: r.Ref, Edge: &o} } @@ -917,12 +929,13 @@ func (t *TabularGraph) GetInEdgeChannel(ctx context.Context, req chan gdbi.Eleme edge.config.FieldToField.ToField, fValue) if err == nil { for row := range res { - o := gripql.Edge{ - Gid: edge.GenID(row.Id, id), - To: edge.fromVertex.prefix + id, //row.Id, - From: edge.toVertex.prefix + row.Id, //id, - Label: edge.config.Label, - Data: row.Data, + o := gdbi.Edge{ + ID: edge.GenID(row.Id, id), + To: edge.fromVertex.prefix + id, //row.Id, + From: edge.toVertex.prefix + row.Id, //id, + Label: edge.config.Label, + Data: row.Data.AsMap(), + Loaded: true, } el := gdbi.ElementLookup{ID: r.ID, Ref: r.Ref, Edge: &o} out <- el diff --git a/kvgraph/graph.go b/kvgraph/graph.go index d25c47eb..fe49f51f 100644 --- a/kvgraph/graph.go +++ b/kvgraph/graph.go @@ -44,11 +44,11 @@ type kvAddData struct { // AddVertex adds an edge to the graph, if it already exists // in the graph, it is replaced -func (kgdb *KVInterfaceGDB) AddVertex(vertices []*gripql.Vertex) error { +func (kgdb *KVInterfaceGDB) AddVertex(vertices []*gdbi.Vertex) error { err := kgdb.kvg.kv.BulkWrite(func(tx kvi.KVBulkWrite) error { var bulkErr *multierror.Error for _, vert := range vertices { - if err := insertVertex(tx, kgdb.kvg.idx, kgdb.graph, vert); err != nil { + if err := insertVertex(tx, kgdb.kvg.idx, kgdb.graph, vert.ToVertex()); err != nil { bulkErr = multierror.Append(bulkErr, err) } } @@ -119,11 +119,11 @@ func insertEdge(tx kvi.KVBulkWrite, idx *kvindex.KVIndex, graph string, edge *gr // AddEdge adds an edge to the graph, if the id is not "" and in already exists // in the graph, it is replaced -func (kgdb *KVInterfaceGDB) AddEdge(edges []*gripql.Edge) error { +func (kgdb *KVInterfaceGDB) AddEdge(edges []*gdbi.Edge) error { err := kgdb.kvg.kv.BulkWrite(func(tx kvi.KVBulkWrite) error { var bulkErr *multierror.Error for _, edge := range edges { - if err := insertEdge(tx, kgdb.kvg.idx, kgdb.graph, edge); err != nil { + if err := insertEdge(tx, kgdb.kvg.idx, kgdb.graph, edge.ToEdge()); err != nil { bulkErr = multierror.Append(bulkErr, err) } } @@ -133,18 +133,18 @@ func (kgdb *KVInterfaceGDB) AddEdge(edges []*gripql.Edge) error { return err } -func (kgdb *KVInterfaceGDB) BulkAdd(stream <-chan *gripql.GraphElement) error { +func (kgdb *KVInterfaceGDB) BulkAdd(stream <-chan *gdbi.GraphElement) error { err := kgdb.kvg.kv.BulkWrite(func(tx kvi.KVBulkWrite) error { var bulkErr *multierror.Error for elem := range stream { if elem.Vertex != nil { - if err := insertVertex(tx, kgdb.kvg.idx, kgdb.graph, elem.Vertex); err != nil { + if err := insertVertex(tx, kgdb.kvg.idx, kgdb.graph, elem.Vertex.ToVertex()); err != nil { bulkErr = multierror.Append(bulkErr, err) } continue } if elem.Edge != nil { - if err := insertEdge(tx, kgdb.kvg.idx, kgdb.graph, elem.Edge); err != nil { + if err := insertEdge(tx, kgdb.kvg.idx, kgdb.graph, elem.Edge.ToEdge()); err != nil { bulkErr = multierror.Append(bulkErr, err) } continue @@ -229,8 +229,8 @@ func (kgdb *KVInterfaceGDB) DelVertex(id string) error { } // GetEdgeList produces a channel of all edges in the graph -func (kgdb *KVInterfaceGDB) GetEdgeList(ctx context.Context, loadProp bool) <-chan *gripql.Edge { - o := make(chan *gripql.Edge, 100) +func (kgdb *KVInterfaceGDB) GetEdgeList(ctx context.Context, loadProp bool) <-chan *gdbi.Edge { + o := make(chan *gdbi.Edge, 100) go func() { defer close(o) kgdb.kvg.kv.View(func(it kvi.KVIterator) error { @@ -246,11 +246,12 @@ func (kgdb *KVInterfaceGDB) GetEdgeList(ctx context.Context, loadProp bool) <-ch if etype == edgeSingle { if loadProp { edgeData, _ := it.Value() - e := &gripql.Edge{} - proto.Unmarshal(edgeData, e) + ge := &gripql.Edge{} + proto.Unmarshal(edgeData, ge) + e := &gdbi.Edge{ID: ge.Gid, Label: ge.Label, From: sid, To: did, Data: ge.Data.AsMap(), Loaded: true} o <- e } else { - e := &gripql.Edge{Gid: string(eid), Label: label, From: sid, To: did} + e := &gdbi.Edge{ID: string(eid), Label: label, From: sid, To: did, Loaded: false} o <- e } } @@ -262,22 +263,28 @@ func (kgdb *KVInterfaceGDB) GetEdgeList(ctx context.Context, loadProp bool) <-ch } // GetVertex loads a vertex given an id. It returns a nil if not found -func (kgdb *KVInterfaceGDB) GetVertex(id string, loadProp bool) *gripql.Vertex { +func (kgdb *KVInterfaceGDB) GetVertex(id string, loadProp bool) *gdbi.Vertex { vkey := VertexKey(kgdb.graph, id) - var v *gripql.Vertex + var v *gdbi.Vertex err := kgdb.kvg.kv.View(func(it kvi.KVIterator) error { dataValue, err := it.Get(vkey) if err != nil { return fmt.Errorf("get call failed: %v", err) } - v = &gripql.Vertex{ + gv := &gripql.Vertex{ Gid: id, } - err = proto.Unmarshal(dataValue, v) //FIXME: this can't be skipped because vertex label is in value... + err = proto.Unmarshal(dataValue, gv) //FIXME: this can't be skipped because vertex label is in value... if err != nil { return fmt.Errorf("unmarshal error: %v", err) } + v = &gdbi.Vertex{ + ID: id, + Label: gv.Label, + Data: gv.Data.AsMap(), + Loaded: true, + } return nil }) if err != nil { @@ -318,7 +325,12 @@ func (kgdb *KVInterfaceGDB) GetVertexChannel(ctx context.Context, ids chan gdbi. for d := range data { v := gripql.Vertex{} proto.Unmarshal(d.data, &v) - d.req.Vertex = &v + d.req.Vertex = &gdbi.Vertex{ + ID: d.req.ID, + Label: v.Label, + Data: v.Data.AsMap(), + Loaded: true, + } out <- d.req } }() @@ -368,7 +380,12 @@ func (kgdb *KVInterfaceGDB) GetOutChannel(ctx context.Context, reqChan chan gdbi continue //} } - req.req.Vertex = v + req.req.Vertex = &gdbi.Vertex{ + ID: req.req.ID, + Label: v.Label, + Data: v.Data.AsMap(), + Loaded: true, + } o <- req.req } } @@ -401,7 +418,12 @@ func (kgdb *KVInterfaceGDB) GetInChannel(ctx context.Context, reqChan chan gdbi. continue } //} - req.Vertex = v + req.Vertex = &gdbi.Vertex{ + ID: req.ID, + Label: v.Label, + Data: v.Data.AsMap(), + Loaded: true, + } o <- req } } @@ -426,18 +448,26 @@ func (kgdb *KVInterfaceGDB) GetOutEdgeChannel(ctx context.Context, reqChan chan _, src, dst, eid, label, edgeType := SrcEdgeKeyParse(keyValue) if len(edgeLabels) == 0 || contains(edgeLabels, label) { if edgeType == edgeSingle { - e := gripql.Edge{} + e := gdbi.Edge{} if load { ekey := EdgeKey(kgdb.graph, eid, src, dst, label, edgeType) dataValue, err := it.Get(ekey) + ge := gripql.Edge{} if err == nil { - proto.Unmarshal(dataValue, &e) + proto.Unmarshal(dataValue, &ge) + e.ID = string(eid) + e.From = string(src) + e.To = dst + e.Label = label + e.Data = ge.Data.AsMap() + e.Loaded = true } } else { - e.Gid = string(eid) + e.ID = string(eid) e.From = string(src) e.To = dst e.Label = label + e.Loaded = false } req.Edge = &e o <- req @@ -465,18 +495,26 @@ func (kgdb *KVInterfaceGDB) GetInEdgeChannel(ctx context.Context, reqChan chan g _, src, dst, eid, label, edgeType := DstEdgeKeyParse(keyValue) if len(edgeLabels) == 0 || contains(edgeLabels, label) { if edgeType == edgeSingle { - e := gripql.Edge{} + e := gdbi.Edge{} if load { ekey := EdgeKey(kgdb.graph, eid, src, dst, label, edgeType) dataValue, err := it.Get(ekey) if err == nil { - proto.Unmarshal(dataValue, &e) + ge := gripql.Edge{} + proto.Unmarshal(dataValue, &ge) + e.ID = string(eid) + e.From = string(src) + e.To = dst + e.Label = label + e.Data = ge.Data.AsMap() + e.Loaded = true } } else { - e.Gid = string(eid) + e.ID = string(eid) e.From = string(src) e.To = dst e.Label = label + e.Loaded = false } req.Edge = &e o <- req @@ -492,26 +530,35 @@ func (kgdb *KVInterfaceGDB) GetInEdgeChannel(ctx context.Context, reqChan chan g } // GetEdge loads an edge given an id. It returns nil if not found -func (kgdb *KVInterfaceGDB) GetEdge(id string, loadProp bool) *gripql.Edge { +func (kgdb *KVInterfaceGDB) GetEdge(id string, loadProp bool) *gdbi.Edge { ekeyPrefix := EdgeKeyPrefix(kgdb.graph, id) - var e *gripql.Edge + var e *gdbi.Edge err := kgdb.kvg.kv.View(func(it kvi.KVIterator) error { for it.Seek(ekeyPrefix); it.Valid() && bytes.HasPrefix(it.Key(), ekeyPrefix); it.Next() { _, eid, src, dst, label, _ := EdgeKeyParse(it.Key()) if loadProp { - e = &gripql.Edge{} d, _ := it.Value() - err := proto.Unmarshal(d, e) + ge := &gripql.Edge{} + err := proto.Unmarshal(d, ge) if err != nil { return fmt.Errorf("unmarshal error: %v", err) } + e = &gdbi.Edge{ + ID: eid, + From: src, + To: dst, + Label: label, + Data: ge.Data.AsMap(), + Loaded: true, + } } else { - e = &gripql.Edge{ - Gid: eid, - From: src, - To: dst, - Label: label, + e = &gdbi.Edge{ + ID: eid, + From: src, + To: dst, + Label: label, + Loaded: false, } } } @@ -520,13 +567,12 @@ func (kgdb *KVInterfaceGDB) GetEdge(id string, loadProp bool) *gripql.Edge { if err != nil { return nil } - return e } // GetVertexList produces a channel of all edges in the graph -func (kgdb *KVInterfaceGDB) GetVertexList(ctx context.Context, loadProp bool) <-chan *gripql.Vertex { - o := make(chan *gripql.Vertex, 100) +func (kgdb *KVInterfaceGDB) GetVertexList(ctx context.Context, loadProp bool) <-chan *gdbi.Vertex { + o := make(chan *gdbi.Vertex, 100) go func() { defer close(o) kgdb.kvg.kv.View(func(it kvi.KVIterator) error { @@ -538,16 +584,17 @@ func (kgdb *KVInterfaceGDB) GetVertexList(ctx context.Context, loadProp bool) <- return nil default: } - v := &gripql.Vertex{} - if loadProp { - dataValue, _ := it.Value() - proto.Unmarshal(dataValue, v) - } else { - keyValue := it.Key() - _, vid := VertexKeyParse(keyValue) - v.Gid = string(vid) + gv := &gripql.Vertex{} + dataValue, _ := it.Value() + proto.Unmarshal(dataValue, gv) + keyValue := it.Key() + _, vid := VertexKeyParse(keyValue) + o <- &gdbi.Vertex{ + ID: string(vid), + Label: gv.Label, + Data: gv.Data.AsMap(), + Loaded: true, } - o <- v } return nil }) diff --git a/kvgraph/schema.go b/kvgraph/schema.go index 9d5f3b65..e4f04afe 100644 --- a/kvgraph/schema.go +++ b/kvgraph/schema.go @@ -46,7 +46,7 @@ func (ma *KVGraph) sampleSchema(ctx context.Context, graph string, n uint32, ran schema := map[string]interface{}{} for i := range ma.idx.GetTermMatch(context.Background(), labelField, label, int(n)) { v := gi.GetVertex(i, true) - data := v.Data.AsMap() + data := v.Data ds := gripql.GetDataFieldTypes(data) util.MergeMaps(schema, ds) @@ -56,7 +56,7 @@ func (ma *KVGraph) sampleSchema(ctx context.Context, graph string, n uint32, ran for e := range gi.GetOutEdgeChannel(ctx, reqChan, true, []string{}) { o := gi.GetVertex(e.Edge.To, false) k := fromtokey{from: v.Label, to: o.Label, label: e.Edge.Label} - ds := gripql.GetDataFieldTypes(e.Edge.Data.AsMap()) + ds := gripql.GetDataFieldTypes(e.Edge.Data) if p, ok := fromToPairs[k]; ok { fromToPairs[k] = util.MergeMaps(p, ds) } else { diff --git a/mongo/convert.go b/mongo/convert.go index 5cd03f31..95ea6830 100644 --- a/mongo/convert.go +++ b/mongo/convert.go @@ -1,33 +1,32 @@ package mongo import ( - "github.com/bmeg/grip/gripql" - "google.golang.org/protobuf/types/known/structpb" - + "github.com/bmeg/grip/gdbi" "go.mongodb.org/mongo-driver/bson/primitive" + "google.golang.org/protobuf/types/known/structpb" ) // PackVertex take a GRIP vertex and convert it to a mongo doc -func PackVertex(v *gripql.Vertex) map[string]interface{} { +func PackVertex(v *gdbi.Vertex) map[string]interface{} { p := map[string]interface{}{} if v.Data != nil { - p = v.Data.AsMap() + p = v.Data } return map[string]interface{}{ - "_id": v.Gid, + "_id": v.ID, "label": v.Label, "data": p, } } // PackEdge takes a GRIP edge and converts it to a mongo doc -func PackEdge(e *gripql.Edge) map[string]interface{} { +func PackEdge(e *gdbi.Edge) map[string]interface{} { p := map[string]interface{}{} if e.Data != nil { - p = e.Data.AsMap() + p = e.Data } return map[string]interface{}{ - "_id": e.Gid, + "_id": e.ID, "from": e.From, "to": e.To, "label": e.Label, @@ -42,31 +41,33 @@ type pair struct { } // UnpackVertex takes a mongo doc and converts it into an gripql.Vertex -func UnpackVertex(i map[string]interface{}) *gripql.Vertex { - o := &gripql.Vertex{} - o.Gid = i["_id"].(string) +func UnpackVertex(i map[string]interface{}) *gdbi.Vertex { + o := &gdbi.Vertex{} + o.ID = i["_id"].(string) o.Label = i["label"].(string) if d, ok := i["data"]; ok { d = removePrimatives(d) - o.Data, _ = structpb.NewStruct(d.(map[string]interface{})) + o.Data = d.(map[string]interface{}) + o.Loaded = true } else { - o.Data, _ = structpb.NewStruct(map[string]interface{}{}) + o.Loaded = false } return o } // UnpackEdge takes a mongo doc and convertes it into an gripql.Edge -func UnpackEdge(i map[string]interface{}) *gripql.Edge { - o := &gripql.Edge{} +func UnpackEdge(i map[string]interface{}) *gdbi.Edge { + o := &gdbi.Edge{} id := i["_id"] - o.Gid = id.(string) + o.ID = id.(string) o.Label = i["label"].(string) o.From = i["from"].(string) o.To = i["to"].(string) if d, ok := i["data"]; ok { - o.Data, _ = structpb.NewStruct(d.(map[string]interface{})) + o.Data = d.(map[string]interface{}) + o.Loaded = true } else { - o.Data, _ = structpb.NewStruct(map[string]interface{}{}) + o.Loaded = false } return o } diff --git a/mongo/graph.go b/mongo/graph.go index 84fc55e7..ac1fb17b 100644 --- a/mongo/graph.go +++ b/mongo/graph.go @@ -9,7 +9,6 @@ import ( "github.com/bmeg/grip/engine/core" "github.com/bmeg/grip/gdbi" - "github.com/bmeg/grip/gripql" "github.com/bmeg/grip/log" "github.com/bmeg/grip/timestamp" "github.com/bmeg/grip/util" @@ -41,7 +40,7 @@ func (mg *Graph) GetTimestamp() string { } // GetVertex loads a vertex given an id. It returns a nil if not found -func (mg *Graph) GetVertex(id string, load bool) *gripql.Vertex { +func (mg *Graph) GetVertex(id string, load bool) *gdbi.Vertex { opts := options.FindOne() if !load { opts.SetProjection(map[string]interface{}{"_id": 1, "label": 1}) @@ -59,7 +58,7 @@ func (mg *Graph) GetVertex(id string, load bool) *gripql.Vertex { } // GetEdge loads an edge given an id. It returns nil if not found -func (mg *Graph) GetEdge(id string, load bool) *gripql.Edge { +func (mg *Graph) GetEdge(id string, load bool) *gdbi.Edge { opts := options.FindOne() if !load { opts.SetProjection(map[string]interface{}{"_id": 1, "label": 1, "from": 1, "to": 1}) @@ -78,12 +77,12 @@ func (mg *Graph) GetEdge(id string, load bool) *gripql.Edge { // AddVertex adds an edge to the graph, if it already exists // in the graph, it is replaced -func (mg *Graph) AddVertex(vertices []*gripql.Vertex) error { +func (mg *Graph) AddVertex(vertices []*gdbi.Vertex) error { vCol := mg.ar.VertexCollection(mg.graph) var err error docBatch := make([]mongo.WriteModel, 0, len(vertices)) for _, v := range vertices { - i := mongo.NewReplaceOneModel().SetUpsert(true).SetFilter(bson.M{"_id": v.Gid}) + i := mongo.NewReplaceOneModel().SetUpsert(true).SetFilter(bson.M{"_id": v.ID}) ent := PackVertex(v) i.SetReplacement(ent) docBatch = append(docBatch, i) @@ -99,12 +98,12 @@ func (mg *Graph) AddVertex(vertices []*gripql.Vertex) error { // AddEdge adds an edge to the graph, if it already exists // in the graph, it is replaced -func (mg *Graph) AddEdge(edges []*gripql.Edge) error { +func (mg *Graph) AddEdge(edges []*gdbi.Edge) error { eCol := mg.ar.EdgeCollection(mg.graph) var err error docBatch := make([]mongo.WriteModel, 0, len(edges)) for _, edge := range edges { - i := mongo.NewReplaceOneModel().SetUpsert(true).SetFilter(bson.M{"_id": edge.Gid}) + i := mongo.NewReplaceOneModel().SetUpsert(true).SetFilter(bson.M{"_id": edge.ID}) ent := PackEdge(edge) i.SetReplacement(ent) docBatch = append(docBatch, i) @@ -115,7 +114,7 @@ func (mg *Graph) AddEdge(edges []*gripql.Edge) error { return err } -func (mg *Graph) BulkAdd(stream <-chan *gripql.GraphElement) error { +func (mg *Graph) BulkAdd(stream <-chan *gdbi.GraphElement) error { return util.StreamBatch(stream, 50, mg.graph, mg.AddVertex, mg.AddEdge) } @@ -157,8 +156,8 @@ func (mg *Graph) DelEdge(key string) error { } // GetVertexList produces a channel of all vertices in the graph -func (mg *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gripql.Vertex { - o := make(chan *gripql.Vertex, 100) +func (mg *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gdbi.Vertex { + o := make(chan *gdbi.Vertex, 100) go func() { defer close(o) @@ -192,8 +191,8 @@ func (mg *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gripql.Ve } // GetEdgeList produces a channel of all edges in the graph -func (mg *Graph) GetEdgeList(ctx context.Context, loadProp bool) <-chan *gripql.Edge { - o := make(chan *gripql.Edge, 100) +func (mg *Graph) GetEdgeList(ctx context.Context, loadProp bool) <-chan *gdbi.Edge { + o := make(chan *gdbi.Edge, 100) go func() { defer close(o) @@ -263,12 +262,12 @@ func (mg *Graph) GetVertexChannel(ctx context.Context, ids chan gdbi.ElementLook if err != nil { return } - chunk := map[string]*gripql.Vertex{} + chunk := map[string]*gdbi.Vertex{} result := map[string]interface{}{} for cursor.Next(context.TODO()) { if err := cursor.Decode(&result); err == nil { v := UnpackVertex(result) - chunk[v.Gid] = v + chunk[v.ID] = v } else { log.WithFields(log.Fields{"error": err}).Error("Decode error") } diff --git a/psql/graph.go b/psql/graph.go index 580aaca1..f7a7feaf 100644 --- a/psql/graph.go +++ b/psql/graph.go @@ -2,12 +2,12 @@ package psql import ( "context" + "encoding/json" "fmt" "strings" "github.com/bmeg/grip/engine/core" "github.com/bmeg/grip/gdbi" - "github.com/bmeg/grip/gripql" "github.com/bmeg/grip/log" "github.com/bmeg/grip/timestamp" "github.com/bmeg/grip/util" @@ -35,7 +35,7 @@ func (g *Graph) Compiler() gdbi.Compiler { //////////////////////////////////////////////////////////////////////////////// // AddVertex adds a vertex to the database -func (g *Graph) AddVertex(vertices []*gripql.Vertex) error { +func (g *Graph) AddVertex(vertices []*gdbi.Vertex) error { txn, err := g.db.Begin() if err != nil { return fmt.Errorf("AddVertex: Begin Txn: %v", err) @@ -55,11 +55,11 @@ func (g *Graph) AddVertex(vertices []*gripql.Vertex) error { } for _, v := range vertices { - js, err := v.Data.MarshalJSON() + js, err := json.Marshal(v.Data) if err != nil { return fmt.Errorf("AddVertex: Stmt.Exec: %v", err) } - _, err = stmt.Exec(v.Gid, v.Label, js) + _, err = stmt.Exec(v.ID, v.Label, js) if err != nil { return fmt.Errorf("AddVertex: Stmt.Exec: %v", err) } @@ -79,7 +79,7 @@ func (g *Graph) AddVertex(vertices []*gripql.Vertex) error { } // AddEdge adds an edge to the database -func (g *Graph) AddEdge(edges []*gripql.Edge) error { +func (g *Graph) AddEdge(edges []*gdbi.Edge) error { txn, err := g.db.Begin() if err != nil { return fmt.Errorf("AddEdge: Begin Txn: %v", err) @@ -101,11 +101,11 @@ func (g *Graph) AddEdge(edges []*gripql.Edge) error { } for _, e := range edges { - js, err := e.Data.MarshalJSON() + js, err := json.Marshal(e.Data) if err != nil { return fmt.Errorf("AddEdge: Stmt.Exec: %v", err) } - _, err = stmt.Exec(e.Gid, e.Label, e.From, e.To, js) + _, err = stmt.Exec(e.ID, e.Label, e.From, e.To, js) if err != nil { return fmt.Errorf("AddEdge: Stmt.Exec: %v", err) } @@ -124,7 +124,7 @@ func (g *Graph) AddEdge(edges []*gripql.Edge) error { return nil } -func (g *Graph) BulkAdd(stream <-chan *gripql.GraphElement) error { +func (g *Graph) BulkAdd(stream <-chan *gdbi.GraphElement) error { return util.StreamBatch(stream, 50, g.graph, g.AddVertex, g.AddEdge) } @@ -171,7 +171,7 @@ func (g *Graph) GetTimestamp() string { } // GetVertex loads a vertex given an id. It returns a nil if not found. -func (g *Graph) GetVertex(gid string, load bool) *gripql.Vertex { +func (g *Graph) GetVertex(gid string, load bool) *gdbi.Vertex { q := fmt.Sprintf(`SELECT gid, label FROM %s WHERE gid='%s'`, g.v, gid) if load { q = fmt.Sprintf(`SELECT * FROM %s WHERE gid='%s'`, g.v, gid) @@ -187,11 +187,11 @@ func (g *Graph) GetVertex(gid string, load bool) *gripql.Vertex { log.WithFields(log.Fields{"error": err}).Error("GetVertex: convertVertexRow") return nil } - return vertex + return gdbi.NewElementFromVertex(vertex) } // GetEdge loads an edge given an id. It returns a nil if not found. -func (g *Graph) GetEdge(gid string, load bool) *gripql.Edge { +func (g *Graph) GetEdge(gid string, load bool) *gdbi.Edge { q := fmt.Sprintf(`SELECT gid, label, "from", "to" FROM %s WHERE gid='%s'`, g.e, gid) if load { q = fmt.Sprintf(`SELECT * FROM %s WHERE gid='%s'`, g.e, gid) @@ -207,12 +207,12 @@ func (g *Graph) GetEdge(gid string, load bool) *gripql.Edge { log.WithFields(log.Fields{"error": err}).Error("GetEdge: convertEdgeRow") return nil } - return edge + return gdbi.NewElementFromEdge(edge) } // GetVertexList produces a channel of all vertices in the graph -func (g *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gripql.Vertex { - o := make(chan *gripql.Vertex, 100) +func (g *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gdbi.Vertex { + o := make(chan *gdbi.Vertex, 100) go func() { defer close(o) q := fmt.Sprintf("SELECT gid, label FROM %s", g.v) @@ -236,7 +236,7 @@ func (g *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gripql.Ver log.WithFields(log.Fields{"error": err}).Error("GetVertexList: convertVertexRow") continue } - o <- v + o <- gdbi.NewElementFromVertex(v) } if err := rows.Err(); err != nil { log.WithFields(log.Fields{"error": err}).Error("GetVertexList: iterating") @@ -273,8 +273,8 @@ func (g *Graph) VertexLabelScan(ctx context.Context, label string) chan string { } // GetEdgeList produces a channel of all edges in the graph -func (g *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gripql.Edge { - o := make(chan *gripql.Edge, 100) +func (g *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gdbi.Edge { + o := make(chan *gdbi.Edge, 100) go func() { defer close(o) q := fmt.Sprintf(`SELECT gid, label, "from", "to" FROM %s`, g.e) @@ -298,7 +298,7 @@ func (g *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gripql.Edge log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: convertEdgeRow") continue } - o <- e + o <- gdbi.NewElementFromEdge(e) } if err := rows.Err(); err != nil { log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: iterating") @@ -342,7 +342,7 @@ func (g *Graph) GetVertexChannel(ctx context.Context, reqChan chan gdbi.ElementL return } defer rows.Close() - chunk := map[string]*gripql.Vertex{} + chunk := map[string]*gdbi.Vertex{} for rows.Next() { vrow := &row{} if err := rows.StructScan(vrow); err != nil { @@ -354,7 +354,7 @@ func (g *Graph) GetVertexChannel(ctx context.Context, reqChan chan gdbi.ElementL log.WithFields(log.Fields{"error": err}).Error("GetVertexChannel: convertVertexRow") continue } - chunk[v.Gid] = v + chunk[v.Gid] = gdbi.NewElementFromVertex(v) } if err := rows.Err(); err != nil { log.WithFields(log.Fields{"error": err}).Error("GetVertexChannel: iterating") @@ -455,7 +455,7 @@ func (g *Graph) GetOutChannel(ctx context.Context, reqChan chan gdbi.ElementLook } r := batchMap[vrow.From] for _, ri := range r { - ri.Vertex = v + ri.Vertex = gdbi.NewElementFromVertex(v) o <- ri } } @@ -552,7 +552,7 @@ func (g *Graph) GetInChannel(ctx context.Context, reqChan chan gdbi.ElementLooku } r := batchMap[vrow.To] for _, ri := range r { - ri.Vertex = v + ri.Vertex = gdbi.NewElementFromVertex(v) o <- ri } } @@ -637,7 +637,7 @@ func (g *Graph) GetOutEdgeChannel(ctx context.Context, reqChan chan gdbi.Element } r := batchMap[erow.From] for _, ri := range r { - ri.Edge = e + ri.Edge = gdbi.NewElementFromEdge(e) o <- ri } } @@ -722,7 +722,7 @@ func (g *Graph) GetInEdgeChannel(ctx context.Context, reqChan chan gdbi.ElementL } r := batchMap[erow.To] for _, ri := range r { - ri.Edge = e + ri.Edge = gdbi.NewElementFromEdge(e) o <- ri } } diff --git a/server/api.go b/server/api.go index 70e49a56..3000db83 100644 --- a/server/api.go +++ b/server/api.go @@ -9,6 +9,7 @@ import ( "sync" "github.com/bmeg/grip/engine/pipeline" + "github.com/bmeg/grip/gdbi" "github.com/bmeg/grip/gripql" "github.com/bmeg/grip/log" "github.com/bmeg/grip/util" @@ -69,7 +70,7 @@ func (server *GripServer) GetVertex(ctx context.Context, elem *gripql.ElementID) if o == nil { return nil, grpc.Errorf(codes.NotFound, fmt.Sprintf("vertex %s not found", elem.Id)) } - return o, nil + return o.ToVertex(), nil } // GetEdge returns an edge given a gripql.Element @@ -86,7 +87,7 @@ func (server *GripServer) GetEdge(ctx context.Context, elem *gripql.ElementID) ( if o == nil { return nil, grpc.Errorf(codes.NotFound, fmt.Sprintf("edge %s not found", elem.Id)) } - return o, nil + return o.ToEdge(), nil } // GetTimestamp returns the update timestamp of a graph @@ -163,7 +164,7 @@ func (server *GripServer) addVertex(ctx context.Context, elem *gripql.GraphEleme return nil, fmt.Errorf("vertex validation failed: %v", err) } - err = graph.AddVertex([]*gripql.Vertex{vertex}) + err = graph.AddVertex([]*gdbi.Vertex{gdbi.NewElementFromVertex(vertex)}) if err != nil { return nil, err } @@ -197,7 +198,7 @@ func (server *GripServer) addEdge(ctx context.Context, elem *gripql.GraphElement return nil, fmt.Errorf("edge validation failed: %v", err) } - err = graph.AddEdge([]*gripql.Edge{edge}) + err = graph.AddEdge([]*gdbi.Edge{gdbi.NewElementFromEdge(edge)}) if err != nil { return nil, err } @@ -210,7 +211,7 @@ func (server *GripServer) BulkAdd(stream gripql.Edit_BulkAddServer) error { var insertCount int32 var errorCount int32 - elementStream := make(chan *gripql.GraphElement, 100) + elementStream := make(chan *gdbi.GraphElement, 100) wg := &sync.WaitGroup{} for { @@ -249,7 +250,7 @@ func (server *GripServer) BulkAdd(stream gripql.Edit_BulkAddServer) error { } graphName = element.Graph - elementStream = make(chan *gripql.GraphElement, 100) + elementStream = make(chan *gdbi.GraphElement, 100) wg.Add(1) go func() { @@ -271,7 +272,7 @@ func (server *GripServer) BulkAdd(stream gripql.Edit_BulkAddServer) error { log.WithFields(log.Fields{"graph": element.Graph, "error": err}).Errorf("BulkAdd: vertex validation failed") } else { insertCount++ - elementStream <- element + elementStream <- gdbi.NewGraphElement(element) } } @@ -285,7 +286,7 @@ func (server *GripServer) BulkAdd(stream gripql.Edit_BulkAddServer) error { log.WithFields(log.Fields{"graph": element.Graph, "error": err}).Errorf("BulkAdd: edge validation failed") } else { insertCount++ - elementStream <- element + elementStream <- gdbi.NewGraphElement(element) } } } diff --git a/util/insert.go b/util/insert.go index e0467991..419df0ac 100644 --- a/util/insert.go +++ b/util/insert.go @@ -4,20 +4,20 @@ import ( "fmt" "sync" - "github.com/bmeg/grip/gripql" + "github.com/bmeg/grip/gdbi" "github.com/bmeg/grip/log" multierror "github.com/hashicorp/go-multierror" ) // StreamBatch a stream of inputs and loads them into the graph // This function assumes incoming stream is GraphElemnts from a single graph -func StreamBatch(stream <-chan *gripql.GraphElement, batchSize int, graph string, vertexAdd func([]*gripql.Vertex) error, edgeAdd func([]*gripql.Edge) error) error { +func StreamBatch(stream <-chan *gdbi.GraphElement, batchSize int, graph string, vertexAdd func([]*gdbi.Vertex) error, edgeAdd func([]*gdbi.Edge) error) error { var bulkErr *multierror.Error vertCount := 0 edgeCount := 0 - vertexBatchChan := make(chan []*gripql.Vertex) - edgeBatchChan := make(chan []*gripql.Edge) + vertexBatchChan := make(chan []*gdbi.Vertex) + edgeBatchChan := make(chan []*gdbi.Edge) wg := &sync.WaitGroup{} wg.Add(1) @@ -46,8 +46,8 @@ func StreamBatch(stream <-chan *gripql.GraphElement, batchSize int, graph string wg.Done() }() - vertexBatch := make([]*gripql.Vertex, 0, batchSize) - edgeBatch := make([]*gripql.Edge, 0, batchSize) + vertexBatch := make([]*gdbi.Vertex, 0, batchSize) + edgeBatch := make([]*gdbi.Edge, 0, batchSize) for element := range stream { if element.Graph != graph { @@ -58,7 +58,7 @@ func StreamBatch(stream <-chan *gripql.GraphElement, batchSize int, graph string } else if element.Vertex != nil { if len(vertexBatch) >= batchSize { vertexBatchChan <- vertexBatch - vertexBatch = make([]*gripql.Vertex, 0, batchSize) + vertexBatch = make([]*gdbi.Vertex, 0, batchSize) } vertex := element.Vertex err := vertex.Validate() @@ -74,11 +74,11 @@ func StreamBatch(stream <-chan *gripql.GraphElement, batchSize int, graph string } else if element.Edge != nil { if len(edgeBatch) >= batchSize { edgeBatchChan <- edgeBatch - edgeBatch = make([]*gripql.Edge, 0, batchSize) + edgeBatch = make([]*gdbi.Edge, 0, batchSize) } edge := element.Edge - if edge.Gid == "" { - edge.Gid = UUID() + if edge.ID == "" { + edge.ID = UUID() } err := edge.Validate() if err != nil { diff --git a/util/protoutil/protobuf.go b/util/protoutil/protobuf.go index 2806e8cc..17af0cb1 100644 --- a/util/protoutil/protobuf.go +++ b/util/protoutil/protobuf.go @@ -1,6 +1,7 @@ package protoutil import ( + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/structpb" ) @@ -22,3 +23,20 @@ func AsStringList(src *structpb.ListValue) []string { } return out } + +func StructMarshal(v map[string]interface{}) ([]byte, error) { + s, err := structpb.NewStruct(v) + if err != nil { + return nil, err + } + return proto.Marshal(s) +} + +func StructUnMarshal(b []byte) (map[string]interface{}, error) { + s, _ := structpb.NewStruct(map[string]interface{}{}) + err := proto.Unmarshal(b, s) + if err != nil { + return nil, err + } + return s.AsMap(), nil +} From 589be97d38417346f65daec39a9e29e5e06544bc Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Sun, 28 Mar 2021 23:47:02 -0700 Subject: [PATCH 2/9] Fixing issue in kvgraph --- kvgraph/graph.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kvgraph/graph.go b/kvgraph/graph.go index fe49f51f..8f8be8c7 100644 --- a/kvgraph/graph.go +++ b/kvgraph/graph.go @@ -381,7 +381,7 @@ func (kgdb *KVInterfaceGDB) GetOutChannel(ctx context.Context, reqChan chan gdbi //} } req.req.Vertex = &gdbi.Vertex{ - ID: req.req.ID, + ID: gid, Label: v.Label, Data: v.Data.AsMap(), Loaded: true, @@ -419,7 +419,7 @@ func (kgdb *KVInterfaceGDB) GetInChannel(ctx context.Context, reqChan chan gdbi. } //} req.Vertex = &gdbi.Vertex{ - ID: req.ID, + ID: src, Label: v.Label, Data: v.Data.AsMap(), Loaded: true, From 964c86c0298e0a5b92858d284bee49b286ae44fe Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 29 Mar 2021 09:55:22 -0700 Subject: [PATCH 3/9] Fixing unit tests --- test/main_test.go | 4 ++-- test/pathcompile_test.go | 9 +++++---- test/schema_test.go | 13 +++++++++++-- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/test/main_test.go b/test/main_test.go index 785d180e..b955d49d 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -37,7 +37,7 @@ func setupGraph() error { return vertices[i].Gid < vertices[j].Gid }) for _, v := range vertices { - err := db.AddVertex([]*gripql.Vertex{v}) + err := db.AddVertex([]*gdbi.Vertex{gdbi.NewElementFromVertex(v)}) if err != nil { return err } @@ -47,7 +47,7 @@ func setupGraph() error { return edges[i].Gid < edges[j].Gid }) for _, e := range edges { - err := db.AddEdge([]*gripql.Edge{e}) + err := db.AddEdge([]*gdbi.Edge{gdbi.NewElementFromEdge(e)}) if err != nil { return err } diff --git a/test/pathcompile_test.go b/test/pathcompile_test.go index 0604207b..40ec1dfa 100644 --- a/test/pathcompile_test.go +++ b/test/pathcompile_test.go @@ -9,6 +9,7 @@ import ( "github.com/bmeg/grip/engine/inspect" "github.com/bmeg/grip/engine/pipeline" + "github.com/bmeg/grip/gdbi" "github.com/bmeg/grip/grids" "github.com/bmeg/grip/gripql" "github.com/golang/protobuf/jsonpb" @@ -60,25 +61,25 @@ func TestEngineQuery(t *testing.T) { m := jsonpb.Unmarshaler{} - vset := []*gripql.Vertex{} + vset := []*gdbi.Vertex{} for _, r := range pathVertices { v := &gripql.Vertex{} err := m.Unmarshal(strings.NewReader(r), v) if err != nil { t.Error(err) } - vset = append(vset, v) + vset = append(vset, gdbi.NewElementFromVertex(v)) } graph.AddVertex(vset) - eset := []*gripql.Edge{} + eset := []*gdbi.Edge{} for _, r := range pathEdges { e := &gripql.Edge{} err := m.Unmarshal(strings.NewReader(r), e) if err != nil { t.Error(err) } - eset = append(eset, e) + eset = append(eset, gdbi.NewElementFromEdge(e)) } graph.AddEdge(eset) diff --git a/test/schema_test.go b/test/schema_test.go index 511bb235..ded42105 100644 --- a/test/schema_test.go +++ b/test/schema_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/bmeg/grip/engine/pipeline" + "github.com/bmeg/grip/gdbi" "github.com/bmeg/grip/gripql" "github.com/bmeg/grip/util" "google.golang.org/protobuf/types/known/structpb" @@ -29,11 +30,19 @@ func TestSchema(t *testing.T) { if err != nil { t.Fatal(err) } - err = gi.AddVertex(schema.Vertices) + ve := []*gdbi.Vertex{} + for i := range schema.Vertices { + ve = append(ve, gdbi.NewElementFromVertex(schema.Vertices[i])) + } + err = gi.AddVertex(ve) if err != nil { t.Fatal(err) } - err = gi.AddEdge(schema.Edges) + ee := []*gdbi.Edge{} + for i := range schema.Edges { + ee = append(ee, gdbi.NewElementFromEdge(schema.Edges[i])) + } + err = gi.AddEdge(ee) if err != nil { t.Fatal(err) } From 282df06b7c84e58d496853714e6c29686e42efda Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 29 Mar 2021 15:00:00 -0700 Subject: [PATCH 4/9] Adding step to load outputed elements that haven't been loaded --- engine/core/compile.go | 12 +++++++++--- engine/pipeline/pipes.go | 22 +++++++++++++++++----- gdbi/pipeline.go | 1 + grids/compiler.go | 2 +- mongo/compile.go | 8 +++++++- server/job_manager.go | 14 ++++++++++++-- 6 files changed, 47 insertions(+), 12 deletions(-) diff --git a/engine/core/compile.go b/engine/core/compile.go index a7b8493d..b62b5728 100644 --- a/engine/core/compile.go +++ b/engine/core/compile.go @@ -12,13 +12,14 @@ import ( // DefaultPipeline a set of runnable query operations type DefaultPipeline struct { + graph gdbi.GraphInterface procs []gdbi.Processor dataType gdbi.DataType markTypes map[string]gdbi.DataType } -func NewPipeline(procs []gdbi.Processor, ps *pipeline.State) *DefaultPipeline { - return &DefaultPipeline{procs, ps.LastType, ps.MarkTypes} +func NewPipeline(graph gdbi.GraphInterface, procs []gdbi.Processor, ps *pipeline.State) *DefaultPipeline { + return &DefaultPipeline{graph, procs, ps.LastType, ps.MarkTypes} } // DataType return the datatype @@ -36,6 +37,11 @@ func (pipe *DefaultPipeline) Processors() []gdbi.Processor { return pipe.procs } +// Graph gets the processor graph interface +func (pipe *DefaultPipeline) Graph() gdbi.GraphInterface { + return pipe.graph +} + // DefaultCompiler is the core compiler that works with default graph interface type DefaultCompiler struct { db gdbi.GraphInterface @@ -80,7 +86,7 @@ func (comp DefaultCompiler) Compile(stmts []*gripql.GraphStatement, opts *gdbi.C procs = append(procs, p) } - return &DefaultPipeline{procs, ps.LastType, ps.MarkTypes}, nil + return &DefaultPipeline{comp.db, procs, ps.LastType, ps.MarkTypes}, nil } func StatementProcessor(gs *gripql.GraphStatement, db gdbi.GraphInterface, ps *pipeline.State) (gdbi.Processor, error) { diff --git a/engine/pipeline/pipes.go b/engine/pipeline/pipes.go index 77819b4a..6972e6c0 100644 --- a/engine/pipeline/pipes.go +++ b/engine/pipeline/pipes.go @@ -55,11 +55,12 @@ func Run(ctx context.Context, pipe gdbi.Pipeline, workdir string) <-chan *gripql resch := make(chan *gripql.QueryResult, bufsize) go func() { defer close(resch) + graph := pipe.Graph() dataType := pipe.DataType() markTypes := pipe.MarkTypes() man := engine.NewManager(workdir) for t := range Start(ctx, pipe, man, bufsize, nil) { - resch <- Convert(dataType, markTypes, t) + resch <- Convert(graph, dataType, markTypes, t) } man.Cleanup() }() @@ -72,11 +73,12 @@ func Resume(ctx context.Context, pipe gdbi.Pipeline, workdir string, input gdbi. resch := make(chan *gripql.QueryResult, bufsize) go func() { defer close(resch) + graph := pipe.Graph() dataType := pipe.DataType() markTypes := pipe.MarkTypes() man := engine.NewManager(workdir) for t := range Start(ctx, pipe, man, bufsize, input) { - resch <- Convert(dataType, markTypes, t) + resch <- Convert(graph, dataType, markTypes, t) } man.Cleanup() }() @@ -84,19 +86,29 @@ func Resume(ctx context.Context, pipe gdbi.Pipeline, workdir string, input gdbi. } // Convert takes a traveler and converts it to query output -func Convert(dataType gdbi.DataType, markTypes map[string]gdbi.DataType, t *gdbi.Traveler) *gripql.QueryResult { +func Convert(graph gdbi.GraphInterface, dataType gdbi.DataType, markTypes map[string]gdbi.DataType, t *gdbi.Traveler) *gripql.QueryResult { switch dataType { case gdbi.VertexData: + ve := t.GetCurrent() + if !ve.Loaded { + //TODO: doing single vertex queries is slow. + // Need to rework this to do batched queries + ve = graph.GetVertex(ve.ID, true) + } return &gripql.QueryResult{ Result: &gripql.QueryResult_Vertex{ - Vertex: t.GetCurrent().ToVertex(), + Vertex: ve.ToVertex(), }, } case gdbi.EdgeData: + ee := t.GetCurrent() + if !ee.Loaded { + ee = graph.GetEdge(ee.ID, true) + } return &gripql.QueryResult{ Result: &gripql.QueryResult_Edge{ - Edge: t.GetCurrent().ToEdge(), + Edge: ee.ToEdge(), }, } diff --git a/gdbi/pipeline.go b/gdbi/pipeline.go index 1971c4b2..c5d2569e 100644 --- a/gdbi/pipeline.go +++ b/gdbi/pipeline.go @@ -34,6 +34,7 @@ type Processor interface { // Pipeline represents a set of processors type Pipeline interface { + Graph() GraphInterface Processors() []Processor DataType() DataType MarkTypes() map[string]DataType diff --git a/grids/compiler.go b/grids/compiler.go index c8d08ded..728ce417 100644 --- a/grids/compiler.go +++ b/grids/compiler.go @@ -88,5 +88,5 @@ func (comp Compiler) Compile(stmts []*gripql.GraphStatement, opts *gdbi.CompileO procs = append(procs, p) } } - return core.NewPipeline(procs, ps), nil + return core.NewPipeline(comp.graph, procs, ps), nil } diff --git a/mongo/compile.go b/mongo/compile.go index 5e7437ea..b00a23c2 100644 --- a/mongo/compile.go +++ b/mongo/compile.go @@ -17,6 +17,7 @@ import ( // Pipeline a set of runnable query operations type Pipeline struct { + graph gdbi.GraphInterface procs []gdbi.Processor dataType gdbi.DataType markTypes map[string]gdbi.DataType @@ -37,6 +38,11 @@ func (pipe *Pipeline) Processors() []gdbi.Processor { return pipe.procs } +// Graph gets the graph interface +func (pipe *Pipeline) Graph() gdbi.GraphInterface { + return pipe.graph +} + // Compiler is a mongo specific compiler that works with default graph interface type Compiler struct { db *Graph @@ -705,5 +711,5 @@ func (comp *Compiler) Compile(stmts []*gripql.GraphStatement, opts *gdbi.Compile } procs = append([]gdbi.Processor{&Processor{comp.db, startCollection, query, lastType, markTypes, aggTypes}}, procs...) - return &Pipeline{procs, lastType, markTypes}, nil + return &Pipeline{comp.db, procs, lastType, markTypes}, nil } diff --git a/server/job_manager.go b/server/job_manager.go index 0b268a3d..10c0f957 100644 --- a/server/job_manager.go +++ b/server/job_manager.go @@ -55,8 +55,13 @@ func (server *GripServer) GetResults(job *gripql.QueryJob, srv gripql.Query_GetR if err != nil { return err } + gdb, err := server.getGraphDB(job.Graph) + if err != nil { + return err + } + graph, err := gdb.Graph(job.Graph) for o := range out.Pipe { - res := pipeline.Convert(out.DataType, out.MarkTypes, o) + res := pipeline.Convert(graph, out.DataType, out.MarkTypes, o) srv.Send(res) } return nil @@ -104,8 +109,13 @@ func (server *GripServer) ViewJob(job *gripql.QueryJob, srv gripql.Job_ViewJobSe if err != nil { return nil } + gdb, err := server.getGraphDB(job.Graph) + if err != nil { + return err + } + graph, err := gdb.Graph(job.Graph) for o := range stream.Pipe { - res := pipeline.Convert(stream.DataType, stream.MarkTypes, o) + res := pipeline.Convert(graph, stream.DataType, stream.MarkTypes, o) srv.Send(res) } return nil From e92f83e224b5abbf131cf650dfa12f35b4c17ca6 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 29 Mar 2021 15:08:43 -0700 Subject: [PATCH 5/9] Fixing field output data load status --- engine/pipeline/pipes.go | 1 + jsonpath/jsonpath.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/engine/pipeline/pipes.go b/engine/pipeline/pipes.go index 6972e6c0..40950cd7 100644 --- a/engine/pipeline/pipes.go +++ b/engine/pipeline/pipes.go @@ -91,6 +91,7 @@ func Convert(graph gdbi.GraphInterface, dataType gdbi.DataType, markTypes map[st case gdbi.VertexData: ve := t.GetCurrent() if !ve.Loaded { + //log.Infof("Loading output vertex: %s", ve.ID) //TODO: doing single vertex queries is slow. // Need to rework this to do batched queries ve = graph.GetVertex(ve.ID, true) diff --git a/jsonpath/jsonpath.go b/jsonpath/jsonpath.go index 8118191b..0a33d059 100644 --- a/jsonpath/jsonpath.go +++ b/jsonpath/jsonpath.go @@ -243,7 +243,7 @@ KeyLoop: if len(includePaths) > 0 { ode = includeFields(ode, cde, includePaths) } - + ode.Loaded = true return out } From bce6abca3cc7e19993f8391565d7d74dd716fef8 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 29 Mar 2021 15:36:48 -0700 Subject: [PATCH 6/9] Fixing issue in psql driver --- psql/graph.go | 18 +++++++++--------- psql/util.go | 31 +++++++++++++++---------------- 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/psql/graph.go b/psql/graph.go index f7a7feaf..c123c726 100644 --- a/psql/graph.go +++ b/psql/graph.go @@ -187,7 +187,7 @@ func (g *Graph) GetVertex(gid string, load bool) *gdbi.Vertex { log.WithFields(log.Fields{"error": err}).Error("GetVertex: convertVertexRow") return nil } - return gdbi.NewElementFromVertex(vertex) + return vertex } // GetEdge loads an edge given an id. It returns a nil if not found. @@ -207,7 +207,7 @@ func (g *Graph) GetEdge(gid string, load bool) *gdbi.Edge { log.WithFields(log.Fields{"error": err}).Error("GetEdge: convertEdgeRow") return nil } - return gdbi.NewElementFromEdge(edge) + return edge } // GetVertexList produces a channel of all vertices in the graph @@ -236,7 +236,7 @@ func (g *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gdbi.Verte log.WithFields(log.Fields{"error": err}).Error("GetVertexList: convertVertexRow") continue } - o <- gdbi.NewElementFromVertex(v) + o <- v } if err := rows.Err(); err != nil { log.WithFields(log.Fields{"error": err}).Error("GetVertexList: iterating") @@ -298,7 +298,7 @@ func (g *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gdbi.Edge { log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: convertEdgeRow") continue } - o <- gdbi.NewElementFromEdge(e) + o <- e } if err := rows.Err(); err != nil { log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: iterating") @@ -354,7 +354,7 @@ func (g *Graph) GetVertexChannel(ctx context.Context, reqChan chan gdbi.ElementL log.WithFields(log.Fields{"error": err}).Error("GetVertexChannel: convertVertexRow") continue } - chunk[v.Gid] = gdbi.NewElementFromVertex(v) + chunk[v.ID] = v } if err := rows.Err(); err != nil { log.WithFields(log.Fields{"error": err}).Error("GetVertexChannel: iterating") @@ -455,7 +455,7 @@ func (g *Graph) GetOutChannel(ctx context.Context, reqChan chan gdbi.ElementLook } r := batchMap[vrow.From] for _, ri := range r { - ri.Vertex = gdbi.NewElementFromVertex(v) + ri.Vertex = v o <- ri } } @@ -552,7 +552,7 @@ func (g *Graph) GetInChannel(ctx context.Context, reqChan chan gdbi.ElementLooku } r := batchMap[vrow.To] for _, ri := range r { - ri.Vertex = gdbi.NewElementFromVertex(v) + ri.Vertex = v o <- ri } } @@ -637,7 +637,7 @@ func (g *Graph) GetOutEdgeChannel(ctx context.Context, reqChan chan gdbi.Element } r := batchMap[erow.From] for _, ri := range r { - ri.Edge = gdbi.NewElementFromEdge(e) + ri.Edge = e o <- ri } } @@ -722,7 +722,7 @@ func (g *Graph) GetInEdgeChannel(ctx context.Context, reqChan chan gdbi.ElementL } r := batchMap[erow.To] for _, ri := range r { - ri.Edge = gdbi.NewElementFromEdge(e) + ri.Edge = e o <- ri } } diff --git a/psql/util.go b/psql/util.go index 0c9e0e4f..38c3d2c0 100644 --- a/psql/util.go +++ b/psql/util.go @@ -5,8 +5,7 @@ import ( "fmt" "regexp" - "github.com/bmeg/grip/gripql" - "google.golang.org/protobuf/types/known/structpb" + "github.com/bmeg/grip/gdbi" ) type row struct { @@ -17,7 +16,7 @@ type row struct { Data []byte } -func convertVertexRow(row *row, load bool) (*gripql.Vertex, error) { +func convertVertexRow(row *row, load bool) (*gdbi.Vertex, error) { props := make(map[string]interface{}) if load { err := json.Unmarshal(row.Data, &props) @@ -25,16 +24,16 @@ func convertVertexRow(row *row, load bool) (*gripql.Vertex, error) { return nil, fmt.Errorf("unmarshal error: %v", err) } } - sProps, _ := structpb.NewStruct(props) - v := &gripql.Vertex{ - Gid: row.Gid, - Label: row.Label, - Data: sProps, + v := &gdbi.Vertex{ + ID: row.Gid, + Label: row.Label, + Data: props, + Loaded: load, } return v, nil } -func convertEdgeRow(row *row, load bool) (*gripql.Edge, error) { +func convertEdgeRow(row *row, load bool) (*gdbi.Edge, error) { props := make(map[string]interface{}) if load { err := json.Unmarshal(row.Data, &props) @@ -42,13 +41,13 @@ func convertEdgeRow(row *row, load bool) (*gripql.Edge, error) { return nil, fmt.Errorf("unmarshal error: %v", err) } } - sProps, _ := structpb.NewStruct(props) - e := &gripql.Edge{ - Gid: row.Gid, - Label: row.Label, - From: row.From, - To: row.To, - Data: sProps, + e := &gdbi.Edge{ + ID: row.Gid, + Label: row.Label, + From: row.From, + To: row.To, + Data: props, + Loaded: load, } return e, nil } From 8fe98ae213ff8046bd64d6cc9bef9f7c58c0e173 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 29 Mar 2021 16:49:10 -0700 Subject: [PATCH 7/9] Fixing issue in elastic driver --- elastic/graph.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/elastic/graph.go b/elastic/graph.go index 06c95495..1a05ee5f 100644 --- a/elastic/graph.go +++ b/elastic/graph.go @@ -191,8 +191,9 @@ func (es *Graph) GetEdge(id string, load bool) *gdbi.Edge { log.WithFields(log.Fields{"error": err}).Error("GetEdge: unmarshal") return nil } - - return gdbi.NewElementFromEdge(edge) + o := gdbi.NewElementFromEdge(edge) + o.Loaded = load + return o } // GetVertex gets vertex `id` @@ -217,7 +218,9 @@ func (es *Graph) GetVertex(id string, load bool) *gdbi.Vertex { return nil } - return gdbi.NewElementFromVertex(vertex) + o := gdbi.NewElementFromVertex(vertex) + o.Loaded = load + return o } // GetEdgeList produces a channel of all edges in the graph @@ -257,7 +260,9 @@ func (es *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gdbi.Edge { if err != nil { return err } - o <- gdbi.NewElementFromEdge(edge) + i := gdbi.NewElementFromEdge(edge) + i.Loaded = load + o <- i } return nil }) @@ -310,7 +315,9 @@ func (es *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gdbi.Vert if err != nil { return fmt.Errorf("Failed to unmarshal vertex: %v", err) } - o <- gdbi.NewElementFromVertex(vertex) + i := gdbi.NewElementFromVertex(vertex) + i.Loaded = load + o <- i } return nil }) @@ -374,6 +381,7 @@ func (es *Graph) GetVertexChannel(ctx context.Context, req chan gdbi.ElementLook r := batchMap[vertex.Gid] for _, ri := range r { ri.Vertex = gdbi.NewElementFromVertex(vertex) + ri.Vertex.Loaded = load o <- ri } } @@ -486,6 +494,7 @@ func (es *Graph) GetOutChannel(ctx context.Context, req chan gdbi.ElementLookup, r := batchMap[vertex.Gid] for _, ri := range r { ri.Vertex = gdbi.NewElementFromVertex(vertex) + ri.Vertex.Loaded = load o <- ri } } @@ -597,6 +606,7 @@ func (es *Graph) GetInChannel(ctx context.Context, req chan gdbi.ElementLookup, r := batchMap[vertex.Gid] for _, ri := range r { ri.Vertex = gdbi.NewElementFromVertex(vertex) + ri.Vertex.Loaded = load o <- ri } } @@ -672,6 +682,7 @@ func (es *Graph) GetOutEdgeChannel(ctx context.Context, req chan gdbi.ElementLoo r := batchMap[edge.From] for _, ri := range r { ri.Edge = gdbi.NewElementFromEdge(edge) + ri.Edge.Loaded = load o <- ri } } @@ -747,6 +758,7 @@ func (es *Graph) GetInEdgeChannel(ctx context.Context, req chan gdbi.ElementLook r := batchMap[edge.To] for _, ri := range r { ri.Edge = gdbi.NewElementFromEdge(edge) + ri.Edge.Loaded = load o <- ri } } From 977edab92dcff16269e58233de156c40847ee2c4 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 29 Mar 2021 17:01:39 -0700 Subject: [PATCH 8/9] Fixing issue in mongo driver --- mongo/processor.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mongo/processor.go b/mongo/processor.go index ee26c5b1..56f3e4b4 100644 --- a/mongo/processor.go +++ b/mongo/processor.go @@ -35,6 +35,7 @@ func getDataElement(result map[string]interface{}) *gdbi.DataElement { } if x, ok := result["data"]; ok { de.Data = removePrimatives(x).(map[string]interface{}) + de.Loaded = true } if x, ok := result["to"]; ok { de.To = x.(string) @@ -180,6 +181,7 @@ func (proc *Processor) Process(ctx context.Context, man gdbi.Manager, in gdbi.In } if x, ok := result["data"]; ok { de.Data = removePrimatives(x).(map[string]interface{}) + de.Loaded = true } if x, ok := result["to"]; ok { de.To = x.(string) From e0ba98a17ada699815470f3512e7333ce074b002 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Tue, 30 Mar 2021 16:32:09 -0700 Subject: [PATCH 9/9] Allowing job resume results to appear in new order. --- Makefile | 2 +- conformance/tests/ot_job.py | 7 +++++-- test/mongo-core-processor.yml | 2 +- test/mongo.yml | 2 +- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index ddb90851..1f4133b2 100644 --- a/Makefile +++ b/Makefile @@ -124,7 +124,7 @@ test-conformance: # --------------------- start-mongo: @docker rm -f grip-mongodb-test > /dev/null 2>&1 || echo - docker run -d --name grip-mongodb-test -p 27000:27017 docker.io/mongo:3.6.4 > /dev/null + docker run -d --name grip-mongodb-test -p 27017:27017 docker.io/mongo:3.6.4 > /dev/null start-elastic: @docker rm -f grip-es-test > /dev/null 2>&1 || echo diff --git a/conformance/tests/ot_job.py b/conformance/tests/ot_job.py index 27500e1e..0c7a0bf6 100644 --- a/conformance/tests/ot_job.py +++ b/conformance/tests/ot_job.py @@ -53,10 +53,13 @@ def test_job(man): fullResults = [] for res in G.query().V().hasLabel("Planet").as_("a").out().out().select("a"): fullResults.append(res) - + #TODO: in the future, this 'fix' may need to be removed. + #Always producing elements in the same order may become a requirement. + fullResults.sort(key=lambda x:x["gid"]) resumedResults = [] for res in G.resume(job["id"]).out().select("a").execute(): resumedResults.append(res) + resumedResults.sort(key=lambda x:x["gid"]) if len(fullResults) != len(resumedResults): errors.append( "Missmatch on resumed result" ) @@ -64,7 +67,7 @@ def test_job(man): for a, b in zip(fullResults, resumedResults): if a != b: errors.append("%s != %s" % (a, b)) - return errors + G.deleteJob(job["id"]) count = 0 for j in G.listJobs(): diff --git a/test/mongo-core-processor.yml b/test/mongo-core-processor.yml index 2094fa00..c10e3973 100644 --- a/test/mongo-core-processor.yml +++ b/test/mongo-core-processor.yml @@ -3,5 +3,5 @@ Default: mongo Drivers: mongo: MongoDB: - URL: mongodb://localhost:27000 + URL: mongodb://localhost:27017 UseCorePipeline: true diff --git a/test/mongo.yml b/test/mongo.yml index 10e6f810..07c74f48 100644 --- a/test/mongo.yml +++ b/test/mongo.yml @@ -3,4 +3,4 @@ Default: mongo Drivers: mongo: MongoDB: - URL: mongodb://localhost:27000 + URL: mongodb://localhost:27017