The approach is outlined below. It assumes the MNIST dataset.
import os
import pickle
import logging

import keras
from keras.datasets import mnist

img_rows, img_cols = 28, 28

(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Set train samples apart that will serve as streaming data later on
x_stream = x_train[:20000]
y_stream = y_train[:20000]
x_train = x_train[20000:]
y_train = y_train[20000:]
stream_sample = [x_stream, y_stream]
The stream_sample list holds the observations that will later be replayed as streaming data.
pickle.dump(stream_sample, open(os.getcwd() + kwargs['path_stream_sample'], "wb"))  # kwargs holds the configured file paths
The stream sample is now written to a pickle file.
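Before training, the raw MNIST arrays still need the usual preprocessing for this kind of CNN: reshaping to (28, 28, 1), scaling the pixel values to [0, 1], and one-hot encoding the labels, since the model below is compiled with categorical_crossentropy. A minimal sketch of that step:
num_classes = 10
input_shape = (img_rows, img_cols, 1)

# Reshape to (samples, 28, 28, 1) and scale the pixel values to [0, 1]
x_train = x_train.reshape(-1, img_rows, img_cols, 1).astype('float32') / 255
x_test = x_test.reshape(-1, img_rows, img_cols, 1).astype('float32') / 255

# One-hot encode the labels for use with categorical_crossentropy
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)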
Now let's construct the model:
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Dropout, Flatten, Dense

model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3),
                 activation='relu',
                 input_shape=input_shape))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes, activation='softmax'))

model.compile(loss=keras.losses.categorical_crossentropy,
              optimizer=keras.optimizers.Adadelta(),
              metrics=['accuracy'])
Fitting the model is then as simple as:
model.fit(x_train, y_train,
          batch_size=kwargs['batch_size'],
          epochs=kwargs['epochs'],
          verbose=1,
          validation_data=(x_test, y_test))
# Now evaluate the model
score = model.evaluate(x_test, y_test, verbose=0)
logging.info('Test - loss: %s', score[0])
logging.info('Test - accuracy: %s', score[1])
model.save(os.getcwd() + kwargs['initial_model_path'])
Now that we have both the stream set and a trained model, let's feed in additional data.
For this we use Kafka.
Kafka is one of the go-to platforms when you have to deal with streaming data. Its framework consists of three players: 1) brokers, 2) producers, and 3) consumers.
A broker is an instance of a Kafka server (also known as a Kafka node) that hosts named streams of records, which are called topics. A broker takes in messages from producers and stores them to a topic. It in turn enables consumers to fetch messages from a topic.
In its simplest form, you have a single producer pushing messages to one end of a topic, while a single consumer fetches messages from the other end of the topic (for example, an app). In our case, with Kafka running locally, such a single-producer, single-consumer setup does the trick.
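The producer below writes to the topic TopicA on the broker at kafka:9092. Depending on the broker configuration, topics may be auto-created on first use; if not, a sketch of creating it explicitly with kafka-python's admin client (the broker address is taken from the producer code below) could look like this:
from kafka.admin import KafkaAdminClient, NewTopic

# Connect to the local broker and create the topic used by the producer below
admin = KafkaAdminClient(bootstrap_servers=['kafka:9092'])
admin.create_topics([NewTopic(name='TopicA', num_partitions=1, replication_factor=1)])
admin.close()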
With the help of the kafka-python API we can now simulate a data stream by constructing a Producer that publishes messages to the topic:
from kafka import KafkaProducer
from json import dumps
from time import sleep
import random

def generate_stream(**kwargs):
    # Set up the Producer
    producer = KafkaProducer(bootstrap_servers=['kafka:9092'],
                             value_serializer=lambda x: dumps(x).encode('utf-8'))
    # Load the stream sample file
    stream_sample = pickle.load(open(os.getcwd() + kwargs['path_stream_sample'], "rb"))
    # The stream sample holds 20000 observations - select 200 of them at random
    rand = random.sample(range(0, 20000), 200)
    x_new = stream_sample[0]
    y_new = stream_sample[1]
    logging.info('Partitions: %s', producer.partitions_for('TopicA'))
    for i in rand:
        json_comb = encode_to_json(x_new[i], y_new[i])  # pick observation and encode to JSON
        producer.send('TopicA', value=json_comb)        # send encoded observation to the Kafka topic
        logging.info("Sent number: {}".format(y_new[i]))
        sleep(1)
    producer.close()
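The helpers encode_to_json and decode_json used by the producer and the consumer are not shown here. A minimal sketch, assuming each message carries the image as a nested list under 'x' together with its label under 'y' (the keys and format are assumptions):
import numpy as np

def encode_to_json(x, y):
    # NumPy arrays are not JSON serializable, so convert the image to a plain list
    return {'x': x.tolist(), 'y': int(y)}

def decode_json(message):
    # Convert the deserialized payload back to a NumPy image array and an integer label
    return np.array(message['x'], dtype=np.uint8), message['y']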
The next task is to receive the streamed data.
To fetch the data from the Kafka topic, we again turn to the kafka-python API to construct a Consumer. The Consumer is wrapped in a function that sequentially retrieves observations from the topic, converts them back from JSON to their original format, and groups them into NumPy arrays that are stored (in pickle format) in the to_use_for_training folder.
from kafka import KafkaConsumer
from json import loads
import time

def get_data_from_kafka(**kwargs):
    consumer = KafkaConsumer(
        kwargs['topic'],                       # specify the topic to consume from
        bootstrap_servers=[kwargs['client']],
        consumer_timeout_ms=3000,              # break the connection if the consumer hasn't fetched anything for 3 secs (e.g. in case of an empty topic)
        auto_offset_reset='earliest',          # automatically reset the offset to the earliest offset (should the current offset be deleted or anything)
        enable_auto_commit=True,               # offsets are committed automatically by the consumer
        # group_id='my-group',
        value_deserializer=lambda x: loads(x.decode('utf-8')))
    logging.info('Consumer constructed')
    try:
        xs = []
        ys = []
        for message in consumer:               # loop over messages
            logging.info('Offset: %s', message.offset)
            message = message.value
            x, y = decode_json(message)        # decode JSON
            xs.append(x)
            ys.append(y)
            logging.info('Image retrieved from topic')
        xs = np.array(xs).reshape(-1, 28, 28, 1)  # put the xs in the right shape for our CNN
        ys = np.array(ys).reshape(-1)             # put the ys in the right shape for our CNN
        new_samples = [xs, ys]
        # write the data to the to_use_for_training folder
        pickle.dump(new_samples, open(os.getcwd() + kwargs['path_new_data'] + str(time.strftime("%Y%m%d_%H%M")) + "_new_samples.p", "wb"))
        logging.info('%s new samples retrieved', xs.shape[0])
        consumer.close()
    except Exception as e:
        logging.info('Error: %s', e)
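Both functions take their configuration through **kwargs, in line with how they are parameterised above. As an illustration, they could be called directly as follows; the paths, topic name, and broker address are placeholders, not the actual configuration:
# Publish 200 random samples from the stream sample to the topic
generate_stream(path_stream_sample='/data/stream_sample.p')

# Fetch whatever is on the topic and store it for the next training run
get_data_from_kafka(topic='TopicA',
                    client='kafka:9092',
                    path_new_data='/data/to_use_for_training/')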
The update_model function in update_functions.py does most of the heavy lifting (a sketch follows the list below):
- it takes in the data we fetched from the Kafka topic
- it loads the current model and gauges how it scores on the test set
- it runs a number of epochs of gradient descent on the new data and adjusts the weights of the model accordingly
- it then tests whether the adjusted model scores better on the test set than the current version; if it does, it replaces the current version and moves the latter to a model archive, and if it doesn't, it sticks to the current version of the model
- in addition, it moves the data it used for updating the model to the used_for_training folder and logs a set of metrics for each update run to MLflow
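The code of update_functions.py is not reproduced here. Purely as an illustration of the steps listed above, a minimal sketch of update_model could look as follows; the kwargs keys, folder layout, test-set pickle, and metric names are assumptions, not the implementation from the referenced blog:
import glob
import shutil

import mlflow
from keras.models import load_model
from keras.utils import to_categorical

def update_model(**kwargs):
    # Take in the data fetched from the Kafka topic
    data_files = glob.glob(os.getcwd() + kwargs['path_new_data'] + '*_new_samples.p')

    # Load the current model and gauge how it scores on the test set
    model = load_model(os.getcwd() + kwargs['current_model_path'])
    x_test, y_test = pickle.load(open(os.getcwd() + kwargs['path_test_set'], "rb"))  # assumed preprocessed test set
    old_loss, old_acc = model.evaluate(x_test, y_test, verbose=0)

    for file in data_files:
        x_new, y_new = pickle.load(open(file, "rb"))
        x_new = x_new.astype('float32') / 255   # scale like the original training data
        y_new = to_categorical(y_new, 10)       # labels arrive as integers from the consumer

        # A number of epochs of gradient descent with the new data
        model.fit(x_new, y_new, batch_size=kwargs['batch_size'],
                  epochs=kwargs['epochs'], verbose=1)

        # Move the data used for updating to the used_for_training folder
        shutil.move(file, os.getcwd() + kwargs['path_used_data'] + os.path.basename(file))

    # Keep the adjusted model only if it beats the current version on the test set
    new_loss, new_acc = model.evaluate(x_test, y_test, verbose=0)
    if new_acc > old_acc:
        shutil.move(os.getcwd() + kwargs['current_model_path'],
                    os.getcwd() + kwargs['path_model_archive'] + time.strftime("%Y%m%d_%H%M") + "_model.h5")
        model.save(os.getcwd() + kwargs['current_model_path'])

    # Log the metrics of this update run to MLflow
    with mlflow.start_run():
        mlflow.log_metric('test_loss_before', old_loss)
        mlflow.log_metric('test_accuracy_before', old_acc)
        mlflow.log_metric('test_loss_after', new_loss)
        mlflow.log_metric('test_accuracy_after', new_acc)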
References:
https://www.vantage-ai.com/en/blog/keeping-your-ml-model-in-shape-with-kafka-airflow-and-mlflow