我正在尝试通过std::atomic<std::shared_ptr>>
对容器等非平凡物件进行操作来实作无锁包装器。我在这两个主题中找到了一些相关的信息:
- 记忆栅栏
- 原子使用
但这仍然不是我需要的。
举个例子:
TEST_METHOD(FechAdd)
{
constexpr size_t loopCount = 5000000;
auto&& container = std::atomic<size_t>(0);
auto thread1 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i )
container ;
});
auto thread2 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i )
container ;
});
thread1.join();
thread2.join();
Assert::AreEqual(loopCount * 2, container.load());
}
此函式作业正常,因为后增量运算子使用内部fetch_add()
原子操作。
另一方面:
TEST_METHOD(LoadStore)
{
constexpr size_t loopCount = 5000000;
auto&& container = std::atomic<size_t>(0);
auto thread1 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i )
{
auto value = container.load();
value ;
container.store(value);
}
});
auto thread2 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i )
{
auto value = container.load();
value ;
container.store(value);
}
});
thread1.join();
thread2.join();
Assert::AreEqual(loopCount * 2, container.load());
}
而如果我用.load()
和.store()
操作和这两个操作之间的增量替换它,结果就不一样了。这是两个原子操作,因此无法在这些操作之间进行同步。
我的最终目标是通过std::atomic<std::shared_ptr>
加载物件的实际状态,执行一些非常量操作,并再次通过存盘操作保存它。
TEST_METHOD(AtomicSharedPtr)
{
constexpr size_t loopCount = 5000000;
auto&& container = std::atomic(std::make_shared<std::unordered_set<int>>());
auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i )
{
// some other lock-free synchronization primitives as barrier, conditions or?
auto reader = container.load();
reader->emplace(5);
container.store(reader);
}
});
auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i )
{
// some other lock-free synchronization primitives as barrier, conditions or?
auto reader = container.load();
reader->erase(5);
container.store(reader);
}
});
}
I knew that the second thread also has only shared_ptr from atomic and non-const operations on shared_ptr, which can only cause data race.
So any hint on how to implement a lock-free wrapper that will work with non-const operations of the object stored in std::atomic<std::shared_ptr>
?
uj5u.com热心网友回复:
首先,旁注。 std::atomic<std::shared_ptr<T>>
提供对指标的原子访问,并且不为提供任何同步T
。在这里要注意这一点非常重要。并且您的代码显示您正在尝试同步T
,而不是指标,因此 没有按照atomic
您的想法进行操作。为了使用std::atomic<std::shared_ptr<T>>
,您必须将指向的物件T
视为const
。
有两种方法可以以执行绪安全的方式处理任意资料的读取-修改-写入。第一个显然是使用锁。这通常执行起来更快,并且由于它的简单性,通常错误更少,因此强烈建议。如果你真的想用原子操作来做到这一点,这很困难,而且执行速度较慢。
它通常看起来像这样,您对指向的资料进行深度复制,对副本进行变异,然后尝试用新资料替换旧资料。如果其他人在此期间更改了资料,则您将其全部丢弃并重新开始整个突变。
template<class T, class F>
bool readModifyWrite(std::atomic<std::shared_ptr<T>>& container, F&& function) {
do {
const auto&& oldT = container.load();
//first a deep copy, to enforce immutability
auto&& newT = std::make_shared(oldT.get());
//then mutate the T
if (!function(*newT))
return false; //function aborted
//then attempt to save the modified T.
//if someone else changed the container during our modification, start over
} while(container.compare_exchange_strong(oldT, newT) == false);
//Note that this may take MANY tries to eventually succeed.
return true;
}
然后用法类似于您所拥有的:
auto&& container = std::atomic(std::make_shared<std::unordered_set<int>>());
auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i )
{
readModifyWrite(container, [](auto& reader) {
reader.emplace(5);
return true;
});
}
});
auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i )
{
readModifyWrite(container, [](auto& reader) {
reader.erase(5);
return true;
});
}
});
}
请注意,由于一个执行绪是插入5
loopCount
次数,另一个是擦除5
loopCount
次数,但它们之间不同步,因此第一个执行绪可能会连续写入多次(对于集合而言是空操作),然后是第二个执行绪可能会连续擦除多次(这是一组无操作),因此您无法真正保证此处的最终结果,但我假设您知道这一点。
但是,如果您想使用突变进行同步,那会变得相当复杂。如果成功或中止,变异函式必须回传,然后呼叫者readModifyWrite
必须处理修改中止的情况。(注意,readModifyWrite
有效地从函式回传值,所以它从修改步骤回传值。写步骤不影响回传值)
auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; )
{
bool did_emplace = readModifyWrite(container, [](auto& reader) {
return reader.emplace(5);
});
if (did_emplace) i ;
}
});
auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; )
{
bool did_erase = readModifyWrite(container, [](auto& reader) {
return reader.erase(5);
});
if (did_erase) i ;
}
});
}
0 评论