LibSQL: Keep track of free heap blocks when trimming storage

When overwriting existing heap storage that requires fewer blocks, make
sure to free all remaining blocks so they can be reused in the future.
Jelle Raaijmakers 2023-05-24 14:53:43 +02:00 committed by Tim Flynn
parent c5ebc4bb40
commit a6abc1697f
3 changed files with 75 additions and 6 deletions
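In outline: heap storage is a chain of fixed-size blocks linked through a next-block index. When an overwrite needs fewer blocks than the existing chain holds, the tail of the old chain is walked and every block index is pushed onto a free list, which is consulted before the heap file is grown. A minimal standalone C++ sketch of that bookkeeping follows; the names and containers are illustrative stand-ins, not the LibSQL types.

#include <cstdint>
#include <unordered_map>
#include <vector>

// Illustrative stand-ins for LibSQL's Heap/Block: a block chain is linked
// through a next-block index, with 0 terminating the chain.
struct Block {
    uint32_t next { 0 };
};

struct Heap {
    std::unordered_map<uint32_t, Block> blocks;
    std::vector<uint32_t> free_indices;
    uint32_t next_index { 1 }; // index 0 is reserved for the zero block

    // Hand out a recycled index if one is available; only grow otherwise.
    uint32_t request_new_block_index()
    {
        if (!free_indices.empty()) {
            uint32_t index = free_indices.back();
            free_indices.pop_back();
            return index;
        }
        return next_index++;
    }

    // Walk the tail of a chain that is no longer needed and put every index
    // on the free list; analogous to the loop this commit adds at the end of
    // write_storage().
    void free_chain(uint32_t index)
    {
        while (index != 0) {
            uint32_t next = blocks[index].next;
            blocks.erase(index);
            free_indices.push_back(index);
            index = next;
        }
    }
};

The test below exercises exactly this: a 4-block chain (4 * Block::DATA_SIZE bytes of payload) is trimmed to 2 blocks, leaving two indices on the free list, and a subsequent 2-block write is expected to consume those instead of growing the file.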

Tests/LibSQL/TestSqlHeap.cpp

@@ -96,3 +96,34 @@ TEST_CASE(heap_overwrite_large_storage)
     auto stored_longest_string = TRY_OR_FAIL(heap->read_storage(storage_block_id));
     EXPECT_EQ(longest_string.bytes(), stored_longest_string.bytes());
 }
+
+TEST_CASE(heap_reuse_freed_blocks_after_storage_trim)
+{
+    ScopeGuard guard([]() { MUST(Core::System::unlink(db_path)); });
+    auto heap = create_heap();
+
+    // First, write storage spanning 4 blocks
+    auto first_index = heap->request_new_block_index();
+    StringBuilder builder;
+    MUST(builder.try_append_repeated('x', SQL::Block::DATA_SIZE * 4));
+    auto long_string = builder.string_view();
+    TRY_OR_FAIL(heap->write_storage(first_index, long_string.bytes()));
+    MUST(heap->flush());
+    auto original_heap_size = MUST(heap->file_size_in_bytes());
+
+    // Then, overwrite the first storage and reduce it to 2 blocks
+    builder.clear();
+    MUST(builder.try_append_repeated('x', SQL::Block::DATA_SIZE * 2));
+    long_string = builder.string_view();
+    TRY_OR_FAIL(heap->write_storage(first_index, long_string.bytes()));
+    MUST(heap->flush());
+    auto heap_size_after_reduction = MUST(heap->file_size_in_bytes());
+    EXPECT(heap_size_after_reduction <= original_heap_size);
+
+    // Now add the second storage spanning 2 blocks - heap should not have grown compared to the original storage
+    auto second_index = heap->request_new_block_index();
+    TRY_OR_FAIL(heap->write_storage(second_index, long_string.bytes()));
+    MUST(heap->flush());
+    auto heap_size_after_second_storage = MUST(heap->file_size_in_bytes());
+    EXPECT(heap_size_after_second_storage <= original_heap_size);
+}

Userland/Libraries/LibSQL/Heap.cpp

@@ -80,7 +80,15 @@ ErrorOr<size_t> Heap::file_size_in_bytes() const
 
 bool Heap::has_block(Block::Index index) const
 {
-    return index <= m_highest_block_written || m_write_ahead_log.contains(index);
+    return (index <= m_highest_block_written || m_write_ahead_log.contains(index))
+        && !m_free_block_indices.contains_slow(index);
 }
+
+Block::Index Heap::request_new_block_index()
+{
+    if (!m_free_block_indices.is_empty())
+        return m_free_block_indices.take_last();
+    return m_next_block++;
+}
 
 ErrorOr<ByteBuffer> Heap::read_storage(Block::Index index)
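Two details in the hunk above: has_block() now reports freed-but-unreused indices as absent, and reuse is LIFO via take_last(). Note that AK's contains_slow() is a linear scan over the Vector, so has_block() becomes O(n) in the number of free indices, presumably acceptable while the free list stays short. A round trip with the hypothetical sketch from the top of this page:

#include <cassert>

int main()
{
    Heap heap; // the illustrative Heap sketch above, not SQL::Heap
    auto a = heap.request_new_block_index(); // 1: heap grows
    auto b = heap.request_new_block_index(); // 2: heap grows again
    heap.blocks[a] = {};                     // a one-block chain (next == 0)
    heap.free_chain(a);                      // index 1 goes onto the free list
    auto c = heap.request_new_block_index(); // 1 again, recycled, no growth
    assert(b == 2 && c == a);
}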
@@ -107,21 +115,23 @@ ErrorOr<void> Heap::write_storage(Block::Index index, ReadonlyBytes data)
     // Split up the storage across multiple blocks if necessary, creating a chain
     u32 remaining_size = static_cast<u32>(data.size());
     u32 offset_in_data = 0;
+    Block::Index existing_next_block_index = 0;
     while (remaining_size > 0) {
         auto block_data_size = AK::min(remaining_size, Block::DATA_SIZE);
         remaining_size -= block_data_size;
 
         ByteBuffer block_data;
-        Block::Index next_block_index = 0;
         if (has_block(index)) {
             auto existing_block = TRY(read_block(index));
             block_data = existing_block.data();
             TRY(block_data.try_resize(block_data_size));
-            next_block_index = existing_block.next_block();
+            existing_next_block_index = existing_block.next_block();
         } else {
             block_data = TRY(ByteBuffer::create_uninitialized(block_data_size));
+            existing_next_block_index = 0;
         }
 
+        Block::Index next_block_index = existing_next_block_index;
         if (next_block_index == 0 && remaining_size > 0)
             next_block_index = request_new_block_index();
         else if (remaining_size == 0)
@@ -133,6 +143,14 @@ ErrorOr<void> Heap::write_storage(Block::Index index, ReadonlyBytes data)
         index = next_block_index;
         offset_in_data += block_data_size;
     }
+
+    // Free remaining blocks in existing chain, if any
+    while (existing_next_block_index > 0) {
+        auto existing_block = TRY(read_block(existing_next_block_index));
+        existing_next_block_index = existing_block.next_block();
+        TRY(free_block(existing_block));
+    }
+
     return {};
 }
@@ -207,13 +225,28 @@ ErrorOr<void> Heap::write_block(Block const& block)
     return write_raw_block_to_wal(block.index(), move(heap_data));
 }
 
+ErrorOr<void> Heap::free_block(Block const& block)
+{
+    auto index = block.index();
+    dbgln_if(SQL_DEBUG, "{}({})", __FUNCTION__, index);
+    VERIFY(index > 0);
+    VERIFY(has_block(index));
+
+    // Zero out freed blocks to facilitate a free block scan upon opening the database later
+    auto zeroed_data = TRY(ByteBuffer::create_zeroed(Block::SIZE));
+    TRY(write_raw_block_to_wal(index, move(zeroed_data)));
+
+    return m_free_block_indices.try_append(index);
+}
+
 ErrorOr<void> Heap::flush()
 {
     VERIFY(m_file);
     auto indices = m_write_ahead_log.keys();
     quick_sort(indices);
     for (auto index : indices) {
-        dbgln_if(SQL_DEBUG, "Flushing block {} to {}", index, name());
+        dbgln_if(SQL_DEBUG, "Flushing block {}", index);
         auto& data = m_write_ahead_log.get(index).value();
         TRY(write_raw_block(index, data));
     }
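free_block() zeroes the freed block on disk specifically so that a later free-block scan on open can rebuild m_free_block_indices; that scan itself is not part of this commit. A hypothetical sketch of what such a scan could look like, assuming in-use blocks always carry a non-zero header (the block size and file layout here are assumptions, not LibSQL's actual on-disk format):

#include <algorithm>
#include <cstdint>
#include <fstream>
#include <vector>

// Hypothetical free-block scan: read the heap file block by block and treat
// all-zero blocks as free, relying on free_block() having zeroed them out.
std::vector<uint32_t> scan_for_free_blocks(std::ifstream& file, size_t block_size)
{
    std::vector<uint32_t> free_indices;
    std::vector<char> block(block_size);
    uint32_t index = 0;
    while (file.read(block.data(), static_cast<std::streamsize>(block.size()))) {
        bool all_zero = std::all_of(block.begin(), block.end(),
            [](char byte) { return byte == 0; });
        if (all_zero && index > 0) // index 0 is the heap's zero block
            free_indices.push_back(index);
        ++index;
    }
    return free_indices;
}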

Userland/Libraries/LibSQL/Heap.h

@@ -11,6 +11,7 @@
 #include <AK/Debug.h>
 #include <AK/DeprecatedString.h>
 #include <AK/HashMap.h>
+#include <AK/Vector.h>
 #include <LibCore/File.h>
 #include <LibCore/Object.h>
@@ -39,6 +40,7 @@ public:
         , m_next_block(next_block)
         , m_data(move(data))
     {
+        VERIFY(index > 0);
     }
 
     Index index() const { return m_index; }
@@ -73,8 +75,8 @@ public:
     ErrorOr<void> open();
     ErrorOr<size_t> file_size_in_bytes() const;
 
-    bool has_block(Block::Index) const;
-    [[nodiscard]] Block::Index request_new_block_index() { return m_next_block++; }
+    [[nodiscard]] bool has_block(Block::Index) const;
+    [[nodiscard]] Block::Index request_new_block_index();
 
     Block::Index schemas_root() const { return m_schemas_root; }
@@ -126,6 +128,8 @@ private:
     ErrorOr<Block> read_block(Block::Index);
     ErrorOr<void> write_block(Block const&);
+    ErrorOr<void> free_block(Block const&);
+
     ErrorOr<void> read_zero_block();
     ErrorOr<void> initialize_zero_block();
     ErrorOr<void> update_zero_block();
@@ -139,6 +143,7 @@ private:
     u32 m_version { VERSION };
     Array<u32, 16> m_user_values { 0 };
    HashMap<Block::Index, ByteBuffer> m_write_ahead_log;
+    Vector<Block::Index> m_free_block_indices;
 };
 
 }