Closed JAHTKELD closed 4 years ago
Hello: I have the Provisioning Kit for ATECC508A, [http://ww1.microchip.com/downloads/en/DeviceDoc/Atmel-8966-CryptoAuth-Security-Provisioning-Kits-Quick-Start-Guide.pdf] This kit has a RootCA Usb and SignerCA USB to connect to PC. It has the same cert template for all device that you want to signer, but in this moment I dont know how to validate the Azure certificate, for that I have been used you certificate chain, but it doesn´t use the same cert_chain.c file. is it possible changed the python utility to use the same cert_chain.c file with diferent ATECC608A chips? Best Regards
http://ww1.microchip.com/downloads/en/Appnotes/Atmel-8974-CryptoAuth-ATECC-Compressed-Certificate-Definition-ApplicationNote.pdf according to Microchip documents, each cert definition of signer CA and device includes constant contents and variable.
my cert_chain python generator script https://github.com/kmwebnet/ECC608-Provision/blob/master/scripts/cert2certdef.py
It looks like a separate certificate is set, it only generates just "a template". You know you can leave "ST,OU".. as constant. Actual variables that they should be unique to each chip, such as public key, signature, issue and expire date will be set by provision.h that is output code of export_header.py
so you might treat cert_chain.c as constant once you determined your certificate structure and generate it as unique for same product line.
I tested provision code that creates public key pem and exchanges device cert as pem from provisioning PC via serial port, and success.
example code for ESP32 side.(this doesn't work as is)
const char certificatefooter[] = "----END CERTIFICATE----"; const char pubkeyfooter[] = "----END PUBLIC KEY----";
uint8_t stateflag = 0; uint16_t certificateoffset = 0;
static void uart_event_task(void pvParameters)
{
uart_event_t event;
size_t buffered_size;
uint8_t dtmp = (uint8_t) malloc(RD_BUF_SIZE);
for(;;) {
//Waiting for UART event.
if(xQueueReceive(uart0_queue, (void )&event, (portTickType)portMAX_DELAY)) {
bzero(dtmp, RD_BUF_SIZE);
switch(event.type) {
//Event of UART receving data
/We'd better handler data event fast, there would be much more data events than
other types of events. If we take too much time on data event, the queue might
be full./
case UART_DATA:
uart_read_bytes(EX_UART_NUM, dtmp, event.size, portMAX_DELAY);
if(stateflag == 1){
memcpy (&signercert[certificateoffset], dtmp, event.size);
certificateoffset += event.size;
// detect the end of certificate
if ( strstr ((char *)dtmp , certificatefooter) != NULL )
{
stateflag = 0;
certificateoffset = 0;
uart_flush(EX_UART_NUM);
break;
}
uart_flush(EX_UART_NUM);
break;
}
if(stateflag == 2){
memcpy (&devicecert[certificateoffset], dtmp, event.size);
certificateoffset += event.size;
// detect the end of certificate
if ( strstr ((char *)dtmp , certificatefooter) != NULL )
{
stateflag = 0;
certificateoffset = 0;
uart_flush(EX_UART_NUM);
break;
}
uart_flush(EX_UART_NUM);
break;
}
if(stateflag == 3){
memcpy (&rootpubkey[certificateoffset], dtmp, event.size);
certificateoffset += event.size;
// detect the end of certificate
if ( strstr ((char *)dtmp , pubkeyfooter) != NULL )
{
stateflag = 0;
certificateoffset = 0;
uart_flush(EX_UART_NUM);
break;
}
uart_flush(EX_UART_NUM);
break;
}
if('r' == dtmp[0]){
printf("Ready.\n");
uart_flush(EX_UART_NUM);
}
if('k' == dtmp[0]){
atca_configure(0xc0, &cfg);
uart_flush(EX_UART_NUM);
}
// signer certificate accept mode if('c' == dtmp[0]){ stateflag = 1; uart_flush(EX_UART_NUM); }
// device certificate accept mode if('v' == dtmp[0]){ stateflag = 2; uart_flush(EX_UART_NUM); }
// root pubkey accept mode if('b' == dtmp[0]){ stateflag = 3; uart_flush(EX_UART_NUM); } if('s' == dtmp[0]){ uart_write_bytes(EX_UART_NUM, (const char) serialstr, 18); uart_flush(EX_UART_NUM); } if('p' == dtmp[0]){ char printresult[100] = {}; for (int i=0 ; i < strlen((const char )signercert) / 65 + 1 ; i++) { memcpy (printresult, &signercert[i 65], 65); uart_write_bytes(EX_UART_NUM, (const char) printresult, 65); }
uart_flush(EX_UART_NUM);
memset(&printresult, 0 , 100 );
for (int i=0 ; i < strlen((const char *)devicecert) / 65 + 1 ; i++)
{
memcpy (printresult, &devicecert[i * 65], 65);
uart_write_bytes(EX_UART_NUM, (const char*) printresult, 65);
}
uart_flush(EX_UART_NUM);
break;
}
if('o' == dtmp[0]){
char printresult[100] = {};
for (int i=0 ; i < strlen((const char *)rootpubkey) / 65 + 1 ; i++)
{
memcpy (printresult, &rootpubkey[i * 65], 65);
uart_write_bytes(EX_UART_NUM, (const char*) printresult, 65);
}
uart_flush(EX_UART_NUM);
break;
}
if('q' == dtmp[0]){
stateflag = 10;
free(dtmp);
dtmp = NULL;
uart_flush(EX_UART_NUM);
vTaskDelete(NULL);
}
break;
//Event of HW FIFO overflow detected
case UART_FIFO_OVF:
ESP_LOGI(TAG, "hw fifo overflow");
// If fifo overflow happened, you should consider adding flow control for your application.
// The ISR has already reset the rx FIFO,
// As an example, we directly flush the rx buffer here in order to read more data.
uart_flush_input(EX_UART_NUM);
xQueueReset(uart0_queue);
break;
//Event of UART ring buffer full
case UART_BUFFER_FULL:
ESP_LOGI(TAG, "ring buffer full");
// If buffer full happened, you should consider encreasing your buffer size
// As an example, we directly flush the rx buffer here in order to read more data.
uart_flush_input(EX_UART_NUM);
xQueueReset(uart0_queue);
break;
//Event of UART RX break detected
case UART_BREAK:
ESP_LOGI(TAG, "uart rx break");
break;
//Event of UART parity check error
case UART_PARITY_ERR:
ESP_LOGI(TAG, "uart parity error");
break;
//Event of UART frame error
case UART_FRAME_ERR:
ESP_LOGI(TAG, "uart frame error");
break;
//UART_PATTERN_DET
case UART_PATTERN_DET:
uart_get_buffered_data_len(EX_UART_NUM, &buffered_size);
int pos = uart_pattern_pop_pos(EX_UART_NUM);
ESP_LOGI(TAG, "[UART PATTERN DETECTED] pos: %d, buffered size: %d", pos, buffered_size);
if (pos == -1) {
// There used to be a UART_PATTERN_DET event, but the pattern position queue is full so that it can not
// record the position. We should set a larger queue size.
// As an example, we directly flush the rx buffer here.
uart_flush_input(EX_UART_NUM);
} else {
uart_read_bytes(EX_UART_NUM, dtmp, pos, 100 / portTICK_PERIOD_MS);
uint8_t pat[PATTERN_CHR_NUM + 1];
memset(pat, 0, sizeof(pat));
uart_read_bytes(EX_UART_NUM, pat, PATTERN_CHR_NUM, 100 / portTICK_PERIOD_MS);
ESP_LOGI(TAG, "read data: %s", dtmp);
ESP_LOGI(TAG, "read pat : %s", pat);
}
break;
//Others
default:
ESP_LOGI(TAG, "uart event type: %d", event.type);
break;
}
}
}
free(dtmp);
dtmp = NULL;
vTaskDelete(NULL);
} uart_config_t uart_config = { .baud_rate = 115200, .data_bits = UART_DATA_8_BITS, .parity = UART_PARITY_DISABLE, .stop_bits = UART_STOP_BITS_1, .flow_ctrl = UART_HW_FLOWCTRL_DISABLE, //.rx_flow_ctrl_thresh = 122, .rx_flow_ctrl_thresh = 0, }; //Set UART parameters uart_param_config(EX_UART_NUM, &uart_config); //Set UART log level esp_log_level_set(TAG, ESP_LOG_INFO); //Install UART driver, and get the queue. uart_driver_install(EX_UART_NUM, BUF_SIZE 2, BUF_SIZE 2, 10, &uart0_queue, 0);
//Set UART pins (using UART0 default pins ie no changes.)
uart_set_pin(EX_UART_NUM, UART_PIN_NO_CHANGE, UART_PIN_NO_CHANGE, UART_PIN_NO_CHANGE, UART_PIN_NO_CHANGE);
//Set uart pattern detect function.
uart_enable_pattern_det_intr(EX_UART_NUM, '+', 3, 10000, 10, 10);
//Reset the pattern queue length to record at most 20 pattern positions.
uart_pattern_queue_reset(EX_UART_NUM, 20);
//Create a task to handler UART event from ISR
xTaskCreate(uart_event_task, "uart_event_task", 2048, NULL, 12, NULL);
example PC side code(this may works as is)
import serial import serial.tools.list_ports import time import binascii import os import sys from createdevice import create_device
def uart_write_read(w_data, r_size):
ser.write(w_data)
print('Send: '+ str(w_data))
ser.flush()
time.sleep(0.5)
# Read
r_data = ser.read_all().decode('utf-8', "backslashreplace")
print('Recv: ' + str(r_data))
return r_data
def uart_read(r_size):
r_data = ser.read_all().decode('utf-8', "backslashreplace")
print('Recv: ' + str(r_data))
return r_data
def search_com_port(): coms = serial.tools.list_ports.comports() comlist = [] for com in coms: comlist.append(com.device) print('Connected COM ports: ' + str(comlist))
use_port = comlist[0]
print('Use COM port: ' + use_port)
return use_port
if name == 'main':
use_port = search_com_port()
ser = serial.Serial(port = use_port ,
bytesize = serial.EIGHTBITS ,
parity = serial.PARITY_NONE,
stopbits = serial.STOPBITS_ONE ,
baudrate = 115200 ,
timeout = 1
)
r_size = 5000
r_data = uart_read( r_size)
for i in range (5):
time.sleep(1)
w_data = b'r'
r_data = uart_write_read(w_data, r_size)
if 'Ready.' in r_data :
print('communication ready.')
break
for i in range (5):
time.sleep(1)
w_data = b's'
r_data = uart_write_read(w_data, r_size)
if len(r_data) > 0 :
with open('{}-serial.txt'.format(r_data), 'w' ) as f:
f.write(r_data)
commonname = "%s-serial.txt" % r_data
print('got serial number.')
break
filename = "%s-device-pubkey.pem" % r_data
target_filename = "%s-device.crt" % r_data
if not os.path.exists('{}'.format(filename)):
for i in range (5):
time.sleep(1)
w_data = b'k'
r_data = uart_write_read(w_data, r_size)
time.sleep(5)
if len(r_data) > 0 :
with open('{}'.format(filename), 'w', encoding="utf-8" ,newline="\n") as f:
f.write(r_data.replace('\r', ''))
print('got device public key.')
break
create_device('{}'.format(target_filename), '{}'.format(filename), 'signer-ca.crt', 'signer-ca.key', 'root-ca.crt', 'root-pub.pem', 'ou.txt', '{}'.format(commonname))
time.sleep(1)
w_data = b'c'
r_data = uart_write_read(w_data, r_size)
f = open('signer-ca.crt')
columns = f.read().replace('\r', '').split("\n")
f.close()
for column in columns:
w_data = column.encode() + b'\n'
r_data = uart_write_read(w_data, r_size)
columns = ""
ser.flush()
time.sleep(1)
w_data = b'v'
r_data = uart_write_read(w_data, r_size)
f = open('{}'.format(target_filename))
columns = f.read().replace('\r', '').split("\n")
f.close()
for column in columns:
w_data = column.encode() + b'\n'
r_data = uart_write_read(w_data, r_size)
columns = ""
ser.flush()
time.sleep(1)
w_data = b'b'
r_data = uart_write_read(w_data, r_size)
f = open('root-pub.pem')
columns = f.read().replace('\r', '').split("\n")
f.close()
for column in columns:
w_data = column.encode() + b'\n'
r_data = uart_write_read(w_data, r_size)
w_data = b'q'
r_data = uart_write_read(w_data, r_size)
time.sleep(3)
r_data = uart_read(r_size)
ser.flush()
time.sleep(5)
r_data = uart_read(r_size)
ser.flush()
time.sleep(5)
r_data = uart_read(r_size)
ser.flush()
time.sleep(5)
r_data = uart_read(r_size)
time.sleep(5)
ser.flush()
w_data = b' '
r_data = uart_write_read(w_data, r_size)
time.sleep(5)
ser.flush()
r_data = uart_read(r_size)
time.sleep(5)
ser.flush()
r_data = uart_read(r_size)
ser.close()
the upload codes have collapsed.
I'll write another repo later.
I Upload ECC608-MassProvosioning. could you try it?
With this example you need to compile the firmare for each IoT device, because the cert_chain.c file change for each device and that is not practical. I'm looking for a solution but the function
needs a constant structure atcacert_def_s. If you want to update the IoT devices by OTA it is necessary to have the same firmware for all IoT devices