Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring engine to use internal Vertex/Edge structures #250

Merged
merged 9 commits into from
Mar 30, 2021
Merged
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ test-conformance:
# ---------------------
start-mongo:
@docker rm -f grip-mongodb-test > /dev/null 2>&1 || echo
docker run -d --name grip-mongodb-test -p 27000:27017 docker.io/mongo:3.6.4 > /dev/null
docker run -d --name grip-mongodb-test -p 27017:27017 docker.io/mongo:3.6.4 > /dev/null

start-elastic:
@docker rm -f grip-es-test > /dev/null 2>&1 || echo
Expand Down
8 changes: 4 additions & 4 deletions cmd/kvload/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"sync"
"time"

"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/gdbi"
"github.com/bmeg/grip/kvgraph"
"github.com/bmeg/grip/kvi"
"github.com/bmeg/grip/log"
Expand Down Expand Up @@ -95,7 +95,7 @@ var Cmd = &cobra.Command{
edgeFileArray = append(edgeFileArray, edgeFile)
}

graphChan := make(chan *gripql.GraphElement, 10)
graphChan := make(chan *gdbi.GraphElement, 10)
wg := &sync.WaitGroup{}
go func() {
wg.Add(1)
Expand All @@ -115,7 +115,7 @@ var Cmd = &cobra.Command{
continue
}
for v := range vertChan {
graphChan <- &gripql.GraphElement{Graph: graph, Vertex: v}
graphChan <- &gdbi.GraphElement{Graph: graph, Vertex: gdbi.NewElementFromVertex(v)}
count++
vertexCounter.Incr(1)
if count%10000 == 0 {
Expand All @@ -135,7 +135,7 @@ var Cmd = &cobra.Command{
continue
}
for e := range edgeChan {
graphChan <- &gripql.GraphElement{Graph: graph, Edge: e}
graphChan <- &gdbi.GraphElement{Graph: graph, Edge: gdbi.NewElementFromEdge(e)}
count++
edgeCounter.Incr(1)
if count%10000 == 0 {
Expand Down
5 changes: 3 additions & 2 deletions cmd/mongoload/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
//"io"
//"strings"

"github.com/bmeg/grip/gdbi"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/log"
"github.com/bmeg/grip/mongo"
Expand Down Expand Up @@ -40,7 +41,7 @@ func vertexSerialize(vertChan chan *gripql.Vertex, workers int) chan []byte {
wg.Add(1)
go func() {
for v := range vertChan {
doc := mongo.PackVertex(v)
doc := mongo.PackVertex(gdbi.NewElementFromVertex(v))
rawBytes, err := bson.Marshal(doc)
if err == nil {
dataChan <- rawBytes
Expand All @@ -63,7 +64,7 @@ func edgeSerialize(edgeChan chan *gripql.Edge, workers int) chan []byte {
wg.Add(1)
go func() {
for e := range edgeChan {
doc := mongo.PackEdge(e)
doc := mongo.PackEdge(gdbi.NewElementFromEdge(e))
rawBytes, err := bson.Marshal(doc)
if err == nil {
dataChan <- rawBytes
Expand Down
5 changes: 4 additions & 1 deletion conformance/tests/ot_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,13 @@ def test_job(man):
fullResults = []
for res in G.query().V().hasLabel("Planet").as_("a").out().out().select("a"):
fullResults.append(res)

#TODO: in the future, this 'fix' may need to be removed.
#Always producing elements in the same order may become a requirement.
fullResults.sort(key=lambda x:x["gid"])
resumedResults = []
for res in G.resume(job["id"]).out().select("a").execute():
resumedResults.append(res)
resumedResults.sort(key=lambda x:x["gid"])

if len(fullResults) != len(resumedResults):
errors.append( "Missmatch on resumed result" )
Expand Down
24 changes: 8 additions & 16 deletions elastic/convert.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,28 @@
package elastic

import (
"github.com/bmeg/grip/gdbi"
"github.com/bmeg/grip/gripql"
"google.golang.org/protobuf/types/known/structpb"
)

// PackVertex take a AQL vertex and convert it to a mongo doc
func PackVertex(v *gripql.Vertex) map[string]interface{} {
p := map[string]interface{}{}
if v.Data != nil {
p = v.Data.AsMap()
}
//fmt.Printf("proto:%s\nmap:%s\n", v.Data, p)
// PackVertex take a gdbi vertex and convert it to a mongo doc
func PackVertex(v *gdbi.Vertex) map[string]interface{} {
return map[string]interface{}{
"gid": v.Gid,
"gid": v.ID,
"label": v.Label,
"data": p,
"data": v.Data,
}
}

// PackEdge takes a AQL edge and converts it to a mongo doc
func PackEdge(e *gripql.Edge) map[string]interface{} {
p := map[string]interface{}{}
if e.Data != nil {
p = e.Data.AsMap()
}
func PackEdge(e *gdbi.Edge) map[string]interface{} {
return map[string]interface{}{
"gid": e.Gid,
"gid": e.ID,
"from": e.From,
"to": e.To,
"label": e.Label,
"data": p,
"data": e.Data,
}
}

Expand Down
70 changes: 41 additions & 29 deletions elastic/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ func (es *Graph) GetTimestamp() string {

// AddEdge adds an edge to the graph, if the id is not "" and in already exists
// in the graph, it is replaced
func (es *Graph) AddEdge(edges []*gripql.Edge) error {
func (es *Graph) AddEdge(edges []*gdbi.Edge) error {
ctx := context.Background()

bulkRequest := es.client.Bulk()
if es.synchronous {
bulkRequest = bulkRequest.Refresh("true")
}
for _, e := range edges {
if e.Gid == "" {
if e.ID == "" {
return fmt.Errorf("Edge Gid cannot be an empty string")
}
pe := PackEdge(e)
Expand All @@ -67,7 +67,7 @@ func (es *Graph) AddEdge(edges []*gripql.Edge) error {
req := elastic.NewBulkUpdateRequest().
Index(es.edgeIndex).
Type("edge").
Id(e.Gid).
Id(e.ID).
Script(script).
Upsert(pe)
bulkRequest = bulkRequest.Add(req)
Expand All @@ -82,15 +82,15 @@ func (es *Graph) AddEdge(edges []*gripql.Edge) error {

// AddVertex adds an edge to the graph, if the id is not "" and in already exists
// in the graph, it is replaced
func (es *Graph) AddVertex(vertices []*gripql.Vertex) error {
func (es *Graph) AddVertex(vertices []*gdbi.Vertex) error {
ctx := context.Background()

bulkRequest := es.client.Bulk()
if es.synchronous {
bulkRequest = bulkRequest.Refresh("true")
}
for _, v := range vertices {
if v.Gid == "" {
if v.ID == "" {
return fmt.Errorf("Vertex Gid cannot be an empty string")
}
pv := PackVertex(v)
Expand All @@ -100,7 +100,7 @@ func (es *Graph) AddVertex(vertices []*gripql.Vertex) error {
req := elastic.NewBulkUpdateRequest().
Index(es.vertexIndex).
Type("vertex").
Id(v.Gid).
Id(v.ID).
Script(script).
Upsert(pv)
bulkRequest = bulkRequest.Add(req)
Expand All @@ -113,7 +113,7 @@ func (es *Graph) AddVertex(vertices []*gripql.Vertex) error {
return nil
}

func (es *Graph) BulkAdd(stream <-chan *gripql.GraphElement) error {
func (es *Graph) BulkAdd(stream <-chan *gdbi.GraphElement) error {
return util.StreamBatch(stream, 50, es.graph, es.AddVertex, es.AddEdge)
}

Expand Down Expand Up @@ -171,7 +171,7 @@ func (es *Graph) DelVertex(vid string) error {
}

// GetEdge gets a specific edge
func (es *Graph) GetEdge(id string, load bool) *gripql.Edge {
func (es *Graph) GetEdge(id string, load bool) *gdbi.Edge {
ctx := context.Background()

g := es.client.Get().Index(es.edgeIndex).Id(id)
Expand All @@ -191,12 +191,13 @@ func (es *Graph) GetEdge(id string, load bool) *gripql.Edge {
log.WithFields(log.Fields{"error": err}).Error("GetEdge: unmarshal")
return nil
}

return edge
o := gdbi.NewElementFromEdge(edge)
o.Loaded = load
return o
}

// GetVertex gets vertex `id`
func (es *Graph) GetVertex(id string, load bool) *gripql.Vertex {
func (es *Graph) GetVertex(id string, load bool) *gdbi.Vertex {
ctx := context.Background()

g := es.client.Get().Index(es.vertexIndex).Id(id)
Expand All @@ -217,12 +218,14 @@ func (es *Graph) GetVertex(id string, load bool) *gripql.Vertex {
return nil
}

return vertex
o := gdbi.NewElementFromVertex(vertex)
o.Loaded = load
return o
}

// GetEdgeList produces a channel of all edges in the graph
func (es *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gripql.Edge {
o := make(chan *gripql.Edge, 100)
func (es *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gdbi.Edge {
o := make(chan *gdbi.Edge, 100)

// 1st goroutine sends individual hits to channel.
hits := make(chan json.RawMessage, es.pageSize)
Expand Down Expand Up @@ -257,7 +260,9 @@ func (es *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gripql.Edge
if err != nil {
return err
}
o <- edge
i := gdbi.NewElementFromEdge(edge)
i.Loaded = load
o <- i
}
return nil
})
Expand All @@ -274,8 +279,8 @@ func (es *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gripql.Edge
}

// GetVertexList produces a channel of all vertices in the graph
func (es *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gripql.Vertex {
o := make(chan *gripql.Vertex, es.pageSize)
func (es *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gdbi.Vertex {
o := make(chan *gdbi.Vertex, es.pageSize)

// 1st goroutine sends individual hits to channel.
hits := make(chan json.RawMessage, es.pageSize)
Expand Down Expand Up @@ -310,7 +315,9 @@ func (es *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gripql.Ve
if err != nil {
return fmt.Errorf("Failed to unmarshal vertex: %v", err)
}
o <- vertex
i := gdbi.NewElementFromVertex(vertex)
i.Loaded = load
o <- i
}
return nil
})
Expand Down Expand Up @@ -373,7 +380,8 @@ func (es *Graph) GetVertexChannel(ctx context.Context, req chan gdbi.ElementLook
}
r := batchMap[vertex.Gid]
for _, ri := range r {
ri.Vertex = vertex
ri.Vertex = gdbi.NewElementFromVertex(vertex)
ri.Vertex.Loaded = load
o <- ri
}
}
Expand Down Expand Up @@ -448,7 +456,7 @@ func (es *Graph) GetOutChannel(ctx context.Context, req chan gdbi.ElementLookup,
}
r := batchMap[edge.From]
for _, ri := range r {
ri.Vertex = &gripql.Vertex{Gid: edge.To}
ri.Vertex = &gdbi.Vertex{ID: edge.To}
b = append(b, ri)
}
}
Expand All @@ -465,8 +473,8 @@ func (es *Graph) GetOutChannel(ctx context.Context, req chan gdbi.ElementLookup,
idBatch := make([]string, len(batch))
batchMap := make(map[string][]gdbi.ElementLookup, len(batch))
for i := range batch {
idBatch[i] = batch[i].Vertex.Gid
batchMap[batch[i].Vertex.Gid] = append(batchMap[batch[i].Vertex.Gid], batch[i])
idBatch[i] = batch[i].Vertex.ID
batchMap[batch[i].Vertex.ID] = append(batchMap[batch[i].Vertex.ID], batch[i])
}

q := es.client.Search().Index(es.vertexIndex)
Expand All @@ -485,7 +493,8 @@ func (es *Graph) GetOutChannel(ctx context.Context, req chan gdbi.ElementLookup,
}
r := batchMap[vertex.Gid]
for _, ri := range r {
ri.Vertex = vertex
ri.Vertex = gdbi.NewElementFromVertex(vertex)
ri.Vertex.Loaded = load
o <- ri
}
}
Expand Down Expand Up @@ -560,7 +569,7 @@ func (es *Graph) GetInChannel(ctx context.Context, req chan gdbi.ElementLookup,
}
r := batchMap[edge.To]
for _, ri := range r {
ri.Vertex = &gripql.Vertex{Gid: edge.From}
ri.Vertex = &gdbi.Vertex{ID: edge.From}
b = append(b, ri)
}
}
Expand All @@ -577,8 +586,8 @@ func (es *Graph) GetInChannel(ctx context.Context, req chan gdbi.ElementLookup,
idBatch := make([]string, len(batch))
batchMap := make(map[string][]gdbi.ElementLookup, len(batch))
for i := range batch {
idBatch[i] = batch[i].Vertex.Gid
batchMap[batch[i].Vertex.Gid] = append(batchMap[batch[i].Vertex.Gid], batch[i])
idBatch[i] = batch[i].Vertex.ID
batchMap[batch[i].Vertex.ID] = append(batchMap[batch[i].Vertex.ID], batch[i])
}
q := es.client.Search().Index(es.vertexIndex)
q = q.Query(elastic.NewBoolQuery().Must(elastic.NewIdsQuery().Ids(idBatch...)))
Expand All @@ -596,7 +605,8 @@ func (es *Graph) GetInChannel(ctx context.Context, req chan gdbi.ElementLookup,
}
r := batchMap[vertex.Gid]
for _, ri := range r {
ri.Vertex = vertex
ri.Vertex = gdbi.NewElementFromVertex(vertex)
ri.Vertex.Loaded = load
o <- ri
}
}
Expand Down Expand Up @@ -671,7 +681,8 @@ func (es *Graph) GetOutEdgeChannel(ctx context.Context, req chan gdbi.ElementLoo
}
r := batchMap[edge.From]
for _, ri := range r {
ri.Edge = edge
ri.Edge = gdbi.NewElementFromEdge(edge)
ri.Edge.Loaded = load
o <- ri
}
}
Expand Down Expand Up @@ -746,7 +757,8 @@ func (es *Graph) GetInEdgeChannel(ctx context.Context, req chan gdbi.ElementLook
}
r := batchMap[edge.To]
for _, ri := range r {
ri.Edge = edge
ri.Edge = gdbi.NewElementFromEdge(edge)
ri.Edge.Loaded = load
o <- ri
}
}
Expand Down
12 changes: 9 additions & 3 deletions engine/core/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (

// DefaultPipeline a set of runnable query operations
type DefaultPipeline struct {
graph gdbi.GraphInterface
procs []gdbi.Processor
dataType gdbi.DataType
markTypes map[string]gdbi.DataType
}

func NewPipeline(procs []gdbi.Processor, ps *pipeline.State) *DefaultPipeline {
return &DefaultPipeline{procs, ps.LastType, ps.MarkTypes}
func NewPipeline(graph gdbi.GraphInterface, procs []gdbi.Processor, ps *pipeline.State) *DefaultPipeline {
return &DefaultPipeline{graph, procs, ps.LastType, ps.MarkTypes}
}

// DataType return the datatype
Expand All @@ -36,6 +37,11 @@ func (pipe *DefaultPipeline) Processors() []gdbi.Processor {
return pipe.procs
}

// Graph gets the processor graph interface
func (pipe *DefaultPipeline) Graph() gdbi.GraphInterface {
return pipe.graph
}

// DefaultCompiler is the core compiler that works with default graph interface
type DefaultCompiler struct {
db gdbi.GraphInterface
Expand Down Expand Up @@ -80,7 +86,7 @@ func (comp DefaultCompiler) Compile(stmts []*gripql.GraphStatement, opts *gdbi.C
procs = append(procs, p)
}

return &DefaultPipeline{procs, ps.LastType, ps.MarkTypes}, nil
return &DefaultPipeline{comp.db, procs, ps.LastType, ps.MarkTypes}, nil
}

func StatementProcessor(gs *gripql.GraphStatement, db gdbi.GraphInterface, ps *pipeline.State) (gdbi.Processor, error) {
Expand Down
Loading