I have an item processing application, and I want to process a number of items in parallel. But, it appears sometimes it doesn't process one item at all, and sometimes it processes more than once. My code below
On a timer tick, a new thread is created which prepares an item list from database, then locks every item so that it won't be picked in next loop.
SqlConnection connection = GetConnection();
string selectCommand = my_select_command_where_IsLocked_is_null;
List<Item> itemList = new List<Item>();
using(SqlDataAdapter adapter = new SqlDataAdapter(selectCommand, connection))
{
using(SqlCommandBuilder cbCommandBuilder = new SqlCommandBuilder(adapter))
{
DataTable dtItemStore = new DataTable();
adapter.Fill(dtItemStore);
int totalRows = dtItemStore.Rows.Count;
if(totalRows > 0)
{
adapter.UpdateCommand = cbCommandBuilder.GetUpdateCommand();
foreach(DataRow row in dtItemStore.Rows)
{
Item pickedItem = new Item();
pickedItem.Key = row["Key"].ToString();
// Set all Item properties
itemList.Add(pickedItem);
row["IsLocked"] = 1;
}
}
adapter.Update(dtItemStore);
}
}
Then it iterates through the list, and fires a new thread for each element to process the item.
var keyList = "";
foreach(Item pickedItem in itemList)
{
Thread.Sleep(15); // Just to delay a bit
var newThread = new Thread(delegate() { ProcessItem(pickedItem); });
newThread.Start();
Logger.Log(" Spawn thread: " + pickedItem.Key + ", ThreadState: " + newThread.ThreadState);
keyList = keyList + pickedItem.Key;
}
Logger.Log("Keys picked: " + keyList);
Item processor function unlocks the item at the end of the loop.
function ProcessItem(Item pickedItem)
{
Logger.Log("Let's process !!! " + pickedItem.Key);
// Item processing code
UnlockItem(pickedItem); // unlock and write log
}
The problem is, after a while I realized there is too many locked items in my database, and the number is increasing. Logging at specific points I got something like below
- Spawn thread: 20779205, ThreadState: Running
- Let's process !!! 20779205
- Spawn thread: 20779206, ThreadState: Running
- Let's process !!! 20779206
- Spawn thread: 20779207, ThreadState: Running
- Let's process !!! 20779207
- Spawn thread: 20779208, ThreadState: Running <---Not processed
- Let's process !!! 20779209 <---Duplicate
- Spawn thread: 20779209, ThreadState: Running
- Let's process !!! 20779209
- Spawn thread: 20779211, ThreadState: Running <---Not processed
- Let's process !!! 20779213
- Spawn thread: 20779213, ThreadState: Running
- Let's process !!! 20779228
- Spawn thread: 20779228, ThreadState: Running
- Let's process !!! 20779231
- Let's process !!! 20779231 <---Duplicate
- Spawn thread: 20779231, ThreadState: Running
- Spawn thread: 20779237, ThreadState: Running
- Keys picked: 20779205, 20779206, 20779207, 20779208, 20779209, 20779211, 20779213, 20779228, 20779231, 20779237,
- Let's process !!! 20779237
- STOP processing. Possible duplicate for 20779209
- STOP processing. Possible duplicate for 20779231
- Unlock Key: 20779209, Row count:1
- Unlock Key: 20779231, Row count:1
- Unlock Key: 20779209, Row count:1
- Unlock Key: 20779237, Row count:1
- Unlock Key: 20779228, Row count:1
- Unlock Key: 20779206, Row count:1
- Unlock Key: 20779207, Row count:1
- Unlock Key: 20779205, Row count:1
- Unlock Key: 20779231, Row count:1
Here I could see that sometimes the ProcessItem
is not being hit (20779208
, 20779211
). I couldn't understand why this could be... after spending two days on this, I am totally lost.
Is this something typical in multi-threaded environment? How can I better handle this? I have to make sure that every Item is processed once and only once.
Also, I realized sometimes same Item is being processed more than once (20779209
, 20779231
), however, I handled it in the ProcessItem
function to avoid duplication. Is there a link between those two scenarios ?
What am I doing wrong?
from output line Keys picked: 20779205, 20779206, 20779207, 20779208, 20779209, 20779211, 20779213, 20779228, 20779231, 20779237
i would conclude that all items were hit (one time each)
may be you should modify your code to:
var keyList = "";
foreach(Item pickedItem in itemList)
{
Thread.Sleep(15); // Just to delay a bit
Item tmpItem = pickedItem; -- the same variable should not be accessed by many threads
var newThread = new Thread(delegate() { ProcessItem(tmpItem); });
newThread.Start();
Logger.Log(" Spawn thread: " + pickedItem.Key + ", ThreadState: " + newThread.ThreadState);
keyList = keyList + pickedItem.Key;
}
Logger.Log("Keys picked: " + keyList);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With