diff options
author | axtloss <axtlos@getcryst.al> | 2024-02-12 22:45:56 +0100 |
---|---|---|
committer | axtloss <axtlos@getcryst.al> | 2024-02-12 22:45:56 +0100 |
commit | 2324675a9c58629e321e245a7f436a90cbc98c3d (patch) | |
tree | 08a253380fdfa9d0e1a6bd82361305001e4b7d4a | |
parent | e01c0cf2c84c41841bd7f080a2136c67cf5425b9 (diff) | |
download | fsverify-2324675a9c58629e321e245a7f436a90cbc98c3d.tar.gz fsverify-2324675a9c58629e321e245a7f436a90cbc98c3d.tar.bz2 |
Optimize database writes
-rw-r--r-- | verifysetup/cmd/setup.go | 96 | ||||
-rw-r--r-- | verifysetup/core/storage.go | 48 |
2 files changed, 62 insertions, 82 deletions
diff --git a/verifysetup/cmd/setup.go b/verifysetup/cmd/setup.go index 79059f1..149f41a 100644 --- a/verifysetup/cmd/setup.go +++ b/verifysetup/cmd/setup.go @@ -11,6 +11,7 @@ import ( verify "github.com/axtloss/fsverify/core" "github.com/axtloss/fsverify/verifysetup/core" "github.com/spf13/cobra" + bolt "go.etcd.io/bbolt" ) func NewSetupCommand() *cobra.Command { @@ -24,41 +25,42 @@ func NewSetupCommand() *cobra.Command { return cmd } -func checksumBlock(blockStart int, blockEnd int, blockCount int, diskBytes []byte, nodeChannel chan verify.Node, waitGroup *sync.WaitGroup) { +func checksumBlock(blockStart int, blockEnd int, bundleSize int, diskBytes []byte, nodeChannel chan verify.Node, n int, waitGroup *sync.WaitGroup) { defer waitGroup.Done() + defer close(nodeChannel) var reader *bytes.Reader node := verify.Node{} - //fmt.Printf("Starting from %d to %d. BlockCount is %d\n", blockStart, blockEnd, blockCount) - //fmt.Println(blockCount) - //fmt.Println("diskBytes: ") - //fmt.Printf("Addres of diskBytes: %d\n", &diskBytes) - //fmt.Printf("%d:: diskByteslen: %d\n", blockStart, len(diskBytes)) - for i := 0; i < int(blockCount)-1; i++ { + + blockCount := math.Floor(float64(bundleSize / 2000)) + + for i := 0; i < int(blockCount); i++ { reader = bytes.NewReader(diskBytes) block, err := core.ReadBlock(i*2000, (i*2000)+2000, reader) if err != nil { fmt.Printf("%d:: %d attempted reading from %d to %d. Error %s\n", blockStart, i, i*2000, (i*2000)+2000, err) - //fmt.Println(err) return } - node, err = core.CreateNode(i*2000, (i*2000)+2000, block, &node) + node, err = core.CreateNode(blockStart+i*2000, blockStart+(i*2000)+2000, block, &node, strconv.Itoa(n)) if err != nil { fmt.Printf("%d:: 2 Error %s\n", blockStart, err) - //fmt.Println(err) return } - //nodeChannel <- node - //fmt.Println(blockStart, ":: ", node) - //fmt.Printf("%d:: %d\n", blockStart, i) + nodeChannel <- node + } + + block, err := core.ReadBlock(int(blockCount*2000), len(diskBytes), reader) + if err != nil { + fmt.Printf("%d:: final attempted reading from %d to %d. Error %s\n", blockStart, int(blockCount*2000)+2000, len(diskBytes), err) + return } + finalNode, err := core.CreateNode(blockStart+int(blockCount*2000)+2000, len(diskBytes), block, &node, strconv.Itoa(n)) + nodeChannel <- finalNode fmt.Printf("Node from %d to %d finished.\n", blockStart, blockEnd) } func copyByteArea(start int, end int, reader *bytes.Reader) ([]byte, error) { bytes := make([]byte, end-start) - //reader.Seek(int64(start), 0) n, err := reader.ReadAt(bytes, int64(start)) - //fmt.Printf("Reading from %d to %d\n", start, end) if err != nil { return nil, err } else if n != end-start { @@ -87,64 +89,52 @@ func SetupCommand(_ *cobra.Command, args []string) error { return err } diskSize := diskInfo.Size() - blockCount := math.Floor(float64(diskSize / 2000)) - lastBlockSize := float64(diskSize) - blockCount*2000.0 - blockBundle := math.Floor(float64(blockCount / float64(procCount))) - // lastBlockBundle := float64(blockCount) - blockBundle*float64(procCount) + bundleSize := math.Floor(float64(diskSize / int64(procCount))) + blockCount := math.Ceil(float64(bundleSize / 2000)) + lastBlockSize := int(diskSize) - int(bundleSize)*procCount fmt.Println(diskSize) - fmt.Println(blockCount) + fmt.Println(int(bundleSize)) fmt.Println(lastBlockSize) - // node := verify.Node{} - // block := make([]byte, 2000) diskBytes := make([]byte, diskSize) _, err = disk.Read(diskBytes) if err != nil { return err } reader := bytes.NewReader(diskBytes) - nodeChannel := make(chan verify.Node) var waitGroup sync.WaitGroup - //var nodes []verify.Node + nodeChannels := make([]chan verify.Node, procCount+1) for i := 0; i < procCount; i++ { - /*reader = bytes.NewReader(diskBytes) - block, err = core.ReadBlock(i*2000, (i*2000)+2000, reader) - if err != nil { - return err - } - node, err = core.CreateNode(i*2000, (i*2000)+2000, block, &node) - if err != nil { - return err - } - fmt.Println(node) - err = core.AddNode(node, nil, "./fsverify.db")*/ - - diskBytesCopy, err := copyByteArea(i*(int(blockBundle)*2000), (i+1)*(int(blockBundle)*2000), reader) + diskBytesCopy, err := copyByteArea(i*(int(bundleSize)), (i+1)*(int(bundleSize)), reader) if err != nil { return err } waitGroup.Add(1) - fmt.Printf("Starting thread %d with blockStart %d and blockEnd %d\n", i, i*(int(blockBundle)*2000), (i+1)*(int(blockBundle)*2000)) - go checksumBlock(i*(int(blockBundle)*2000), (i+1)*(int(blockBundle)*2000), int(blockBundle), diskBytesCopy, nodeChannel, &waitGroup) + fmt.Printf("Starting thread %d with blockStart %d and blockEnd %d\n", i, i*(int(bundleSize)), (i+1)*(int(bundleSize))) + nodeChannels[i] = make(chan verify.Node, int(math.Ceil(bundleSize/2000))) + go checksumBlock(i*(int(bundleSize)), (i+1)*(int(bundleSize)), int(bundleSize), diskBytesCopy, nodeChannels[i], i, &waitGroup) } - //fmt.Println("Appending nodes") - /*for i := 0; i < procCount; i++ { - nodes = append(nodes, <-nodeChannel) - }*/ + waitGroup.Wait() - fmt.Println("Created nodelist") - /*finalBlock, err := core.ReadBlock(int(blockCount*2000), int((blockCount*2000)+lastBlockSize), reader) + db, err := verify.OpenDB("./fsverify.db", false) if err != nil { return err } - finalNode, err := core.CreateNode(int(blockCount*2000), int((blockCount*2000)+lastBlockSize), finalBlock, &node) - if err != nil { - return err + + for i := 0; i < procCount; i++ { + channel := nodeChannels[i] + err = db.Batch(func(tx *bolt.Tx) error { + for j := 0; j < int(blockCount); j++ { + err := core.AddNode(<-channel, tx) + if err != nil { + return err + } + } + return nil + }) + if err != nil { + return err + } } - fmt.Println(finalNode) - err = core.AddNode(finalNode, nil, "./fsverify.db") - if err != nil { - return err - }*/ signature, err := core.SignDatabase("./fsverify.db", "./minisign/") if err != nil { diff --git a/verifysetup/core/storage.go b/verifysetup/core/storage.go index 831d637..56d32af 100644 --- a/verifysetup/core/storage.go +++ b/verifysetup/core/storage.go @@ -11,6 +11,11 @@ import ( var TotalReadBlocks = 0 func ReadBlock(start int, end int, device *bytes.Reader) ([]byte, error) { + if end-start < 0 { + return []byte{}, fmt.Errorf("ERROR: tried creating byte slice with negative length. %d to %d total %d\n", start, end, end-start) + } else if end-start > 2000 { + return []byte{}, fmt.Errorf("ERROR: tried creating byte slice with length over 2000. %d to %d total %d\n", start, end, end-start) + } block := make([]byte, end-start) _, err := device.Seek(int64(start), 0) if err != nil { @@ -21,7 +26,7 @@ func ReadBlock(start int, end int, device *bytes.Reader) ([]byte, error) { return block, err } -func CreateNode(blockStart int, blockEnd int, block []byte, prevNode *verify.Node) (verify.Node, error) { +func CreateNode(blockStart int, blockEnd int, block []byte, prevNode *verify.Node, n string) (verify.Node, error) { node := verify.Node{} node.BlockStart = blockStart node.BlockEnd = blockEnd @@ -37,39 +42,24 @@ func CreateNode(blockStart int, blockEnd int, block []byte, prevNode *verify.Nod return verify.Node{}, err } } else { - prevNodeHash = "Entrypoint" + prevNodeHash = "Entrypoint" + n } node.PrevNodeSum = prevNodeHash return node, nil } -func AddNode(node verify.Node, db *bolt.DB, dbPath string) error { - var err error - var deferDB bool - if db == nil { - db, err = bolt.Open(dbPath, 0777, nil) - if err != nil { - return err - } - deferDB = true - } else if db.IsReadOnly() { - return fmt.Errorf("Error: database is opened read only, unable to add nodes") - } - err = db.Update(func(tx *bolt.Tx) error { - nodes, err := tx.CreateBucketIfNotExists([]byte("Nodes")) - if err != nil { - return err - } - if buf, err := json.Marshal(node); err != nil { - return err - } else if err := nodes.Put([]byte(node.PrevNodeSum), buf); err != nil { - return err - } +func AddNode(node verify.Node, tx *bolt.Tx) error { + if node.BlockStart == node.BlockEnd { return nil - - }) - if deferDB { - defer db.Close() } - return err + nodes, err := tx.CreateBucketIfNotExists([]byte("Nodes")) + if err != nil { + return err + } + if buf, err := json.Marshal(node); err != nil { + return err + } else if err := nodes.Put([]byte(node.PrevNodeSum), buf); err != nil { + return err + } + return nil } |