I'm trying to define a dataclass to enqueue a job in Redis for a Sidekiq worker. The Sidekiq payload specification requires attributes in this format:
{
  "class": "SomeWorker",
  "queue": "default",
  "jid": "b4a577edbccf1d805744efa9", // 12-byte random number as 24 char hex string
  "args": [......],
  "created_at": 1234567890,
  "enqueued_at": 1234567890
}
So I defined a dataclass in my Python code:
import datetime
import secrets
import time
from dataclasses import dataclass
from typing import Any

@dataclass
class PusherNotificationJob:
    args: Any = None
    queue: str = "default"
    jid: str = secrets.token_hex(12)
    retry: bool = True
    created_at: datetime.datetime = time.time()
    enqueued_at: datetime.datetime = time.time()

    def asdict(self):
        return {**self.__dict__, "class": "SomeWorker"}
My problem is that I can't define "class" as an attribute of PusherNotificationJob because it's a reserved word. So I had to define the asdict method to serialize the instance as a dict and add the "class" key there.
Is there a better way to do this?
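One thing to note before the answer: the defaults for created_at and enqueued_at are evaluated once, when the class is defined, so every instance will get the same value for as long as your code is running. The same applies to jid. If you want the time at which the instance was created, use a default_factory (time.time returns a float, so the annotation should be float as well):
created_at: float = field(default_factory=time.time)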
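As a minimal sketch of that fix applied to every per-instance default, the class might look like the following. The `new_jid` helper is a hypothetical name introduced here for illustration, and the `float` annotations are an assumption based on what `time.time` returns; neither appears in the original code.

```python
import secrets
import time
from dataclasses import dataclass, field
from typing import Any


def new_jid() -> str:
    # Hypothetical helper: 12 random bytes rendered as a 24-char hex string.
    return secrets.token_hex(12)


@dataclass
class PusherNotificationJob:
    args: Any = None
    queue: str = "default"
    # default_factory is called on every instantiation, so each job
    # gets its own jid and its own timestamps.
    jid: str = field(default_factory=new_jid)
    retry: bool = True
    created_at: float = field(default_factory=time.time)
    enqueued_at: float = field(default_factory=time.time)


first = PusherNotificationJob()
second = PusherNotificationJob()
```

With plain defaults such as `jid: str = secrets.token_hex(12)`, `first.jid` and `second.jid` would be identical; with `default_factory` each instance should get a fresh value.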
Moving on:
You can override __iter__ so that dict(instance) works. It would look something like this, assuming you have an attribute named class_: str = 'SomeWorker':
def __iter__(self):
    # Yield (key, value) pairs so that dict(instance) can consume them.
    for key, value in self.__dict__.items():
        if key == 'class_':
            key = 'class'
        yield key, value
If you need __iter__ to behave normally, use the following instead:
def convert_keys(self):
    for key in self.__dict__:
        value = self.__dict__[key]
        if key == 'class_':
            key = 'class'
        yield key, value

def asdict(self):
    return {key: value for key, value in self.convert_keys()}
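Putting the pieces together, here is a rough, self-contained sketch of the second approach. It assumes a `class_` field with the default 'SomeWorker' as described above, uses `default_factory` for the per-instance values, and serializes with `json.dumps` purely to show the resulting payload; actually pushing the payload to Redis is left out.

```python
import json
import secrets
import time
from dataclasses import dataclass, field
from typing import Any


@dataclass
class PusherNotificationJob:
    args: Any = None
    queue: str = "default"
    jid: str = field(default_factory=lambda: secrets.token_hex(12))
    retry: bool = True
    created_at: float = field(default_factory=time.time)
    enqueued_at: float = field(default_factory=time.time)
    # Trailing underscore avoids the reserved word; renamed on export.
    class_: str = "SomeWorker"

    def convert_keys(self):
        # Yield (key, value) pairs, renaming class_ to class.
        for key, value in self.__dict__.items():
            if key == "class_":
                key = "class"
            yield key, value

    def asdict(self):
        return dict(self.convert_keys())


job = PusherNotificationJob(args=[1, "hello"])
payload = json.dumps(job.asdict())
```

The instance keeps a valid Python attribute name (`job.class_`), while the serialized payload carries the "class" key that Sidekiq expects.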