From f9846a94f41c57c27802e65e48b9bc9d1ae0a2d0 Mon Sep 17 00:00:00 2001
From: Mustafa Bayar
Date: Thu, 6 Nov 2025 13:05:22 +0100
Subject: [PATCH] partition: Partition ID migration

Currently partitions are addressed using uint64 IDs. When RAFT replicas
are used, each replica keeps an internal ID plus a string-based key
constructed from storageName + partitionID to route to the correct
internal partition ID. This complicates certain flows, where we would
need to be aware of the RAFT status to refer to the correct ID format.
Moving to the composite ID format by default instead eliminates the
need for a separate routing table for RAFT.

For this refactoring, we need two things:

1- Changing all references to the old uint64 format to strings
2- Adding a migration to convert the existing structure and DB keys

In this commit, we first introduce the migration without the actual
partition ID refactoring. By doing so, we ensure backward compatibility:

Phase 1 (Current): Deploy migration code in disabled state
- Introduces forward/backward migration logic without activating it
- Establishes rollback safety net

Phase 2 (Next MR): Enable migration to transform partition ID structure
- Introduce the actual partition ID refactoring
- Set flag to true to activate the new partition ID structure
- Migration runs automatically on deployment

Rollback Safety: If issues arise after Phase 2, reverting to Phase 1
will trigger the backward migration to restore the original structure.
---
 internal/cli/gitaly/serve.go                  |  90 ++-
 .../partition/partition_id_migration.go       | 605 ++++++++++++++++++
 .../partition/partition_id_migration_test.go  | 434 +++++++++++++
 3 files changed, 1128 insertions(+), 1 deletion(-)
 create mode 100644 internal/gitaly/storage/storagemgr/partition/partition_id_migration.go
 create mode 100644 internal/gitaly/storage/storagemgr/partition/partition_id_migration_test.go

diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 0531be28fa4..25dc8de1ec0 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -497,7 +497,7 @@ func run(appCtx *cli.Command, cfg config.Cfg, logger log.Logger) error {
 		node = raftNode
 	}
 
-	if err := replicaPartitionMigration(cfg, locator, dbMgr, logger, assignmentWorkerErrCh); err != nil {
+	if err := partitionIDMigration(cfg, locator, dbMgr, logger, assignmentWorkerErrCh); err != nil {
 		return err
 	}
 
@@ -868,3 +868,91 @@ func replicaPartitionMigration(cfg config.Cfg, locator storage.Locator, dbMgr *d
 	}
 	return nil
 }
+
+// partitionIDMigration performs partition ID migration for all storages.
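+// It checks whether the migration has already been applied for each configured storage
+// and, based on the hardcoded partitionMigrationEnabled flag below, either runs the
+// forward migration or rolls a previously applied migration back. While the flag stays
+// disabled and no rollback is needed, it falls back to replicaPartitionMigration.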
+func partitionIDMigration(cfg config.Cfg, locator storage.Locator, dbMgr *databasemgr.DBManager, logger log.Logger, assignmentWorkerErrCh <-chan error) error {
+	for _, storageCfg := range cfg.Storages {
+		partitionsDir, err := locator.PartitionsDir(storageCfg.Name)
+		if err != nil {
+			return fmt.Errorf("retrieving partitions dir for storage %s: %w", storageCfg.Name, err)
+		}
+
+		db, err := dbMgr.GetDB(storageCfg.Name)
+		if err != nil {
+			return fmt.Errorf("getting db: %w", err)
+		}
+
+		migrator, err := partition.NewIDMigrator(partitionsDir, storageCfg.Name, db)
+		if err != nil {
+			return fmt.Errorf("creating partition ID migrator: %w", err)
+		}
+
+		migrated, err := migrator.CheckMigrationStatus()
+		if err != nil {
+			return fmt.Errorf("partition ID migrator status check: %w", err)
+		}
+
+		var shouldMigrate bool
+		var migrate func() error
+		var actionDesc, completionDesc string
+
+		// Unlike the RAFT migration, we don't need this to be controlled by config,
+		// because this is a structural change transparent to users.
+		//
+		// It is initially hardcoded to false for controlled rollout.
+		// This two-phase deployment strategy ensures backward compatibility:
+		//
+		// Phase 1 (Current): Deploy migration code in disabled state
+		// - Introduces forward/backward migration logic without activating it
+		// - Establishes rollback safety net
+		//
+		// Phase 2 (Next MR): Enable migration to transform partition ID structure
+		// - Set flag to true to activate the new partition ID structure
+		// - Migration runs automatically on deployment
+		//
+		// Rollback Safety: If issues arise after Phase 2, reverting to Phase 1 code
+		// will trigger the backward migration to restore the original partition structure.
+		partitionMigrationEnabled := false
+		if partitionMigrationEnabled {
+			shouldMigrate = !migrated
+			migrate = migrator.Forward
+			actionDesc = "starting partition ID migration"
+			completionDesc = "completed partition ID migration"
+		} else {
+			shouldMigrate = migrated
+			migrate = migrator.Backward
+			actionDesc = "starting partition ID migration rollback"
+			completionDesc = "completed rollback partition ID migration"
+		}
+
+		if shouldMigrate {
+			// Block and wait for the assignment worker to complete before migrating.
+			if err := <-assignmentWorkerErrCh; err != nil {
+				return fmt.Errorf("partition assignment worker: %w", err)
+			}
+
+			logger.Info(fmt.Sprintf("%s for storage: %s", actionDesc, storageCfg.Name))
+			startTime := time.Now()
+
+			if err := migrate(); err != nil {
+				if partitionMigrationEnabled {
+					return fmt.Errorf("performing partition ID migration: %w", err)
+				}
+				return fmt.Errorf("undoing partition ID migration: %w", err)
+			}
+
+			logger.Info(fmt.Sprintf("%s for storage %s in %v", completionDesc, storageCfg.Name, time.Since(startTime)))
+		}
+
+		if !partitionMigrationEnabled && !shouldMigrate {
+			// The partition ID migration is neither enabled nor has it previously run (so no rollback is required).
+			// Fall back to replicaPartitionMigration so that RAFT testing is not blocked.
+			// Once we enable partitionIDMigration, we can phase out replicaPartitionMigration entirely,
+			// as partitionIDMigration encompasses the essential functions of replicaPartitionMigration.
+			if err := replicaPartitionMigration(cfg, locator, dbMgr, logger, assignmentWorkerErrCh); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
diff --git a/internal/gitaly/storage/storagemgr/partition/partition_id_migration.go b/internal/gitaly/storage/storagemgr/partition/partition_id_migration.go
new file mode 100644
index 00000000000..397afd79a5b
--- /dev/null
+++ b/internal/gitaly/storage/storagemgr/partition/partition_id_migration.go
@@ -0,0 +1,605 @@
+package partition
+
+import (
+	"bytes"
+	"context"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+
+	"github.com/dgraph-io/badger/v4"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/keyvalue"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/safe"
+)
+
+// partitionIDFormatMigrationKey is the DB key used to track the partition ID format migration status.
+var partitionIDFormatMigrationKey = []byte("partition_id_format_migration_status")
+
+// IDMigrator handles migrations between partition structures and partition ID formats.
+type IDMigrator struct {
+	storageName   string
+	partitionsDir string
+	db            keyvalue.Store
+}
+
+// NewIDMigrator creates a new partition ID migrator instance.
+func NewIDMigrator(absoluteStateDir, storageName string, db keyvalue.Store) (*IDMigrator, error) {
+	partitionsDir, err := getPartitionsDir(absoluteStateDir)
+	if err != nil {
+		return nil, fmt.Errorf("determining partitions directory: %w", err)
+	}
+
+	return &IDMigrator{
+		storageName:   storageName,
+		partitionsDir: partitionsDir,
+		db:            db,
+	}, nil
+}
+
+// Forward migrates from the old to the new partition structure and partition ID format.
+func (m *IDMigrator) Forward() error {
+	// Restructure the partition directories and migrate the DB keys to the new partition ID format.
+	if err := m.partitionIDMigration(); err != nil {
+		if backwardErr := m.Backward(); backwardErr != nil {
+			return fmt.Errorf("partition restructure migration failed: %w, and reversion also failed: %w", err, backwardErr)
+		}
+		return fmt.Errorf("partition restructure migration: %w", err)
+	}
+
+	if err := cleanupOldPartitionStructure(m.partitionsDir); err != nil {
+		return fmt.Errorf("cleanup old partition structure: %w", err)
+	}
+
+	if err := m.updateMigrationInDB(); err != nil {
+		return fmt.Errorf("update migration status: %w", err)
+	}
+
+	return nil
+}
+
+// Backward handles the reverse migration to restore the old structure
+// from the new one.
+// Note: This assumes that the new structure is correctly set up and working.
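+// It recreates the old directory structure from the new one via hardlinks, removes the
+// new structure, and deletes the migration marker key from the database.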
+func (m *IDMigrator) Backward() error { + // First, undo partition structure migration + if err := m.undoPartitionRestructureMigration(); err != nil { + return fmt.Errorf("undoing partition restructure: %w", err) + } + + if err := cleanupNewPartitionStructure(m.partitionsDir); err != nil { + return fmt.Errorf("cleanup new partition structure: %w", err) + } + + if err := m.deleteMigrationInDB(); err != nil { + return fmt.Errorf("delete migration status: %w", err) + } + + return nil +} + +// BEFORE MIGRATION: +// +// ── partitions +// ├── 59 # First two chars of hash(partitionID) +// │ └── 94 # Next two chars of hash(partitionID) +// │ └── 12345 # Numeric partitionID +// │ └── wal # Write-ahead log directory +// │ ├── 0000000000000001 # Log sequence number +// │ │ ├── MANIFEST +// │ │ └── RAFT +// │ └── 0000000000000002 +// │ ├── MANIFEST +// │ └── RAFT +// +// AFTER MIGRATION: +// +// ── partitions +// ├── 59 +// │ └── 94 +// │ └── 12345 +// │ └── wal +// │ ├── 0000000000000001 +// │ │ ├── MANIFEST +// │ │ └── RAFT +// │ └── 0000000000000002 +// │ ├── MANIFEST +// │ └── RAFT +// └── a8 +// └── 42 +// └── testStorage_12345 +// └── wal +// ├── 0000000000000001 +// │ ├── MANIFEST +// │ └── RAFT +// └── 0000000000000002 +// ├── MANIFEST +// └── RAFT +// +// partitionIDMigration restructures partitions from the old directory structure +// to correspond to new partition ID format. It also updates the badger DB keys. +func (m *IDMigrator) partitionIDMigration() error { + // Track all directories that need to be synced + dirsToSync := make(map[string]struct{}) + + err := filepath.Walk(m.partitionsDir, func(path string, info fs.FileInfo, err error) error { + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + + // Skip the base path itself + if path == m.partitionsDir { + return nil + } + + // Get relative path from state directory + relPath, err := filepath.Rel(m.partitionsDir, path) + if err != nil { + return err + } + + matches := pathPattern.FindStringSubmatch(relPath) + if len(matches) == 0 { + // Path doesn't match our pattern, skip it + return nil + } + // It matched, third capture group will be partitionID + partitionID := matches[3] + _, newWalDir := pathForMigratedDir(m.storageName, m.partitionsDir, partitionID) + + // Add dir to be synced + dirsToSync[newWalDir] = struct{}{} + + // For files and directories beyond the /wal level + // Get components after /wal by removing the matched prefix + subPath := strings.TrimPrefix(relPath, matches[0]) + // Remove leading separator if present + subPath = strings.TrimPrefix(subPath, string(os.PathSeparator)) + newPath := filepath.Join(newWalDir, subPath) + if info.IsDir() { + if err := os.MkdirAll(newPath, info.Mode().Perm()); err != nil { + return fmt.Errorf("failed to create directory %s: %w", newPath, err) + } + } else if info.Mode().IsRegular() { + if err := os.Link(path, newPath); err != nil { + return fmt.Errorf("failed to hardlink file from %s to %s: %w", path, newPath, err) + } + } + + return nil + }) + if err != nil { + return err + } + + syncer := safe.NewSyncer() + for dir := range dirsToSync { + if err := syncer.SyncRecursive(context.Background(), dir); err != nil { + return fmt.Errorf("syncing new replica structure: %w", err) + } + } + + // Migrate badger DB keys from uint64 to string format + if err := m.migrateBadgerDBKeys(); err != nil { + return fmt.Errorf("migrate badger DB keys: %w", err) + } + + return nil +} + +// BEFORE MIGRATION: +// +// ── partitions +// └── a8 +// └── 42 +// └── testStorage_12345 
+// └── wal +// └── 0000000000000001 +// ├── MANIFEST +// └── RAFT +// +// AFTER MIGRATION: +// +// ── partitions +// ├── 59 +// │ └── 94 +// │ └── 12345 +// │ └── wal +// │ └── 0000000000000001 +// │ ├── MANIFEST +// │ └── RAFT +// └── a8 +// └── 42 +// └── testStorage_12345 +// └── wal +// └── 0000000000000001 +// ├── MANIFEST +// └── RAFT +// +// undoPartitionRestructureMigration reverses the partition migration by creating hardlinks +// from the new structure back to the old structure. This is the opposite of PartitionRestructureMigration. +func (m *IDMigrator) undoPartitionRestructureMigration() error { + // Track directories that need to be synced + dirsToSync := make(map[string]struct{}) + + err := filepath.Walk(m.partitionsDir, func(path string, info fs.FileInfo, err error) error { + if err != nil { + if os.IsNotExist(err) { + return os.MkdirAll(path, info.Mode().Perm()) + } + return err + } + + // Skip the base path itself + if path == m.partitionsDir { + return nil + } + + // Get relative path from partitionsDir + relPath, err := filepath.Rel(m.partitionsDir, path) + if err != nil { + return err + } + + // Check if this is a WAL directory in the new structure + matches := replicaPartitionPattern.FindStringSubmatch(relPath) + if len(matches) == 0 { + return nil // Skip if not matching the expected pattern + } + + // Extract components from the matches + partitionID := matches[4] + oldPartition := storage.ComputePartition(partitionID) + oldWalPath := filepath.Join(m.partitionsDir, oldPartition) + + // Add the old WAL path to directories to sync + dirsToSync[oldWalPath] = struct{}{} + + // Use filepath.Walk again to process all subdirectories and files in the WAL directory + return filepath.Walk(path, func(subPath string, subInfo fs.FileInfo, err error) error { + if err != nil { + return err + } + + // Skip the WAL directory itself as we've already created it + if subPath == path { + return nil + } + + // Get the relative path from the new WAL directory + relSubPath, err := filepath.Rel(path, subPath) + if err != nil { + return fmt.Errorf("failed to get relative path for %s: %w", subPath, err) + } + + // Create the corresponding path in the old structure + oldSubPath := filepath.Join(oldWalPath, relSubPath) + + if subInfo.IsDir() { + // Create directory with same permissions + if err := os.MkdirAll(oldSubPath, subInfo.Mode().Perm()); err != nil { + return fmt.Errorf("failed to create directory %s: %w", oldSubPath, err) + } + } else if subInfo.Mode().IsRegular() { + // Create hardlink for the file + if err := os.Link(subPath, oldSubPath); err != nil { + return fmt.Errorf("failed to create hardlink from %s to %s: %w", subPath, oldSubPath, err) + } + } + + return nil + }) + }) + if err != nil { + return err + } + + // Sync all directories at once after all files have been created + syncer := safe.NewSyncer() + for dir := range dirsToSync { + if err := syncer.SyncRecursive(context.Background(), dir); err != nil { + return fmt.Errorf("syncing old replica structure: %w", err) + } + } + + // Undo badger DB key migration + if err := m.undoBadgerDBKeyMigration(); err != nil { + return fmt.Errorf("undo badger DB key migration: %w", err) + } + + return nil +} + +func (m *IDMigrator) updateMigrationInDB() error { + return m.db.Update(func(txn keyvalue.ReadWriter) error { + // just set any value as the presence of a key is sufficient + if err := txn.Set(partitionIDFormatMigrationKey, []byte(nil)); err != nil { + return fmt.Errorf("set entry: %w", err) + } + + return nil + }) +} + +// 
CheckMigrationStatus is used to validate whether the entire migration was complete +func (m *IDMigrator) CheckMigrationStatus() (bool, error) { + var migrated bool + err := m.db.View(func(txn keyvalue.ReadWriter) error { + _, err := txn.Get(partitionIDFormatMigrationKey) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + migrated = false + return nil + } + return fmt.Errorf("get: %w", err) + } + migrated = true + return nil + }) + + return migrated, err +} + +func (m *IDMigrator) deleteMigrationInDB() error { + return m.db.Update(func(txn keyvalue.ReadWriter) error { + if err := txn.Delete(partitionIDFormatMigrationKey); err != nil { + return fmt.Errorf("delete partition ID format migration key: %w", err) + } + + return nil + }) +} + +// migrateBadgerDBKeys migrates all badger DB keys from uint64 to string format +func (m *IDMigrator) migrateBadgerDBKeys() error { + return m.db.Update(func(txn keyvalue.ReadWriter) error { + // Create iterator to scan all partition-related keys + iter := txn.NewIterator(keyvalue.IteratorOptions{ + Prefix: []byte("p/"), // PrefixPartition + }) + defer iter.Close() + + var keysToMigrate []struct { + oldKey []byte + newKey []byte + value []byte + } + + for iter.Rewind(); iter.Valid(); iter.Next() { + item := iter.Item() + oldKey := item.Key() + + // Extract the partition ID from the old key + unprefixedKey := oldKey[2:] // Remove "p/" prefix + + // Partition IDs in old format are always 8 bytes + if len(unprefixedKey) < 8 { + continue // Skip malformed keys + } + + partitionIDBytes := unprefixedKey[:8] + keySuffix := unprefixedKey[8:] // Everything after the 8-byte partition ID + + // Parse as uint64 (old format) + val := binary.BigEndian.Uint64(partitionIDBytes) + newPartitionID := storage.GetRaftPartitionName(m.storageName, strconv.FormatUint(val, 10)) + + // Create new key with string format + newKey := append([]byte("p/"), append([]byte(newPartitionID), keySuffix...)...) 
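+			// For example, with storage "testStorage" and partition ID 123, the old key
+			// "p/"+<8-byte big-endian 123>+"/test_key" becomes "p/testStorage_123/test_key".
+			// (This assumes GetRaftPartitionName joins the storage name and numeric ID with
+			// an underscore, which is what the backward migration and the tests expect.)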
+ + // Get the value + value, err := item.ValueCopy(nil) + if err != nil { + return fmt.Errorf("copy value for key %q: %w", oldKey, err) + } + + keysToMigrate = append(keysToMigrate, struct { + oldKey []byte + newKey []byte + value []byte + }{ + oldKey: append([]byte(nil), oldKey...), + newKey: newKey, + value: value, + }) + } + + // Migrate the keys + for _, keyMigration := range keysToMigrate { + // Set the new key + if err := txn.Set(keyMigration.newKey, keyMigration.value); err != nil { + return fmt.Errorf("set new key %q: %w", keyMigration.newKey, err) + } + + // Delete the old key + if err := txn.Delete(keyMigration.oldKey); err != nil { + return fmt.Errorf("delete old key %q: %w", keyMigration.oldKey, err) + } + } + + // Also handle partition assignments + assignmentIter := txn.NewIterator(keyvalue.IteratorOptions{ + Prefix: []byte("partition_assignment/"), + }) + defer assignmentIter.Close() + + var assignmentsToUpdate []struct { + key []byte + value []byte + } + + for assignmentIter.Rewind(); assignmentIter.Valid(); assignmentIter.Next() { + item := assignmentIter.Item() + key := item.Key() + + oldValue, err := item.ValueCopy(nil) + if err != nil { + return fmt.Errorf("copy assignment value for key %q: %w", key, err) + } + + // If the value is 8 bytes, it's likely the old uint64 format + if len(oldValue) == 8 { + val := binary.BigEndian.Uint64(oldValue) + newPartitionID := storage.GetRaftPartitionName(m.storageName, strconv.FormatUint(val, 10)) + newValue := []byte(newPartitionID) // New string format + + assignmentsToUpdate = append(assignmentsToUpdate, struct { + key []byte + value []byte + }{ + key: append([]byte(nil), key...), + value: newValue, + }) + } + } + + // Now update all assignments after iteration is complete + for _, assignment := range assignmentsToUpdate { + if err := txn.Set(assignment.key, assignment.value); err != nil { + return fmt.Errorf("update assignment value for key %q: %w", assignment.key, err) + } + } + + return nil + }) +} + +// undoBadgerDBKeyMigration reverses the badger DB key migration from string back to uint64 format +func (m *IDMigrator) undoBadgerDBKeyMigration() error { + return m.db.Update(func(txn keyvalue.ReadWriter) error { + // Create iterator to scan all partition-related keys + iter := txn.NewIterator(keyvalue.IteratorOptions{ + Prefix: []byte("p/"), // PrefixPartition + }) + defer iter.Close() + + // Collect keys to migrate (can't modify while iterating) + var keysToMigrate []struct { + oldKey []byte + newKey []byte + value []byte + } + + for iter.Rewind(); iter.Valid(); iter.Next() { + item := iter.Item() + oldKey := item.Key() + + // Extract the partition ID from the key + unprefixedKey := oldKey[2:] // Remove "p/" prefix + + // Find the first '/' to separate partition ID from the rest + slashIndex := bytes.IndexByte(unprefixedKey, '/') + if slashIndex == -1 { + continue // Skip malformed keys + } + + partitionIDBytes := unprefixedKey[:slashIndex] + keySuffix := unprefixedKey[slashIndex:] + + // Try to parse as string (new format) and convert back to uint64 + partitionIDStr := string(partitionIDBytes) + + // Extract numeric part from storage_name_partitionID format + prefix := m.storageName + "_" + if strings.HasPrefix(partitionIDStr, prefix) { + numericPart := strings.TrimPrefix(partitionIDStr, prefix) + if val, parseErr := strconv.ParseUint(numericPart, 10, 64); parseErr == nil { + // Create legacy uint64 key + marshaled := make([]byte, 8) + binary.BigEndian.PutUint64(marshaled, val) + newKey := make([]byte, 0, 2+8+len(keySuffix)) 
+ newKey = append(newKey, []byte("p/")...) + newKey = append(newKey, marshaled...) + newKey = append(newKey, keySuffix...) + + // Get the value + value, err := item.ValueCopy(nil) + if err != nil { + return fmt.Errorf("copy value for key %q: %w", oldKey, err) + } + + keysToMigrate = append(keysToMigrate, struct { + oldKey []byte + newKey []byte + value []byte + }{ + oldKey: append([]byte(nil), oldKey...), + newKey: newKey, + value: value, + }) + } + } + } + // Migrate the keys + for _, keyMigration := range keysToMigrate { + // Set the new key + if err := txn.Set(keyMigration.newKey, keyMigration.value); err != nil { + return fmt.Errorf("set legacy key %q: %w", keyMigration.newKey, err) + } + + // Delete the old key + if err := txn.Delete(keyMigration.oldKey); err != nil { + return fmt.Errorf("delete new key %q: %w", keyMigration.oldKey, err) + } + } + + // Reverse migrate partition assignment values from string back to uint64 format + assignmentIter := txn.NewIterator(keyvalue.IteratorOptions{ + Prefix: []byte("partition_assignment/"), + }) + defer assignmentIter.Close() + + var assignmentsToUpdate []struct { + key []byte + value []byte + } + + for assignmentIter.Rewind(); assignmentIter.Valid(); assignmentIter.Next() { + item := assignmentIter.Item() + key := item.Key() + + oldValue, err := item.ValueCopy(nil) + if err != nil { + return fmt.Errorf("copy assignment value for key %q: %w", key, err) + } + + // Try to parse the value as a string partition ID and convert back to uint64 + partitionIDStr := string(oldValue) + + // Extract numeric part from storage_name_partitionID format + prefix := m.storageName + "_" + if strings.HasPrefix(partitionIDStr, prefix) { + numericPart := strings.TrimPrefix(partitionIDStr, prefix) + if val, parseErr := strconv.ParseUint(numericPart, 10, 64); parseErr == nil { + // Convert back to uint64 binary format + marshaled := make([]byte, 8) + binary.BigEndian.PutUint64(marshaled, val) + + assignmentsToUpdate = append(assignmentsToUpdate, struct { + key []byte + value []byte + }{ + key: append([]byte(nil), key...), + value: marshaled, + }) + } + } + } + // Now update all assignments after iteration is complete + for _, assignment := range assignmentsToUpdate { + if err := txn.Set(assignment.key, assignment.value); err != nil { + return fmt.Errorf("update assignment value for key %q: %w", assignment.key, err) + } + } + + return nil + }) +} diff --git a/internal/gitaly/storage/storagemgr/partition/partition_id_migration_test.go b/internal/gitaly/storage/storagemgr/partition/partition_id_migration_test.go new file mode 100644 index 00000000000..7d56406ce4b --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/partition_id_migration_test.go @@ -0,0 +1,434 @@ +package partition + +import ( + "encoding/binary" + "os" + "path/filepath" + "strconv" + "testing" + + "github.com/dgraph-io/badger/v4" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/keyvalue" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testcfg" +) + +func TestPartitionIDMigrator_Forward(t *testing.T) { + t.Parallel() + + t.Run("successful migration", func(t *testing.T) { + t.Parallel() + // Create a new temp directory for this test + tempDir := testhelper.TempDir(t) + partitionsDir := filepath.Join(tempDir, 
"partitions") + cfg := testcfg.Build(t, testcfg.WithStorages("testStorage")) + + logger := testhelper.SharedLogger(t) + ctx := testhelper.Context(t) + + // Create partitions directory + err := os.MkdirAll(filepath.Join(tempDir, "partitions"), mode.Directory) + require.NoError(t, err, "Failed to create partitions directory") + + _, db := getTestDBManager(t, ctx, cfg, logger) + + // Setup old format DB keys for both partitions + setupOldFormatDBKeys(t, db, "123") + setupOldFormatDBKeys(t, db, "456") + + migrator, err := NewIDMigrator(partitionsDir, cfg.Storages[0].Name, db) + require.NoError(t, err) + + // Setup old partition structure using the helper + setupDirectory(t, partitionsDir, map[string]bool{ + "xx/yy/123/wal/0000000000000001/RAFT": false, + "zz/yy/456/wal/0000000000000001/RAFT": false, + }) + + // Run the migration + require.NoError(t, migrator.Forward()) + + // Verify DB keys were migrated for both partitions + verifyNewFormatDBKeys(t, db, cfg.Storages[0].Name, "123") + verifyNewFormatDBKeys(t, db, cfg.Storages[0].Name, "456") + + // Verify old structure is gone, new structure should exist + testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/xx": {Mode: mode.Directory}, + "/xx/yy": {Mode: mode.Directory}, // parent of partition 123 still remains + "/zz": {Mode: mode.Directory}, + "/zz/yy": {Mode: mode.Directory}, // parent of partition 456 still remains + "/43": {Mode: mode.Directory}, + "/43/02": {Mode: mode.Directory}, + "/43/02/testStorage_456": {Mode: mode.Directory}, + "/43/02/testStorage_456/wal": {Mode: mode.Directory}, + "/43/02/testStorage_456/wal/0000000000000001": {Mode: mode.Directory}, + "/43/02/testStorage_456/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content}, + "/7e": {Mode: mode.Directory}, + "/7e/8d": {Mode: mode.Directory}, + "/7e/8d/testStorage_123": {Mode: mode.Directory}, + "/7e/8d/testStorage_123/wal": {Mode: mode.Directory}, + "/7e/8d/testStorage_123/wal/0000000000000001": {Mode: mode.Directory}, + "/7e/8d/testStorage_123/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content}, + }) + + assertPartitionIDMigrationKeyExists(t, migrator.db) + migrated, err := migrator.CheckMigrationStatus() + require.NoError(t, err) + require.True(t, migrated) + }) + + t.Run("can be run multiple times", func(t *testing.T) { + t.Parallel() + + // Create a new temp directory for this test + tempDir := testhelper.TempDir(t) + partitionsDir := filepath.Join(tempDir, "partitions") + + cfg := testcfg.Build(t, testcfg.WithStorages("testStorage")) + + logger := testhelper.SharedLogger(t) + ctx := testhelper.Context(t) + + // Create partitions directory + err := os.MkdirAll(filepath.Join(tempDir, "partitions"), mode.Directory) + require.NoError(t, err, "Failed to create partitions directory") + + _, db := getTestDBManager(t, ctx, cfg, logger) + + // Setup old format DB keys + setupOldFormatDBKeys(t, db, "123") + + migrator, err := NewIDMigrator(partitionsDir, cfg.Storages[0].Name, db) + require.NoError(t, err) + + // Setup structure with read-only directory to cause cleanup error + setupDirectory(t, partitionsDir, map[string]bool{ + "xx/yy/123/wal/0000000000000001": true, // partition directory + }) + + // Make the directory read-only on the parent to cause error during cleanup + oldPartitionPath := filepath.Join(partitionsDir, "xx/yy") + err = os.Chmod(oldPartitionPath, 0o500) // read-only + require.NoError(t, err) + + // Run the migration - should complete the migration but fail during cleanup + 
require.Error(t, migrator.Forward()) + + // Restore permissions for cleanup + assert.NoError(t, os.Chmod(oldPartitionPath, mode.Directory)) + + // Since cleanup failed, old structure still exists + testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/xx": {Mode: mode.Directory}, + "/xx/yy": {Mode: mode.Directory}, + "/xx/yy/123": {Mode: mode.Directory}, // expected to not be cleaned up + "/7e": {Mode: mode.Directory}, + "/7e/8d": {Mode: mode.Directory}, + "/7e/8d/testStorage_123": {Mode: mode.Directory}, + "/7e/8d/testStorage_123/wal": {Mode: mode.Directory}, + "/7e/8d/testStorage_123/wal/0000000000000001": {Mode: mode.Directory}, + }) + assertPartitionIDMigrationKeyAbsent(t, migrator.db) + migrated, err := migrator.CheckMigrationStatus() + require.NoError(t, err) + require.False(t, migrated) + require.NoError(t, migrator.Forward()) + + // /xx/yy/123 got cleaned up + testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/xx": {Mode: mode.Directory}, + "/xx/yy": {Mode: mode.Directory}, + "/7e": {Mode: mode.Directory}, + "/7e/8d": {Mode: mode.Directory}, + "/7e/8d/testStorage_123": {Mode: mode.Directory}, + "/7e/8d/testStorage_123/wal": {Mode: mode.Directory}, + "/7e/8d/testStorage_123/wal/0000000000000001": {Mode: mode.Directory}, + }) + assertPartitionIDMigrationKeyExists(t, migrator.db) + migrated, err = migrator.CheckMigrationStatus() + require.NoError(t, err) + require.True(t, migrated) + }) +} + +func TestPartitionIDMigrator_Backward(t *testing.T) { + t.Parallel() + + t.Run("successful migration", func(t *testing.T) { + t.Parallel() + // Create a new temp directory for this test + tempDir := testhelper.TempDir(t) + partitionsDir := filepath.Join(tempDir, "partitions") + cfg := testcfg.Build(t, testcfg.WithStorages("testStorage")) + logger := testhelper.SharedLogger(t) + ctx := testhelper.Context(t) + + // Create partitions directory + err := os.MkdirAll(filepath.Join(tempDir, "partitions"), mode.Directory) + require.NoError(t, err, "Failed to create partitions directory") + + _, db := getTestDBManager(t, ctx, cfg, logger) + + // Setup new format DB keys for both partitions + setupNewFormatDBKeys(t, db, cfg.Storages[0].Name, "123") + setupNewFormatDBKeys(t, db, cfg.Storages[0].Name, "456") + + migrator, err := NewIDMigrator(partitionsDir, cfg.Storages[0].Name, db) + require.NoError(t, err) + // Setup new partition structure using the helper + setupDirectory(t, partitionsDir, map[string]bool{ + "xx/yy/testStorage_123/wal/0000000000000001/RAFT": false, + "zz/yy/testStorage_456/wal/0000000000000001/RAFT": false, + }) + + // Run the migration + err = migrator.Backward() + require.NoError(t, err) + + // Verify DB keys were reverted for both partitions + verifyOldFormatDBKeys(t, db, cfg.Storages[0].Name, "123") + verifyOldFormatDBKeys(t, db, cfg.Storages[0].Name, "456") + + testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/xx": {Mode: mode.Directory}, + "/xx/yy": {Mode: mode.Directory}, + "/zz": {Mode: mode.Directory}, + "/zz/yy": {Mode: mode.Directory}, + "/b3": {Mode: mode.Directory}, + "/b3/a8": {Mode: mode.Directory}, + "/b3/a8/456": {Mode: mode.Directory}, + "/b3/a8/456/wal": {Mode: mode.Directory}, + "/b3/a8/456/wal/0000000000000001": {Mode: mode.Directory}, + "/b3/a8/456/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content}, + "/a6": {Mode: mode.Directory}, + "/a6/65": {Mode: 
mode.Directory}, + "/a6/65/123": {Mode: mode.Directory}, + "/a6/65/123/wal": {Mode: mode.Directory}, + "/a6/65/123/wal/0000000000000001": {Mode: mode.Directory}, + "/a6/65/123/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content}, + }) + assertPartitionIDMigrationKeyAbsent(t, migrator.db) + }) + + t.Run("can be run multiple times", func(t *testing.T) { + t.Parallel() + + // Create a new temp directory for this test + tempDir := testhelper.TempDir(t) + partitionsDir := filepath.Join(tempDir, "partitions") + cfg := testcfg.Build(t, testcfg.WithStorages("testStorage")) + logger := testhelper.SharedLogger(t) + ctx := testhelper.Context(t) + + // Create partitions directory + err := os.MkdirAll(filepath.Join(tempDir, "partitions"), mode.Directory) + require.NoError(t, err, "Failed to create partitions directory") + + _, db := getTestDBManager(t, ctx, cfg, logger) + + // Setup new format DB keys + setupNewFormatDBKeys(t, db, cfg.Storages[0].Name, "123") + + migrator, err := NewIDMigrator(partitionsDir, cfg.Storages[0].Name, db) + require.NoError(t, err) + // Setup structure with read-only directory to cause cleanup error + structure := map[string]bool{ + "qq/yy/testStorage_123/wal/0000000000000001/RAFT": false, // partition directory + } + + // Creates all the parent dirs in the path specified + setupDirectory(t, partitionsDir, structure) + + // Make the directory read-only on the parent to cause error during cleanup + newPartitionPath := filepath.Join(partitionsDir, "qq/yy") + require.NoError(t, os.Chmod(newPartitionPath, 0o500)) // read-only + + // Run the migration - should complete the migration but fail during cleanup + require.Error(t, migrator.Backward()) + + _, err = os.Stat(filepath.Join(partitionsDir, "qq/yy/testStorage_123")) + assert.NoError(t, err, "Dir should exist as it didn't get cleaned up") + + // Restore permissions for cleanup + assert.NoError(t, os.Chmod(newPartitionPath, mode.Directory)) + require.NoError(t, migrator.Backward()) + + testhelper.RequireDirectoryState(t, partitionsDir, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/qq": {Mode: mode.Directory}, + "/qq/yy": {Mode: mode.Directory}, + "/a6": {Mode: mode.Directory}, + "/a6/65": {Mode: mode.Directory}, + "/a6/65/123": {Mode: mode.Directory}, + "/a6/65/123/wal": {Mode: mode.Directory}, + "/a6/65/123/wal/0000000000000001": {Mode: mode.Directory}, + "/a6/65/123/wal/0000000000000001/RAFT": {Mode: mode.File, Content: content}, + }) + assertPartitionIDMigrationKeyAbsent(t, migrator.db) + }) +} + +// assertPartitionIDMigrationKeyExists checks if the partition ID format migration key exists in the database +func assertPartitionIDMigrationKeyExists(t *testing.T, db keyvalue.Store) { + require.NoError(t, db.View(func(txn keyvalue.ReadWriter) error { + item, err := txn.Get(partitionIDFormatMigrationKey) + require.NoError(t, err) + require.NotNil(t, item) // Ensure we got a valid item + return nil + })) +} + +// assertPartitionIDMigrationKeyAbsent checks if the partition ID format migration key is absent from the database +func assertPartitionIDMigrationKeyAbsent(t *testing.T, db keyvalue.Store) { + require.NoError(t, db.View(func(txn keyvalue.ReadWriter) error { + _, getErr := txn.Get(partitionIDFormatMigrationKey) + require.ErrorIs(t, getErr, badger.ErrKeyNotFound) + return nil + })) +} + +// setupOldFormatDBKeys creates DB keys in the old uint64 format for testing +func setupOldFormatDBKeys(t *testing.T, db keyvalue.Store, partitionID string) { + t.Helper() + + require.NoError(t, 
db.Update(func(txn keyvalue.ReadWriter) error { + // Create old format partition key (uint64) + val, err := strconv.ParseUint(partitionID, 10, 64) + require.NoError(t, err) + + marshaled := make([]byte, 8) + binary.BigEndian.PutUint64(marshaled, val) + + // Set a partition key with old format + oldPartitionKey := make([]byte, 0, 2+8+len("/test_key")) + oldPartitionKey = append(oldPartitionKey, []byte("p/")...) + oldPartitionKey = append(oldPartitionKey, marshaled...) + oldPartitionKey = append(oldPartitionKey, []byte("/test_key")...) + require.NoError(t, txn.Set(oldPartitionKey, []byte("test_value"))) + + // Set a partition assignment key with old format value (use unique key per partition) + assignmentKey := []byte("partition_assignment/test_repo_" + partitionID) + require.NoError(t, txn.Set(assignmentKey, marshaled)) + + return nil + })) +} + +// setupNewFormatDBKeys creates DB keys in the new string format for testing +func setupNewFormatDBKeys(t *testing.T, db keyvalue.Store, storageName, partitionID string) { + t.Helper() + + require.NoError(t, db.Update(func(txn keyvalue.ReadWriter) error { + // Create new format partition key (string) + newPartitionID := storage.GetRaftPartitionName(storageName, partitionID) + newPartitionKey := append([]byte("p/"), append([]byte(newPartitionID), []byte("/test_key")...)...) + require.NoError(t, txn.Set(newPartitionKey, []byte("test_value"))) + + // Set a partition assignment key with new format value (use unique key per partition) + assignmentKey := []byte("partition_assignment/test_repo_" + partitionID) + require.NoError(t, txn.Set(assignmentKey, []byte(newPartitionID))) + + return nil + })) +} + +// verifyNewFormatDBKeys verifies that DB keys have been migrated to the new string format +func verifyNewFormatDBKeys(t *testing.T, db keyvalue.Store, storageName, partitionID string) { + t.Helper() + + require.NoError(t, db.View(func(txn keyvalue.ReadWriter) error { + // Verify new format partition key exists + newPartitionID := storage.GetRaftPartitionName(storageName, partitionID) + newPartitionKey := make([]byte, 0, 2+len(newPartitionID)+len("/test_key")) + newPartitionKey = append(newPartitionKey, []byte("p/")...) + newPartitionKey = append(newPartitionKey, []byte(newPartitionID)...) + newPartitionKey = append(newPartitionKey, []byte("/test_key")...) + + item, err := txn.Get(newPartitionKey) + require.NoError(t, err, "new format partition key should exist") + + value, err := item.ValueCopy(nil) + require.NoError(t, err) + require.Equal(t, "test_value", string(value)) + + // Verify old format partition key does not exist + val, err := strconv.ParseUint(partitionID, 10, 64) + require.NoError(t, err) + + marshaled := make([]byte, 8) + binary.BigEndian.PutUint64(marshaled, val) + oldPartitionKey := make([]byte, 0, 2+8+len("/test_key")) + oldPartitionKey = append(oldPartitionKey, []byte("p/")...) + oldPartitionKey = append(oldPartitionKey, marshaled...) + oldPartitionKey = append(oldPartitionKey, []byte("/test_key")...) 
+ + _, err = txn.Get(oldPartitionKey) + require.ErrorIs(t, err, badger.ErrKeyNotFound, "old format partition key should not exist") + + // Verify partition assignment value is in new format (use unique key per partition) + assignmentKey := []byte("partition_assignment/test_repo_" + partitionID) + item, err = txn.Get(assignmentKey) + require.NoError(t, err) + + assignmentValue, err := item.ValueCopy(nil) + require.NoError(t, err) + require.Equal(t, newPartitionID, string(assignmentValue), "assignment value should be in new string format") + + return nil + })) +} + +// verifyOldFormatDBKeys verifies that DB keys have been reverted to the old uint64 format +func verifyOldFormatDBKeys(t *testing.T, db keyvalue.Store, storageName, partitionID string) { + t.Helper() + + require.NoError(t, db.View(func(txn keyvalue.ReadWriter) error { + // Verify old format partition key exists + val, err := strconv.ParseUint(partitionID, 10, 64) + require.NoError(t, err) + + marshaled := make([]byte, 8) + binary.BigEndian.PutUint64(marshaled, val) + oldPartitionKey := make([]byte, 0, 2+8+len("/test_key")) + oldPartitionKey = append(oldPartitionKey, []byte("p/")...) + oldPartitionKey = append(oldPartitionKey, marshaled...) + oldPartitionKey = append(oldPartitionKey, []byte("/test_key")...) + + item, err := txn.Get(oldPartitionKey) + require.NoError(t, err, "old format partition key should exist") + + value, err := item.ValueCopy(nil) + require.NoError(t, err) + require.Equal(t, "test_value", string(value)) + + // Verify new format partition key does not exist + newPartitionID := storage.GetRaftPartitionName(storageName, partitionID) + newPartitionKey := make([]byte, 0, 2+len(newPartitionID)+len("/test_key")) + newPartitionKey = append(newPartitionKey, []byte("p/")...) + newPartitionKey = append(newPartitionKey, []byte(newPartitionID)...) + newPartitionKey = append(newPartitionKey, []byte("/test_key")...) + + _, err = txn.Get(newPartitionKey) + require.ErrorIs(t, err, badger.ErrKeyNotFound, "new format partition key should not exist") + + // Verify partition assignment value is in old format (use unique key per partition) + assignmentKey := []byte("partition_assignment/test_repo_" + partitionID) + item, err = txn.Get(assignmentKey) + require.NoError(t, err) + + assignmentValue, err := item.ValueCopy(nil) + require.NoError(t, err) + require.Equal(t, marshaled, assignmentValue, "assignment value should be in old uint64 format") + + return nil + })) +} -- GitLab